utils: Add functions to sign and verify X509 certs using HMAC
author: Michael Hanselmann <hansmi@google.com>
Wed, 17 Mar 2010 15:18:08 +0000 (16:18 +0100)
committer: Michael Hanselmann <hansmi@google.com>
Wed, 17 Mar 2010 15:18:22 +0000 (16:18 +0100)
Certificates exchanged via an untrusted third party should be
signed to ensure they haven't been modified.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

doc/design-2.2.rst
lib/constants.py
lib/utils.py
test/ganeti.utils_unittest.py

index f676c9e..ab0a8bd 100644 (file)
@@ -246,7 +246,9 @@ signatures must be mandatory. The HMAC signature for X509 certificates
 will be prepended to the certificate similar to an RFC822 header and
 only covers the certificate (from ``-----BEGIN CERTIFICATE-----`` to
 ``-----END CERTIFICATE-----``). The header name will be
-``X-Ganeti-Signature``.
+``X-Ganeti-Signature`` and its value will have the format
+``$salt/$hash`` (salt and hash separated by slash). The salt may only
+contain characters in the range ``[a-zA-Z0-9]``.
 
 On the web, the destination cluster would be equivalent to an HTTPS
 server requiring verifiable client certificates. The browser would be
index f8b68c4..24fc7ab 100644 (file)
@@ -189,6 +189,8 @@ RSA_KEY_BITS = 2048
 # Digest used to sign certificates ("openssl x509" uses SHA1 by default)
 X509_CERT_SIGN_DIGEST = "SHA1"
 
+X509_CERT_SIGNATURE_HEADER = "X-Ganeti-Signature"
+
 VALUE_DEFAULT = "default"
 VALUE_AUTO = "auto"
 VALUE_GENERATE = "generate"
index a0d7642..cb7f39a 100644 (file)
@@ -47,14 +47,14 @@ import signal
 import OpenSSL
 import datetime
 import calendar
+import hmac
 
 from cStringIO import StringIO
 
 try:
   from hashlib import sha1
 except ImportError:
-  import sha
-  sha1 = sha.new
+  import sha as sha1
 
 from ganeti import errors
 from ganeti import constants
@@ -70,6 +70,13 @@ no_fork = False
 
 _RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
 
+# Despite its name, this character class matches any ASCII letter or digit,
+# not only hexadecimal digits. The name is kept because it is a module-level
+# name that other code may refer to.
+HEX_CHAR_RE = r"[a-zA-Z0-9]"
+# "\Z" is used rather than "$": "$" also matches just before a trailing
+# newline, which would accept a salt ending in a newline and thereby break
+# the single-line signature header the salt is embedded in.
+VALID_X509_SIGNATURE_SALT = re.compile(r"^%s+\Z" % HEX_CHAR_RE, re.S)
+# Matches one complete "<header name>: <salt>/<signature>" line. The header
+# name is compared case-insensitively (re.I); salt and signature are exposed
+# as the named groups "salt" and "sign".
+X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
+                            (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
+                             HEX_CHAR_RE, HEX_CHAR_RE),
+                            re.S | re.I)
+
 
 class RunResult(object):
   """Holds the result of running external programs.
@@ -673,7 +680,10 @@ def _FingerprintFile(filename):
 
   f = open(filename)
 
-  fp = sha1()
+  if callable(sha1):
+    fp = sha1()
+  else:
+    fp = sha1.new()
   while True:
     data = f.read(4096)
     if not data:
@@ -2356,6 +2366,74 @@ def GetX509CertValidity(cert):
   return (not_before, not_after)
 
 
+def SignX509Certificate(cert, key, salt):
+  """Sign a X509 certificate.
+
+  An RFC822-like signature header is added in front of the certificate.
+  The header value has the form C{salt/hexdigest}; the digest is the HMAC
+  (keyed with C{key}, using the module's C{sha1}) of the salt immediately
+  followed by the PEM-encoded certificate. The header is separated from the
+  certificate by an empty line.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+  @type key: string
+  @param key: Key for HMAC
+  @type salt: string
+  @param salt: Salt for HMAC
+  @rtype: string
+  @return: Serialized and signed certificate in PEM format
+  @raise errors.GenericError: If the salt is not accepted by
+    L{VALID_X509_SIGNATURE_SALT}
+
+  """
+  # Validate the salt first: it is written verbatim into the header line and
+  # must not contain the "/" separator or anything else that breaks parsing
+  if not VALID_X509_SIGNATURE_SALT.match(salt):
+    raise errors.GenericError("Invalid salt: %r" % salt)
+
+  # Dumping as PEM here ensures the certificate is in a sane format
+  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+  # The HMAC covers "salt + cert_pem"; the verifying side must feed exactly
+  # the same concatenation
+  return ("%s: %s/%s\n\n%s" %
+          (constants.X509_CERT_SIGNATURE_HEADER, salt,
+           hmac.new(key, salt + cert_pem, sha1).hexdigest(),
+           cert_pem))
+
+
+def _ExtractX509CertificateSignature(cert_pem):
+  """Helper function to extract signature from X509 certificate.
+
+  The input is scanned line by line; the first line matching
+  L{X509_SIGNATURE} provides the result. Scanning stops at the first line
+  starting with C{---}, so only lines before that point are considered.
+
+  @type cert_pem: string
+  @param cert_pem: Certificate in PEM format, possibly preceded by header lines
+  @rtype: tuple; (string, string)
+  @return: Salt and signature as found in the signature header
+  @raise errors.GenericError: If no signature header was found
+
+  """
+  # Extract signature from original PEM data
+  for line in cert_pem.splitlines():
+    # A line such as "-----BEGIN CERTIFICATE-----" ends the header section;
+    # anything after it is never treated as a signature header
+    if line.startswith("---"):
+      break
+
+    # Surrounding whitespace is removed because the pattern is anchored at
+    # both ends of the line
+    m = X509_SIGNATURE.match(line.strip())
+    if m:
+      return (m.group("salt"), m.group("sign"))
+
+  raise errors.GenericError("X509 certificate signature is missing")
+
+
+def LoadSignedX509Certificate(cert_pem, key):
+  """Verifies a signed X509 certificate.
+
+  The salt and signature are read from the signature header, the certificate
+  is loaded and re-serialized to PEM, and the HMAC of the salt followed by
+  that re-serialized PEM is compared with the signature from the header.
+
+  @type cert_pem: string
+  @param cert_pem: Certificate in PEM format and with signature header
+  @type key: string
+  @param key: Key for HMAC
+  @rtype: tuple; (OpenSSL.crypto.X509, string)
+  @return: X509 certificate object and salt
+  @raise errors.GenericError: If the signature header is missing or the
+    signature does not match
+
+  """
+  (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
+
+  # Load certificate
+  # The complete input, header lines included, is handed to OpenSSL here
+  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
+
+  # Dump again to ensure it's in a sane format
+  sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+  # NOTE(review): "!=" on strings is presumably not a constant-time
+  # comparison; confirm whether timing side channels matter for callers
+  if signature != hmac.new(key, salt + sane_pem, sha1).hexdigest():
+    raise errors.GenericError("X509 certificate signature is invalid")
+
+  return (cert, salt)
+
+
 def SafeEncode(text):
   """Return a 'safe' version of a source string.
 
index 831c477..1088a75 100755 (executable)
@@ -1698,5 +1698,60 @@ class TestGetX509CertValidity(testutils.GanetiTestCase):
       self.assertEqual(validity, (None, None))
 
 
+class TestSignX509Certificate(unittest.TestCase):
+  """Tests for signing and loading HMAC-signed X509 certificates.
+
+  """
+  KEY = "My private key!"
+  KEY_OTHER = "Another key"
+
+  def test(self):
+    # Generate certificate valid for 5 minutes
+    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
+
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           cert_pem)
+
+    # No signature at all
+    self.assertRaises(errors.GenericError,
+                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
+
+    # Invalid input; none of these contain a header in "salt/hash" format, so
+    # loading has to fail because no signature can be extracted
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      "", self.KEY)
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      "X-Ganeti-Signature: \n", self.KEY)
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
+
+    # Well-formed header in front of a valid certificate, but the signature
+    # doesn't match the HMAC
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      "X-Ganeti-Signature: 1234/abc\n\n" + cert_pem, self.KEY)
+
+    # Invalid salt; the certificate object (not its PEM serialization) is
+    # passed so that only the salt can be the reason for the failure
+    for salt in list("-_@$,:;/\\ \t\n"):
+      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
+                        cert, self.KEY, "foo%sbar" % salt)
+
+    # Valid salts; signing and loading must round-trip
+    for salt in ["HelloWorld", "salt", string.letters, string.digits,
+                 utils.GenerateSecret(numbytes=4),
+                 utils.GenerateSecret(numbytes=16),
+                 "{123:456}".encode("hex")]:
+      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
+
+      self._Check(cert, salt, signed_pem)
+
+      # Unrelated lines before the header or after the certificate must not
+      # affect verification
+      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
+      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
+      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
+                               "lines----\n------ at\nthe end!"))
+
+  def _Check(self, cert, salt, pem):
+    """Loads C{pem} and compares the result with the expected values.
+
+    Loading with L{KEY} must return the given salt and a certificate with the
+    same SHA1 digest as C{cert}; loading with L{KEY_OTHER} must fail.
+
+    """
+    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
+    self.assertEqual(salt, salt2)
+    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
+
+    # Other key
+    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+                      pem, self.KEY_OTHER)
+
+
 if __name__ == '__main__':
   testutils.GanetiTestProgram()