diff --git a/lib/utils/x509.py b/lib/utils/x509.py index b0d9f904c4a3ce63f69a943129103ac6d863f62e..0a91f41facfa8ff7b48ccba0fd4fdd481a6d8d52 100644 --- a/lib/utils/x509.py +++ b/lib/utils/x509.py @@ -1,7 +1,7 @@ # # -# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -180,8 +180,10 @@ def VerifyX509Certificate(cert, warn_days, error_days): # Depending on the pyOpenSSL version, this can just return (None, None) (not_before, not_after) = GetX509CertValidity(cert) + now = time.time() + constants.NODE_MAX_CLOCK_SKEW + return _VerifyCertificateInner(cert.has_expired(), not_before, not_after, - time.time(), warn_days, error_days) + now, warn_days, error_days) def SignX509Certificate(cert, key, salt): diff --git a/test/ganeti.utils.x509_unittest.py b/test/ganeti.utils.x509_unittest.py index 2249e896c7ec16466bdc1ef1a9985732c59cd508..40425d214b1b31a220bfd0359115376e014c965e 100755 --- a/test/ganeti.utils.x509_unittest.py +++ b/test/ganeti.utils.x509_unittest.py @@ -1,7 +1,7 @@ #!/usr/bin/python # -# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. +# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -164,6 +164,43 @@ class TestCertVerification(testutils.GanetiTestCase): # Not checking return value as this certificate is expired utils.VerifyX509Certificate(cert, 30, 7) + @staticmethod + def _GenCert(key, before, validity): + # Urgh... mostly copied from x509.py :( + + # Create self-signed certificate + cert = OpenSSL.crypto.X509() + cert.set_serial_number(1) + if before != 0: + cert.gmtime_adj_notBefore(int(before)) + cert.gmtime_adj_notAfter(validity) + cert.set_issuer(cert.get_subject()) + cert.set_pubkey(key) + cert.sign(key, constants.X509_CERT_SIGN_DIGEST) + return cert + + def testClockSkew(self): + SKEW = constants.NODE_MAX_CLOCK_SKEW + # Create private and public key + key = OpenSSL.crypto.PKey() + key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS) + + validity = 7 * 86400 + # skew small enough, accepting cert; note that this is a timed + # test, and could fail if the machine is so loaded that the next + # few lines take more than NODE_MAX_CLOCK_SKEW / 2 + for before in [-1, 0, SKEW / 4, SKEW / 2]: + cert = self._GenCert(key, before, validity) + result = utils.VerifyX509Certificate(cert, 1, 2) + self.assertEqual(result, (None, None)) + + # skew too great, not accepting certs + for before in [SKEW + 1, SKEW * 2, SKEW * 10]: + cert = self._GenCert(key, before, validity) + (status, msg) = utils.VerifyX509Certificate(cert, 1, 2) + self.assertEqual(status, utils.CERT_WARNING) + self.assertTrue(msg.startswith("Certificate not yet valid")) + class TestVerifyCertificateInner(unittest.TestCase): def test(self):