Commit c50645c0 authored by Michael Hanselmann's avatar Michael Hanselmann

utils: Move X509-related code into separate file

Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 3865ca48
......@@ -221,7 +221,8 @@ utils_PYTHON = \
lib/utils/mlock.py \
lib/utils/retry.py \
lib/utils/text.py \
lib/utils/wrapper.py
lib/utils/wrapper.py \
lib/utils/x509.py
docrst = \
doc/admin.rst \
......@@ -495,6 +496,7 @@ python_tests = \
test/ganeti.utils.retry_unittest.py \
test/ganeti.utils.text_unittest.py \
test/ganeti.utils.wrapper_unittest.py \
test/ganeti.utils.x509_unittest.py \
test/ganeti.utils_unittest.py \
test/ganeti.workerpool_unittest.py \
test/cfgupgrade_unittest.py \
......
......@@ -43,7 +43,6 @@ import fcntl
import resource
import logging
import signal
import OpenSSL
import datetime
import calendar
......@@ -62,6 +61,7 @@ from ganeti.utils.hash import * # pylint: disable-msg=W0401
from ganeti.utils.wrapper import * # pylint: disable-msg=W0401
from ganeti.utils.filelock import * # pylint: disable-msg=W0401
from ganeti.utils.io import * # pylint: disable-msg=W0401
from ganeti.utils.x509 import * # pylint: disable-msg=W0401
#: when set to True, L{RunCmd} is disabled
......@@ -69,22 +69,11 @@ _no_fork = False
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
HEX_CHAR_RE = r"[a-zA-Z0-9]"
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
(re.escape(constants.X509_CERT_SIGNATURE_HEADER),
HEX_CHAR_RE, HEX_CHAR_RE),
re.S | re.I)
_VALID_SERVICE_NAME_RE = re.compile("^[-_.a-zA-Z0-9]{1,128}$")
UUID_RE = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
'[a-f0-9]{4}-[a-f0-9]{12}$')
# Certificate verification results
(CERT_WARNING,
CERT_ERROR) = range(1, 3)
(_TIMEOUT_NONE,
_TIMEOUT_TERM,
_TIMEOUT_KILL) = range(3)
......@@ -92,9 +81,6 @@ UUID_RE = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-'
#: Shell param checker regexp
_SHELLPARAM_REGEX = re.compile(r"^[-a-zA-Z0-9._+/:%@]+$")
#: ASN1 time regexp
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
def DisableFork():
"""Disables the use of fork(2).
......@@ -1572,206 +1558,6 @@ def MergeTime(timetuple):
return float(seconds) + (float(microseconds) * 0.000001)
def _ParseAsn1Generalizedtime(value):
"""Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
@type value: string
@param value: ASN1 GENERALIZEDTIME timestamp
@return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
"""
m = _ASN1_TIME_REGEX.match(value)
if m:
# We have an offset
asn1time = m.group(1)
hours = int(m.group(2))
minutes = int(m.group(3))
utcoffset = (60 * hours) + minutes
else:
if not value.endswith("Z"):
raise ValueError("Missing timezone")
asn1time = value[:-1]
utcoffset = 0
parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
return calendar.timegm(tt.utctimetuple())
def GetX509CertValidity(cert):
"""Returns the validity period of the certificate.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
"""
# The get_notBefore and get_notAfter functions are only supported in
# pyOpenSSL 0.7 and above.
try:
get_notbefore_fn = cert.get_notBefore
except AttributeError:
not_before = None
else:
not_before_asn1 = get_notbefore_fn()
if not_before_asn1 is None:
not_before = None
else:
not_before = _ParseAsn1Generalizedtime(not_before_asn1)
try:
get_notafter_fn = cert.get_notAfter
except AttributeError:
not_after = None
else:
not_after_asn1 = get_notafter_fn()
if not_after_asn1 is None:
not_after = None
else:
not_after = _ParseAsn1Generalizedtime(not_after_asn1)
return (not_before, not_after)
def _VerifyCertificateInner(expired, not_before, not_after, now,
warn_days, error_days):
"""Verifies certificate validity.
@type expired: bool
@param expired: Whether pyOpenSSL considers the certificate as expired
@type not_before: number or None
@param not_before: Unix timestamp before which certificate is not valid
@type not_after: number or None
@param not_after: Unix timestamp after which certificate is invalid
@type now: number
@param now: Current time as Unix timestamp
@type warn_days: number or None
@param warn_days: How many days before expiration a warning should be reported
@type error_days: number or None
@param error_days: How many days before expiration an error should be reported
"""
if expired:
msg = "Certificate is expired"
if not_before is not None and not_after is not None:
msg += (" (valid from %s to %s)" %
(FormatTime(not_before), FormatTime(not_after)))
elif not_before is not None:
msg += " (valid from %s)" % FormatTime(not_before)
elif not_after is not None:
msg += " (valid until %s)" % FormatTime(not_after)
return (CERT_ERROR, msg)
elif not_before is not None and not_before > now:
return (CERT_WARNING,
"Certificate not yet valid (valid from %s)" %
FormatTime(not_before))
elif not_after is not None:
remaining_days = int((not_after - now) / (24 * 3600))
msg = "Certificate expires in about %d days" % remaining_days
if error_days is not None and remaining_days <= error_days:
return (CERT_ERROR, msg)
if warn_days is not None and remaining_days <= warn_days:
return (CERT_WARNING, msg)
return (None, None)
def VerifyX509Certificate(cert, warn_days, error_days):
"""Verifies a certificate for LUVerifyCluster.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
@type warn_days: number or None
@param warn_days: How many days before expiration a warning should be reported
@type error_days: number or None
@param error_days: How many days before expiration an error should be reported
"""
# Depending on the pyOpenSSL version, this can just return (None, None)
(not_before, not_after) = GetX509CertValidity(cert)
return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
time.time(), warn_days, error_days)
def SignX509Certificate(cert, key, salt):
"""Sign a X509 certificate.
An RFC822-like signature header is added in front of the certificate.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
@type key: string
@param key: Key for HMAC
@type salt: string
@param salt: Salt for HMAC
@rtype: string
@return: Serialized and signed certificate in PEM format
"""
if not VALID_X509_SIGNATURE_SALT.match(salt):
raise errors.GenericError("Invalid salt: %r" % salt)
# Dumping as PEM here ensures the certificate is in a sane format
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
return ("%s: %s/%s\n\n%s" %
(constants.X509_CERT_SIGNATURE_HEADER, salt,
Sha1Hmac(key, cert_pem, salt=salt),
cert_pem))
def _ExtractX509CertificateSignature(cert_pem):
"""Helper function to extract signature from X509 certificate.
"""
# Extract signature from original PEM data
for line in cert_pem.splitlines():
if line.startswith("---"):
break
m = X509_SIGNATURE.match(line.strip())
if m:
return (m.group("salt"), m.group("sign"))
raise errors.GenericError("X509 certificate signature is missing")
def LoadSignedX509Certificate(cert_pem, key):
"""Verifies a signed X509 certificate.
@type cert_pem: string
@param cert_pem: Certificate in PEM format and with signature header
@type key: string
@param key: Key for HMAC
@rtype: tuple; (OpenSSL.crypto.X509, string)
@return: X509 certificate object and salt
"""
(salt, signature) = _ExtractX509CertificateSignature(cert_pem)
# Load certificate
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
# Dump again to ensure it's in a sane format
sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
if not VerifySha1Hmac(key, sane_pem, signature, salt=salt):
raise errors.GenericError("X509 certificate signature is invalid")
return (cert, salt)
def FindMatch(data, name):
"""Tries to find an item in a dictionary matching a name.
......@@ -1866,57 +1652,6 @@ def RunInSeparateProcess(fn, *args):
return bool(exitcode)
def GenerateSelfSignedX509Cert(common_name, validity):
"""Generates a self-signed X509 certificate.
@type common_name: string
@param common_name: commonName value
@type validity: int
@param validity: Validity for certificate in seconds
"""
# Create private and public key
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
# Create self-signed certificate
cert = OpenSSL.crypto.X509()
if common_name:
cert.get_subject().CN = common_name
cert.set_serial_number(1)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(validity)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
return (key_pem, cert_pem)
def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
validity=constants.X509_CERT_DEFAULT_VALIDITY):
"""Legacy function to generate self-signed X509 certificate.
@type filename: str
@param filename: path to write certificate to
@type common_name: string
@param common_name: commonName value
@type validity: int
@param validity: validity of certificate in number of days
"""
# TODO: Investigate using the cluster name instead of X505_CERT_CN for
# common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
# and node daemon certificates have the proper Subject/Issuer.
(key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
validity * 24 * 60 * 60)
WriteFile(filename, mode=0400, data=key_pem + cert_pem)
def SignalHandled(signums):
"""Signal Handled decoration.
......
#
#
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Utility functions for X509.
"""
import time
import OpenSSL
import re
import datetime
import calendar
from ganeti import errors
from ganeti import constants
from ganeti.utils import text as utils_text
from ganeti.utils import io as utils_io
from ganeti.utils import hash as utils_hash
HEX_CHAR_RE = r"[a-zA-Z0-9]"
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
(re.escape(constants.X509_CERT_SIGNATURE_HEADER),
HEX_CHAR_RE, HEX_CHAR_RE),
re.S | re.I)
# Certificate verification results
(CERT_WARNING,
CERT_ERROR) = range(1, 3)
#: ASN1 time regexp
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
def _ParseAsn1Generalizedtime(value):
"""Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
@type value: string
@param value: ASN1 GENERALIZEDTIME timestamp
@return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
"""
m = _ASN1_TIME_REGEX.match(value)
if m:
# We have an offset
asn1time = m.group(1)
hours = int(m.group(2))
minutes = int(m.group(3))
utcoffset = (60 * hours) + minutes
else:
if not value.endswith("Z"):
raise ValueError("Missing timezone")
asn1time = value[:-1]
utcoffset = 0
parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
return calendar.timegm(tt.utctimetuple())
def GetX509CertValidity(cert):
"""Returns the validity period of the certificate.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
"""
# The get_notBefore and get_notAfter functions are only supported in
# pyOpenSSL 0.7 and above.
try:
get_notbefore_fn = cert.get_notBefore
except AttributeError:
not_before = None
else:
not_before_asn1 = get_notbefore_fn()
if not_before_asn1 is None:
not_before = None
else:
not_before = _ParseAsn1Generalizedtime(not_before_asn1)
try:
get_notafter_fn = cert.get_notAfter
except AttributeError:
not_after = None
else:
not_after_asn1 = get_notafter_fn()
if not_after_asn1 is None:
not_after = None
else:
not_after = _ParseAsn1Generalizedtime(not_after_asn1)
return (not_before, not_after)
def _VerifyCertificateInner(expired, not_before, not_after, now,
warn_days, error_days):
"""Verifies certificate validity.
@type expired: bool
@param expired: Whether pyOpenSSL considers the certificate as expired
@type not_before: number or None
@param not_before: Unix timestamp before which certificate is not valid
@type not_after: number or None
@param not_after: Unix timestamp after which certificate is invalid
@type now: number
@param now: Current time as Unix timestamp
@type warn_days: number or None
@param warn_days: How many days before expiration a warning should be reported
@type error_days: number or None
@param error_days: How many days before expiration an error should be reported
"""
if expired:
msg = "Certificate is expired"
if not_before is not None and not_after is not None:
msg += (" (valid from %s to %s)" %
(utils_text.FormatTime(not_before),
utils_text.FormatTime(not_after)))
elif not_before is not None:
msg += " (valid from %s)" % utils_text.FormatTime(not_before)
elif not_after is not None:
msg += " (valid until %s)" % utils_text.FormatTime(not_after)
return (CERT_ERROR, msg)
elif not_before is not None and not_before > now:
return (CERT_WARNING,
"Certificate not yet valid (valid from %s)" %
utils_text.FormatTime(not_before))
elif not_after is not None:
remaining_days = int((not_after - now) / (24 * 3600))
msg = "Certificate expires in about %d days" % remaining_days
if error_days is not None and remaining_days <= error_days:
return (CERT_ERROR, msg)
if warn_days is not None and remaining_days <= warn_days:
return (CERT_WARNING, msg)
return (None, None)
def VerifyX509Certificate(cert, warn_days, error_days):
"""Verifies a certificate for LUVerifyCluster.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
@type warn_days: number or None
@param warn_days: How many days before expiration a warning should be reported
@type error_days: number or None
@param error_days: How many days before expiration an error should be reported
"""
# Depending on the pyOpenSSL version, this can just return (None, None)
(not_before, not_after) = GetX509CertValidity(cert)
return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
time.time(), warn_days, error_days)
def SignX509Certificate(cert, key, salt):
"""Sign a X509 certificate.
An RFC822-like signature header is added in front of the certificate.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
@type key: string
@param key: Key for HMAC
@type salt: string
@param salt: Salt for HMAC
@rtype: string
@return: Serialized and signed certificate in PEM format
"""
if not VALID_X509_SIGNATURE_SALT.match(salt):
raise errors.GenericError("Invalid salt: %r" % salt)
# Dumping as PEM here ensures the certificate is in a sane format
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
return ("%s: %s/%s\n\n%s" %
(constants.X509_CERT_SIGNATURE_HEADER, salt,
utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
cert_pem))
def _ExtractX509CertificateSignature(cert_pem):
"""Helper function to extract signature from X509 certificate.
"""
# Extract signature from original PEM data
for line in cert_pem.splitlines():
if line.startswith("---"):
break
m = X509_SIGNATURE.match(line.strip())
if m:
return (m.group("salt"), m.group("sign"))
raise errors.GenericError("X509 certificate signature is missing")
def LoadSignedX509Certificate(cert_pem, key):
"""Verifies a signed X509 certificate.
@type cert_pem: string
@param cert_pem: Certificate in PEM format and with signature header
@type key: string
@param key: Key for HMAC
@rtype: tuple; (OpenSSL.crypto.X509, string)
@return: X509 certificate object and salt
"""
(salt, signature) = _ExtractX509CertificateSignature(cert_pem)
# Load certificate
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
# Dump again to ensure it's in a sane format
sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
raise errors.GenericError("X509 certificate signature is invalid")
return (cert, salt)
def GenerateSelfSignedX509Cert(common_name, validity):
"""Generates a self-signed X509 certificate.
@type common_name: string
@param common_name: commonName value
@type validity: int
@param validity: Validity for certificate in seconds
"""
# Create private and public key
key = OpenSSL.crypto.PKey()
key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
# Create self-signed certificate
cert = OpenSSL.crypto.X509()
if common_name:
cert.get_subject().CN = common_name
cert.set_serial_number(1)
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(validity)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
return (key_pem, cert_pem)
def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
validity=constants.X509_CERT_DEFAULT_VALIDITY):
"""Legacy function to generate self-signed X509 certificate.
@type filename: str
@param filename: path to write certificate to
@type common_name: string
@param common_name: commonName value
@type validity: int
@param validity: validity of certificate in number of days
"""
# TODO: Investigate using the cluster name instead of X505_CERT_CN for
# common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
# and node daemon certificates have the proper Subject/Issuer.
(key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
validity * 24 * 60 * 60)
utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)
#!/usr/bin/python
#
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.