Commit 01fa4e6c authored by Helga Velroyen's avatar Helga Velroyen
Browse files

Unit test for LURenewCrypto's valid case



This adds a unit test which tests a successful run of
LURenewCrypto. While writing this, some options for
improvement became apparent and are fixed in this patch
as well.
Signed-off-by: default avatarHelga Velroyen <helgav@google.com>
Reviewed-by: default avatarPetr Pudlak <pudlak@google.com>
parent 890b7bfb
......@@ -129,12 +129,14 @@ class LUClusterRenewCrypto(NoHooksLU):
except IOError:
logging.info("No old certificate available.")
new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
feedback_fn)
# Technically it should not be necessary to set the cert
# paths. However, due to a bug in the mock library, we
# have to do this to be able to test the function properly.
_UpdateMasterClientCert(
self, master_uuid, cluster, feedback_fn,
client_cert=pathutils.NODED_CLIENT_CERT_FILE,
client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP)
utils.AddNodeToCandidateCerts(master_uuid,
new_master_digest,
cluster.candidate_certs)
nodes = self.cfg.GetAllNodesInfo()
for (node_uuid, node_info) in nodes.items():
if node_info.offline:
......
......@@ -37,6 +37,8 @@ import OpenSSL
import copy
import unittest
import operator
import shutil
import os
from ganeti.cmdlib import cluster
from ganeti import constants
......@@ -2264,5 +2266,65 @@ class TestLUClusterVerifyDisks(CmdlibTestCase):
self.assertEqual(1, len(result["jobs"]))
class TestLUClusterRenewCrypto(CmdlibTestCase):
def setUp(self):
super(TestLUClusterRenewCrypto, self).setUp()
self._node_cert = self._CreateTempFile()
shutil.copy(testutils.TestDataFilename("cert1.pem"), self._node_cert)
self._client_node_cert = self._CreateTempFile()
shutil.copy(testutils.TestDataFilename("cert2.pem"), self._client_node_cert)
self._client_node_cert_tmp = self._CreateTempFile()
def tearDown(self):
super(TestLUClusterRenewCrypto, self).tearDown()
def _GetFakeDigest(self, uuid):
"""Creates a fake SSL digest depending on the UUID of a node.
@type uuid: string
@param uuid: node UUID
@returns: a string impersonating a SSL digest
"""
return "FA:KE:%s:%s:%s:%s" % (uuid[0:2], uuid[2:4], uuid[4:6], uuid[6:8])
@patchPathutils("cluster")
def testSuccessfulCase(self, pathutils):
# patch pathutils to point to temporary files
pathutils.NODED_CERT_FILE = self._node_cert
pathutils.NODED_CLIENT_CERT_FILE = self._client_node_cert
pathutils.NODED_CLIENT_CERT_FILE_TMP = \
self._client_node_cert_tmp
# create a few non-master, online nodes
num_nodes = 3
for _ in range(num_nodes):
self.cfg.AddNewNode()
# make sure the RPC calls are successful for all nodes
self.rpc.call_node_crypto_tokens = \
lambda node_uuid, _: self.RpcResultsBuilder() \
.CreateSuccessfulNodeResult(node_uuid,
[(constants.CRYPTO_TYPE_SSL_DIGEST, self._GetFakeDigest(node_uuid))])
op = opcodes.OpClusterRenewCrypto()
self.ExecOpCode(op)
# Check if the correct certificates exist and don't exist on the master
self.assertTrue(os.path.exists(pathutils.NODED_CERT_FILE))
self.assertTrue(os.path.exists(pathutils.NODED_CLIENT_CERT_FILE))
self.assertFalse(os.path.exists(pathutils.NODED_CLIENT_CERT_FILE_TMP))
# Check if we have the correct digests in the configuration
cluster = self.cfg.GetClusterInfo()
self.assertEqual(num_nodes + 1, len(cluster.candidate_certs))
nodes = self.cfg.GetAllNodesInfo()
for (node_uuid, _) in nodes.items():
expected_digest = self._GetFakeDigest(node_uuid)
self.assertEqual(expected_digest, cluster.candidate_certs[node_uuid])
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment