Commit 27e46076 authored by Michael Hanselmann's avatar Michael Hanselmann

utils: Add function to extract X509 cert validity

X509 uses ASN1 GENERALIZEDTIME or UTCTIME to store certificate validity.
pyOpenSSL 0.7 and above allow us to retrieve both “notBefore” and
“notAfter” as strings. Parsing them turned out to be a challenge since
they can be in a variety of formats (YYYYMMDDhhmmssZ, YYYYMMDDhhmmss+hhmm
or YYYMMDDhhmmss-hhmm).

This will be used to verify the validity of cluster certificates in
LUVerifyCluster.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent d3100055
......@@ -313,6 +313,7 @@ TEST_FILES = \
test/data/bdev-8.3-both.txt \
test/data/bdev-disk.txt \
test/data/bdev-net.txt \
test/data/cert1.pem \
test/data/proc_drbd8.txt \
test/data/proc_drbd80-emptyline.txt \
test/data/proc_drbd83.txt
......
......@@ -43,6 +43,8 @@ import resource
import logging
import logging.handlers
import signal
import datetime
import calendar
from cStringIO import StringIO
......@@ -2026,6 +2028,69 @@ def TailFile(fname, lines=20):
return rows[-lines:]
def _ParseAsn1Generalizedtime(value):
"""Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
@type value: string
@param value: ASN1 GENERALIZEDTIME timestamp
"""
m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
if m:
# We have an offset
asn1time = m.group(1)
hours = int(m.group(2))
minutes = int(m.group(3))
utcoffset = (60 * hours) + minutes
else:
if not value.endswith("Z"):
raise ValueError("Missing timezone")
asn1time = value[:-1]
utcoffset = 0
parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
return calendar.timegm(tt.utctimetuple())
def GetX509CertValidity(cert):
"""Returns the validity period of the certificate.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
"""
# The get_notBefore and get_notAfter functions are only supported in
# pyOpenSSL 0.7 and above.
try:
get_notbefore_fn = cert.get_notBefore
except AttributeError:
not_before = None
else:
not_before_asn1 = get_notbefore_fn()
if not_before_asn1 is None:
not_before = None
else:
not_before = _ParseAsn1Generalizedtime(not_before_asn1)
try:
get_notafter_fn = cert.get_notAfter
except AttributeError:
not_after = None
else:
not_after_asn1 = get_notafter_fn()
if not_after_asn1 is None:
not_after = None
else:
not_after = _ParseAsn1Generalizedtime(not_after_asn1)
return (not_before, not_after)
def SafeEncode(text):
"""Return a 'safe' version of a source string.
......
-----BEGIN CERTIFICATE-----
MIICKzCCAdWgAwIBAgIJALdZsXwXOtW7MA0GCSqGSIb3DQEBBQUAMEUxCzAJBgNV
BAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRlcm5ldCBX
aWRnaXRzIFB0eSBMdGQwHhcNMTAwMjIzMTAxMjQ3WhcNMTAwMzAyMTAxMjQ3WjBF
MQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALIL
AmF7Hay9WuhREpRqG2KPCFNbjVGeZ6cS/1FImhHCw40JWDElQJp4lprIly7mkp+7
seIEa7/kf0y9iy0o7s0CAwEAAaOBpzCBpDAdBgNVHQ4EFgQUBKWDVk2Hp9jW+hiD
wuuecaBB0W0wdQYDVR0jBG4wbIAUBKWDVk2Hp9jW+hiDwuuecaBB0W2hSaRHMEUx
CzAJBgNVBAYTAkFVMRMwEQYDVQQIEwpTb21lLVN0YXRlMSEwHwYDVQQKExhJbnRl
cm5ldCBXaWRnaXRzIFB0eSBMdGSCCQC3WbF8FzrVuzAMBgNVHRMEBTADAQH/MA0G
CSqGSIb3DQEBBQUAA0EAg7hwCEhY2+MmQYXqe8szmgkXe73qv+i2XyZGytUcdaB/
sd2ydbMLIZlWHD5Zb6xBVDVJpLttduW0cK9daFvElQ==
-----END CERTIFICATE-----
......@@ -35,6 +35,9 @@ import shutil
import re
import select
import string
import OpenSSL
import warnings
import distutils.version
import ganeti
import testutils
......@@ -1387,6 +1390,68 @@ class TestHostInfo(unittest.TestCase):
HostInfo.NormalizeName(value)
class TestParseAsn1Generalizedtime(unittest.TestCase):
def test(self):
# UTC
self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
1266860512)
self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
(2**31) - 1)
# With offset
self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
1266860512)
self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
1266931012)
self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
1266931088)
self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
1266931295)
self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
3600)
# Leap seconds are not supported by datetime.datetime
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
"19841231235960+0000")
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
"19920630235960+0000")
# Errors
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
"20100222174152")
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
"Mon Feb 22 17:47:02 UTC 2010")
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
"2010-02-22 17:42:02")
class TestGetX509CertValidity(testutils.GanetiTestCase):
def setUp(self):
testutils.GanetiTestCase.setUp(self)
pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
# Test whether we have pyOpenSSL 0.7 or above
self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
if not self.pyopenssl0_7:
warnings.warn("This test requires pyOpenSSL 0.7 or above to"
" function correctly")
def _LoadCert(self, name):
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
self._ReadTestData(name))
def test(self):
validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
if self.pyopenssl0_7:
self.assertEqual(validity, (1266919967, 1267524767))
else:
self.assertEqual(validity, (None, None))
if __name__ == '__main__':
testutils.GanetiTestProgram()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment