Commit 68857643 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

utils: Add functions to sign and verify X509 certs using HMAC

Certificates exchanged via an untrusted third party should be
signed to ensure they haven't been modified.
Signed-off-by: default avatarMichael Hanselmann <>
Reviewed-by: default avatarIustin Pop <>
parent 3db3eb2a
......@@ -246,7 +246,9 @@ signatures must be mandatory. The HMAC signature for X509 certificates
will be prepended to the certificate similar to an RFC822 header and
only covers the certificate (from ``-----BEGIN CERTIFICATE-----`` to
``-----END CERTIFICATE-----``). The header name will be
``X-Ganeti-Signature`` and its value will have the format
``$salt/$hash`` (salt and hash separated by slash). The salt may only
contain characters in the range ``[a-zA-Z0-9]``.
On the web, the destination cluster would be equivalent to an HTTPS
server requiring verifiable client certificates. The browser would be
......@@ -189,6 +189,8 @@ RSA_KEY_BITS = 2048
# Digest used to sign certificates ("openssl x509" uses SHA1 by default)
X509_CERT_SIGNATURE_HEADER = "X-Ganeti-Signature"
VALUE_DEFAULT = "default"
VALUE_AUTO = "auto"
VALUE_GENERATE = "generate"
......@@ -47,14 +47,14 @@ import signal
import OpenSSL
import datetime
import calendar
import hmac
from cStringIO import StringIO
from hashlib import sha1
except ImportError:
import sha
sha1 =
import sha as sha1
from ganeti import errors
from ganeti import constants
......@@ -70,6 +70,13 @@ no_fork = False
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
HEX_CHAR_RE = r"[a-zA-Z0-9]"
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
re.S | re.I)
class RunResult(object):
"""Holds the result of running external programs.
......@@ -673,7 +680,10 @@ def _FingerprintFile(filename):
f = open(filename)
fp = sha1()
if callable(sha1):
fp = sha1()
fp =
while True:
data =
if not data:
......@@ -2356,6 +2366,74 @@ def GetX509CertValidity(cert):
return (not_before, not_after)
def SignX509Certificate(cert, key, salt):
"""Sign a X509 certificate.
An RFC822-like signature header is added in front of the certificate.
@type cert: OpenSSL.crypto.X509
@param cert: X509 certificate object
@type key: string
@param key: Key for HMAC
@type salt: string
@param salt: Salt for HMAC
@rtype: string
@return: Serialized and signed certificate in PEM format
if not VALID_X509_SIGNATURE_SALT.match(salt):
raise errors.GenericError("Invalid salt: %r" % salt)
# Dumping as PEM here ensures the certificate is in a sane format
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
return ("%s: %s/%s\n\n%s" %
(constants.X509_CERT_SIGNATURE_HEADER, salt,, salt + cert_pem, sha1).hexdigest(),
def _ExtractX509CertificateSignature(cert_pem):
"""Helper function to extract signature from X509 certificate.
# Extract signature from original PEM data
for line in cert_pem.splitlines():
if line.startswith("---"):
m = X509_SIGNATURE.match(line.strip())
if m:
return ("salt"),"sign"))
raise errors.GenericError("X509 certificate signature is missing")
def LoadSignedX509Certificate(cert_pem, key):
"""Verifies a signed X509 certificate.
@type cert_pem: string
@param cert_pem: Certificate in PEM format and with signature header
@type key: string
@param key: Key for HMAC
@rtype: tuple; (OpenSSL.crypto.X509, string)
@return: X509 certificate object and salt
(salt, signature) = _ExtractX509CertificateSignature(cert_pem)
# Load certificate
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
# Dump again to ensure it's in a sane format
sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
if signature !=, salt + sane_pem, sha1).hexdigest():
raise errors.GenericError("X509 certificate signature is invalid")
return (cert, salt)
def SafeEncode(text):
"""Return a 'safe' version of a source string.
......@@ -1698,5 +1698,60 @@ class TestGetX509CertValidity(testutils.GanetiTestCase):
self.assertEqual(validity, (None, None))
class TestSignX509Certificate(unittest.TestCase):
KEY = "My private key!"
KEY_OTHER = "Another key"
def test(self):
# Generate certificate valid for 5 minutes
(_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
# No signature at all
utils.LoadSignedX509Certificate, cert_pem, self.KEY)
# Invalid input
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
"", self.KEY)
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
"X-Ganeti-Signature: \n", self.KEY)
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
"X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
"X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
"X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
# Invalid salt
for salt in list("-_@$,:;/\\ \t\n"):
self.assertRaises(errors.GenericError, utils.SignX509Certificate,
cert_pem, self.KEY, "foo%sbar" % salt)
for salt in ["HelloWorld", "salt", string.letters, string.digits,
signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
self._Check(cert, salt, signed_pem)
self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
self._Check(cert, salt, (signed_pem + "\n\na few more\n"
"lines----\n------ at\nthe end!"))
def _Check(self, cert, salt, pem):
(cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
self.assertEqual(salt, salt2)
self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
# Other key
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
pem, self.KEY_OTHER)
if __name__ == '__main__':
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment