From 27e46076de61a7c064aa83f94b06d829265ec558 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Tue, 23 Feb 2010 17:09:03 +0100 Subject: [PATCH] utils: Add function to extract X509 cert validity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit X509 uses ASN1 GENERALIZEDTIME or UTCTIME to store certificate validity. pyOpenSSL 0.7 and above allow us to retrieve both βnotBeforeβ and βnotAfterβ as strings. Parsing them turned out to be a challenge since they can be in a variety of formats (YYYYMMDDhhmmssZ, YYYYMMDDhhmmss+hhmm or YYYMMDDhhmmss-hhmm). This will be used to verify the validity of cluster certificates in LUVerifyCluster. Signed-off-by: Michael Hanselmann <hansmi@google.com> Reviewed-by: Iustin Pop <iustin@google.com> --- Makefile.am | 1 + lib/utils.py | 65 +++++++++++++++++++++++++++++++++++ test/data/cert1.pem | 14 ++++++++ test/ganeti.utils_unittest.py | 65 +++++++++++++++++++++++++++++++++++ 4 files changed, 145 insertions(+) create mode 100644 test/data/cert1.pem diff --git a/Makefile.am b/Makefile.am index 880007db8..3ad319b20 100644 --- a/Makefile.am +++ b/Makefile.am @@ -313,6 +313,7 @@ TEST_FILES = \ test/data/bdev-8.3-both.txt \ test/data/bdev-disk.txt \ test/data/bdev-net.txt \ + test/data/cert1.pem \ test/data/proc_drbd8.txt \ test/data/proc_drbd80-emptyline.txt \ test/data/proc_drbd83.txt diff --git a/lib/utils.py b/lib/utils.py index 014475524..5f80279ab 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -43,6 +43,8 @@ import resource import logging import logging.handlers import signal +import datetime +import calendar from cStringIO import StringIO @@ -2026,6 +2028,69 @@ def TailFile(fname, lines=20): return rows[-lines:] +def _ParseAsn1Generalizedtime(value): + """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL. + + @type value: string + @param value: ASN1 GENERALIZEDTIME timestamp + + """ + m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value) + if m: + # We have an offset + asn1time = m.group(1) + hours = int(m.group(2)) + minutes = int(m.group(3)) + utcoffset = (60 * hours) + minutes + else: + if not value.endswith("Z"): + raise ValueError("Missing timezone") + asn1time = value[:-1] + utcoffset = 0 + + parsed = time.strptime(asn1time, "%Y%m%d%H%M%S") + + tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset) + + return calendar.timegm(tt.utctimetuple()) + + +def GetX509CertValidity(cert): + """Returns the validity period of the certificate. + + @type cert: OpenSSL.crypto.X509 + @param cert: X509 certificate object + + """ + # The get_notBefore and get_notAfter functions are only supported in + # pyOpenSSL 0.7 and above. + try: + get_notbefore_fn = cert.get_notBefore + except AttributeError: + not_before = None + else: + not_before_asn1 = get_notbefore_fn() + + if not_before_asn1 is None: + not_before = None + else: + not_before = _ParseAsn1Generalizedtime(not_before_asn1) + + try: + get_notafter_fn = cert.get_notAfter + except AttributeError: + not_after = None + else: + not_after_asn1 = get_notafter_fn() + + if not_after_asn1 is None: + not_after = None + else: + not_after = _ParseAsn1Generalizedtime(not_after_asn1) + + return (not_before, not_after) + + def SafeEncode(text): """Return a 'safe' version of a source string. diff --git a/test/data/cert1.pem b/test/data/cert1.pem new file mode 100644 index 000000000..3c6b59c1c --- /dev/null +++ b/test/data/cert1.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICKzCCAdWgAwIBAgIJALdZsXwXOtW7MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV +BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX +aWRnaXRzIFB0eSBMdGQwHhcNMTAwMjIzMTAxMjQ3WhcNMTAwMzAyMTAxMjQ3WjBF +MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50 +ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALIL +AmF7Hay9WuhREpRqG2KPCFNbjVGeZ6cS/1FImhHCw40JWDElQJp4lprIly7mkp+7 +seIEa7/kf0y9iy0o7s0CAwEAAaOBpzCBpDAdBgNVHQ4EFgQUBKWDVk2Hp9jW+hiD +wuuecaBB0W0wdQYDVR0jBG4wbIAUBKWDVk2Hp9jW+hiDwuuecaBB0W2hSaRHMEUx +CzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRl +cm5ldCBXaWRnaXRzIFB0eSBMdGSCCQC3WbF8FzrVuzAMBgNVHRMEBTADAQH/MA0G +CSqGSIb3DQEBBQUAA0EAg7hwCEhY2+MmQYXqe8szmgkXe73qv+i2XyZGytUcdaB/ +sd2ydbMLIZlWHD5Zb6xBVDVJpLttduW0cK9daFvElQ== +-----END CERTIFICATE----- diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index f1c665940..eb24ebd83 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -35,6 +35,9 @@ import shutil import re import select import string +import OpenSSL +import warnings +import distutils.version import ganeti import testutils @@ -1387,6 +1390,68 @@ class TestHostInfo(unittest.TestCase): HostInfo.NormalizeName(value) +class TestParseAsn1Generalizedtime(unittest.TestCase): + def test(self): + # UTC + self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0) + self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"), + 1266860512) + self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"), + (2**31) - 1) + + # With offset + self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"), + 1266860512) + self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"), + 1266931012) + self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"), + 1266931088) + self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"), + 1266931295) + self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"), + 3600) + + # Leap seconds are not supported by datetime.datetime + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, + "19841231235960+0000") + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, + "19920630235960+0000") + + # Errors + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "") + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid") + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, + "20100222174152") + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, + "Mon Feb 22 17:47:02 UTC 2010") + self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, + "2010-02-22 17:42:02") + + +class TestGetX509CertValidity(testutils.GanetiTestCase): + def setUp(self): + testutils.GanetiTestCase.setUp(self) + + pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__) + + # Test whether we have pyOpenSSL 0.7 or above + self.pyopenssl0_7 = (pyopenssl_version >= "0.7") + + if not self.pyopenssl0_7: + warnings.warn("This test requires pyOpenSSL 0.7 or above to" + " function correctly") + + def _LoadCert(self, name): + return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, + self._ReadTestData(name)) + + def test(self): + validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem")) + if self.pyopenssl0_7: + self.assertEqual(validity, (1266919967, 1267524767)) + else: + self.assertEqual(validity, (None, None)) + if __name__ == '__main__': testutils.GanetiTestProgram() -- GitLab