Commit 0602cef3 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Factorize code for checking node daemon certificate



This code is going to be used by a new utility for setting up the node
daemon. Unit tests are updated/added.

Additionally, the certificate and key stored in “server.pem” are
verified, too.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 686d24f0
......@@ -27,7 +27,6 @@ import os.path
import optparse
import sys
import logging
import errno
import OpenSSL
from ganeti import cli
......@@ -94,47 +93,28 @@ def VerifyOptions(parser, opts, args):
return opts
def _VerifyCertificate(cert, _noded_cert_file=pathutils.NODED_CERT_FILE):
def _VerifyCertificate(cert_pem, _check_fn=utils.CheckNodeCertificate):
"""Verifies a certificate against the local node daemon certificate.
@type cert: string
@param cert: Certificate in PEM format (no key)
@type cert_pem: string
@param cert_pem: Certificate in PEM format (no key)
"""
try:
OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, cert)
OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
except OpenSSL.crypto.Error, err:
pass
else:
raise JoinError("No private key may be given")
try:
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
cert = \
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
except Exception, err:
raise errors.X509CertError("(stdin)",
"Unable to load certificate: %s" % err)
try:
noded_pem = utils.ReadFile(_noded_cert_file)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
raise
logging.debug("Local node certificate was not found (file %s)",
_noded_cert_file)
return
try:
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, noded_pem)
except Exception, err:
raise errors.X509CertError(_noded_cert_file,
"Unable to load private key: %s" % err)
check_fn = utils.PrepareX509CertKeyCheck(cert, key)
try:
check_fn()
except OpenSSL.SSL.Error:
raise JoinError("Given cluster certificate does not match local key")
_check_fn(cert)
def VerifyCertificate(data, _verify_fn=_VerifyCertificate):
......
......@@ -27,9 +27,12 @@ import OpenSSL
import re
import datetime
import calendar
import errno
import logging
from ganeti import errors
from ganeti import constants
from ganeti import pathutils
from ganeti.utils import text as utils_text
from ganeti.utils import io as utils_io
......@@ -338,3 +341,58 @@ def PrepareX509CertKeyCheck(cert, key):
ctx.use_certificate(cert)
return ctx.check_privatekey
def CheckNodeCertificate(cert, _noded_cert_file=pathutils.NODED_CERT_FILE):
"""Checks the local node daemon certificate against given certificate.
Both certificates must be signed with the same key (as stored in the local
L{pathutils.NODED_CERT_FILE} file). No error is raised if no local
certificate can be found.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
@raise errors.X509CertError: When an error related to X509 occurred
@raise errors.GenericError: When the verification failed
"""
try:
noded_pem = utils_io.ReadFile(_noded_cert_file)
except EnvironmentError, err:
if err.errno != errno.ENOENT:
raise
logging.debug("Node certificate file '%s' was not found", _noded_cert_file)
return
try:
noded_cert = \
OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, noded_pem)
except Exception, err:
raise errors.X509CertError(_noded_cert_file,
"Unable to load certificate: %s" % err)
try:
noded_key = \
OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, noded_pem)
except Exception, err:
raise errors.X509CertError(_noded_cert_file,
"Unable to load private key: %s" % err)
# Check consistency of server.pem file
check_fn = PrepareX509CertKeyCheck(noded_cert, noded_key)
try:
check_fn()
except OpenSSL.SSL.Error:
# This should never happen as it would mean the certificate in server.pem
# is out of sync with the private key stored in the same file
raise errors.X509CertError(_noded_cert_file,
"Certificate does not match with private key")
# Check with supplied certificate with local key
check_fn = PrepareX509CertKeyCheck(cert, noded_key)
try:
check_fn()
except OpenSSL.SSL.Error:
raise errors.GenericError("Given cluster certificate does not match"
" local key")
......@@ -72,49 +72,27 @@ class TestVerifyCertificate(testutils.GanetiTestCase):
def testNoCert(self):
prepare_node_join.VerifyCertificate({}, _verify_fn=NotImplemented)
def testMismatchingKey(self):
other_cert = self._TestDataFilename("cert1.pem")
node_cert = self._TestDataFilename("cert2.pem")
self.assertRaises(_JoinError, prepare_node_join._VerifyCertificate,
utils.ReadFile(other_cert), _noded_cert_file=node_cert)
def testGivenPrivateKey(self):
cert_filename = self._TestDataFilename("cert2.pem")
cert_pem = utils.ReadFile(cert_filename)
self.assertRaises(_JoinError, prepare_node_join._VerifyCertificate,
cert_pem, _noded_cert_file=cert_filename)
def testMatchingKey(self):
cert_filename = self._TestDataFilename("cert2.pem")
# Extract certificate
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(cert_filename))
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
cert)
prepare_node_join._VerifyCertificate(cert_pem,
_noded_cert_file=cert_filename)
def testMissingFile(self):
cert = self._TestDataFilename("cert1.pem")
nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
prepare_node_join._VerifyCertificate(utils.ReadFile(cert),
_noded_cert_file=nodecert)
cert_pem, _check_fn=NotImplemented)
def testInvalidCertificate(self):
self.assertRaises(errors.X509CertError,
prepare_node_join._VerifyCertificate,
"Something that's not a certificate",
_noded_cert_file=NotImplemented)
_check_fn=NotImplemented)
def testNoPrivateKey(self):
cert = self._TestDataFilename("cert1.pem")
self.assertRaises(errors.X509CertError,
prepare_node_join._VerifyCertificate,
utils.ReadFile(cert), _noded_cert_file=cert)
@staticmethod
def _Check(cert):
assert cert.get_subject()
def testSuccessfulCheck(self):
cert_filename = self._TestDataFilename("cert1.pem")
cert_pem = utils.ReadFile(cert_filename)
prepare_node_join._VerifyCertificate(cert_pem, _check_fn=self._Check)
class TestVerifyClusterName(unittest.TestCase):
......
......@@ -287,5 +287,89 @@ class TestGenerateSelfSignedX509Cert(unittest.TestCase):
self.assert_(self._checkCertificate(cert1))
class TestCheckNodeCertificate(testutils.GanetiTestCase):
def setUp(self):
testutils.GanetiTestCase.setUp(self)
self.tmpdir = tempfile.mkdtemp()
def tearDown(self):
testutils.GanetiTestCase.tearDown(self)
shutil.rmtree(self.tmpdir)
def testMismatchingKey(self):
other_cert = self._TestDataFilename("cert1.pem")
node_cert = self._TestDataFilename("cert2.pem")
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(other_cert))
try:
utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
except errors.GenericError, err:
self.assertEqual(str(err),
"Given cluster certificate does not match local key")
else:
self.fail("Exception was not raised")
def testMatchingKey(self):
cert_filename = self._TestDataFilename("cert2.pem")
# Extract certificate
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(cert_filename))
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
cert)
utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
def testMissingFile(self):
cert_path = self._TestDataFilename("cert1.pem")
nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
self.assertFalse(os.path.exists(nodecert))
def testInvalidCertificate(self):
tmpfile = utils.PathJoin(self.tmpdir, "cert")
utils.WriteFile(tmpfile, data="not a certificate")
self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
NotImplemented, _noded_cert_file=tmpfile)
def testNoPrivateKey(self):
cert = self._TestDataFilename("cert1.pem")
self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
NotImplemented, _noded_cert_file=cert)
def testMismatchInNodeCert(self):
cert1_path = self._TestDataFilename("cert1.pem")
cert2_path = self._TestDataFilename("cert2.pem")
tmpfile = utils.PathJoin(self.tmpdir, "cert")
# Extract certificate
cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(cert1_path))
cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
cert1)
# Extract mismatching key
key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
utils.ReadFile(cert2_path))
key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
key2)
# Write to file
utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)
try:
utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
except errors.X509CertError, err:
self.assertEqual(err.args,
(tmpfile, "Certificate does not match with private key"))
else:
self.fail("Exception was not raised")
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment