From 688576435081cc5bbd5bcf8bf10fa5f78e94df24 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann Date: Wed, 17 Mar 2010 16:18:08 +0100 Subject: [PATCH] utils: Add functions to sign and verify X509 certs using HMAC Certificates exchanged via an untrusted third party should be signed to ensure they haven't been modified. Signed-off-by: Michael Hanselmann Reviewed-by: Iustin Pop --- doc/design-2.2.rst | 4 +- lib/constants.py | 2 + lib/utils.py | 84 +++++++++++++++++++++++++++++++++++++++-- test/ganeti.utils_unittest.py | 55 +++++++++++++++++++++++++++ 4 files changed, 141 insertions(+), 4 deletions(-) diff --git a/doc/design-2.2.rst b/doc/design-2.2.rst index f676c9e..ab0a8bd 100644 --- a/doc/design-2.2.rst +++ b/doc/design-2.2.rst @@ -246,7 +246,9 @@ signatures must be mandatory. The HMAC signature for X509 certificates will be prepended to the certificate similar to an RFC822 header and only covers the certificate (from ``-----BEGIN CERTIFICATE-----`` to ``-----END CERTIFICATE-----``). The header name will be -``X-Ganeti-Signature``. +``X-Ganeti-Signature`` and its value will have the format +``$salt/$hash`` (salt and hash separated by slash). The salt may only +contain characters in the range ``[a-zA-Z0-9]``. On the web, the destination cluster would be equivalent to an HTTPS server requiring verifiable client certificates. The browser would be diff --git a/lib/constants.py b/lib/constants.py index f8b68c4..24fc7ab 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -189,6 +189,8 @@ RSA_KEY_BITS = 2048 # Digest used to sign certificates ("openssl x509" uses SHA1 by default) X509_CERT_SIGN_DIGEST = "SHA1" +X509_CERT_SIGNATURE_HEADER = "X-Ganeti-Signature" + VALUE_DEFAULT = "default" VALUE_AUTO = "auto" VALUE_GENERATE = "generate" diff --git a/lib/utils.py b/lib/utils.py index a0d7642..cb7f39a 100644 --- a/lib/utils.py +++ b/lib/utils.py @@ -47,14 +47,14 @@ import signal import OpenSSL import datetime import calendar +import hmac from cStringIO import StringIO try: from hashlib import sha1 except ImportError: - import sha - sha1 = sha.new + import sha as sha1 from ganeti import errors from ganeti import constants @@ -70,6 +70,13 @@ no_fork = False _RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid" +HEX_CHAR_RE = r"[a-zA-Z0-9]" +VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S) +X509_SIGNATURE = re.compile(r"^%s:\s*(?P%s+)/(?P%s+)$" % + (re.escape(constants.X509_CERT_SIGNATURE_HEADER), + HEX_CHAR_RE, HEX_CHAR_RE), + re.S | re.I) + class RunResult(object): """Holds the result of running external programs. @@ -673,7 +680,10 @@ def _FingerprintFile(filename): f = open(filename) - fp = sha1() + if callable(sha1): + fp = sha1() + else: + fp = sha1.new() while True: data = f.read(4096) if not data: @@ -2356,6 +2366,74 @@ def GetX509CertValidity(cert): return (not_before, not_after) +def SignX509Certificate(cert, key, salt): + """Sign a X509 certificate. + + An RFC822-like signature header is added in front of the certificate. + + @type cert: OpenSSL.crypto.X509 + @param cert: X509 certificate object + @type key: string + @param key: Key for HMAC + @type salt: string + @param salt: Salt for HMAC + @rtype: string + @return: Serialized and signed certificate in PEM format + + """ + if not VALID_X509_SIGNATURE_SALT.match(salt): + raise errors.GenericError("Invalid salt: %r" % salt) + + # Dumping as PEM here ensures the certificate is in a sane format + cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) + + return ("%s: %s/%s\n\n%s" % + (constants.X509_CERT_SIGNATURE_HEADER, salt, + hmac.new(key, salt + cert_pem, sha1).hexdigest(), + cert_pem)) + + +def _ExtractX509CertificateSignature(cert_pem): + """Helper function to extract signature from X509 certificate. + + """ + # Extract signature from original PEM data + for line in cert_pem.splitlines(): + if line.startswith("---"): + break + + m = X509_SIGNATURE.match(line.strip()) + if m: + return (m.group("salt"), m.group("sign")) + + raise errors.GenericError("X509 certificate signature is missing") + + +def LoadSignedX509Certificate(cert_pem, key): + """Verifies a signed X509 certificate. + + @type cert_pem: string + @param cert_pem: Certificate in PEM format and with signature header + @type key: string + @param key: Key for HMAC + @rtype: tuple; (OpenSSL.crypto.X509, string) + @return: X509 certificate object and salt + + """ + (salt, signature) = _ExtractX509CertificateSignature(cert_pem) + + # Load certificate + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem) + + # Dump again to ensure it's in a sane format + sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) + + if signature != hmac.new(key, salt + sane_pem, sha1).hexdigest(): + raise errors.GenericError("X509 certificate signature is invalid") + + return (cert, salt) + + def SafeEncode(text): """Return a 'safe' version of a source string. diff --git a/test/ganeti.utils_unittest.py b/test/ganeti.utils_unittest.py index 831c477..1088a75 100755 --- a/test/ganeti.utils_unittest.py +++ b/test/ganeti.utils_unittest.py @@ -1698,5 +1698,60 @@ class TestGetX509CertValidity(testutils.GanetiTestCase): self.assertEqual(validity, (None, None)) +class TestSignX509Certificate(unittest.TestCase): + KEY = "My private key!" + KEY_OTHER = "Another key" + + def test(self): + # Generate certificate valid for 5 minutes + (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300) + + cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, + cert_pem) + + # No signature at all + self.assertRaises(errors.GenericError, + utils.LoadSignedX509Certificate, cert_pem, self.KEY) + + # Invalid input + self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, + "", self.KEY) + self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, + "X-Ganeti-Signature: \n", self.KEY) + self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, + "X-Ganeti-Sign: $1234$abcdef\n", self.KEY) + self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, + "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY) + self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, + "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY) + + # Invalid salt + for salt in list("-_@$,:;/\\ \t\n"): + self.assertRaises(errors.GenericError, utils.SignX509Certificate, + cert_pem, self.KEY, "foo%sbar" % salt) + + for salt in ["HelloWorld", "salt", string.letters, string.digits, + utils.GenerateSecret(numbytes=4), + utils.GenerateSecret(numbytes=16), + "{123:456}".encode("hex")]: + signed_pem = utils.SignX509Certificate(cert, self.KEY, salt) + + self._Check(cert, salt, signed_pem) + + self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem) + self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem) + self._Check(cert, salt, (signed_pem + "\n\na few more\n" + "lines----\n------ at\nthe end!")) + + def _Check(self, cert, salt, pem): + (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY) + self.assertEqual(salt, salt2) + self.assertEqual(cert.digest("sha1"), cert2.digest("sha1")) + + # Other key + self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, + pem, self.KEY_OTHER) + + if __name__ == '__main__': testutils.GanetiTestProgram() -- 1.7.10.4