will be prepended to the certificate similar to an RFC822 header and
only covers the certificate (from ``-----BEGIN CERTIFICATE-----`` to
``-----END CERTIFICATE-----``). The header name will be
-``X-Ganeti-Signature``.
+``X-Ganeti-Signature`` and its value will have the format
+``$salt/$hash`` (salt and hash separated by slash). The salt may only
+contain characters in the range ``[a-zA-Z0-9]``.
On the web, the destination cluster would be equivalent to an HTTPS
server requiring verifiable client certificates. The browser would be
# Digest used to sign certificates ("openssl x509" uses SHA1 by default)
X509_CERT_SIGN_DIGEST = "SHA1"
+X509_CERT_SIGNATURE_HEADER = "X-Ganeti-Signature"
+
VALUE_DEFAULT = "default"
VALUE_AUTO = "auto"
VALUE_GENERATE = "generate"
import OpenSSL
import datetime
import calendar
+import hmac
from cStringIO import StringIO
try:
from hashlib import sha1
except ImportError:
- import sha
- sha1 = sha.new
+ import sha as sha1
from ganeti import errors
from ganeti import constants
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
+HEX_CHAR_RE = r"[a-zA-Z0-9]"
+VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
+X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
+ (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
+ HEX_CHAR_RE, HEX_CHAR_RE),
+ re.S | re.I)
+
class RunResult(object):
"""Holds the result of running external programs.
f = open(filename)
- fp = sha1()
+ if callable(sha1):
+ fp = sha1()
+ else:
+ fp = sha1.new()
while True:
data = f.read(4096)
if not data:
return (not_before, not_after)
+def SignX509Certificate(cert, key, salt):
+ """Sign a X509 certificate.
+
+ An RFC822-like signature header is added in front of the certificate.
+
+ @type cert: OpenSSL.crypto.X509
+ @param cert: X509 certificate object
+ @type key: string
+ @param key: Key for HMAC
+ @type salt: string
+ @param salt: Salt for HMAC
+ @rtype: string
+ @return: Serialized and signed certificate in PEM format
+
+ """
+ if not VALID_X509_SIGNATURE_SALT.match(salt):
+ raise errors.GenericError("Invalid salt: %r" % salt)
+
+ # Dumping as PEM here ensures the certificate is in a sane format
+ cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+ return ("%s: %s/%s\n\n%s" %
+ (constants.X509_CERT_SIGNATURE_HEADER, salt,
+ hmac.new(key, salt + cert_pem, sha1).hexdigest(),
+ cert_pem))
+
+
+def _ExtractX509CertificateSignature(cert_pem):
+ """Helper function to extract signature from X509 certificate.
+
+ """
+ # Extract signature from original PEM data
+ for line in cert_pem.splitlines():
+ if line.startswith("---"):
+ break
+
+ m = X509_SIGNATURE.match(line.strip())
+ if m:
+ return (m.group("salt"), m.group("sign"))
+
+ raise errors.GenericError("X509 certificate signature is missing")
+
+
+def LoadSignedX509Certificate(cert_pem, key):
+ """Verifies a signed X509 certificate.
+
+ @type cert_pem: string
+ @param cert_pem: Certificate in PEM format and with signature header
+ @type key: string
+ @param key: Key for HMAC
+ @rtype: tuple; (OpenSSL.crypto.X509, string)
+ @return: X509 certificate object and salt
+
+ """
+ (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
+
+ # Load certificate
+ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
+
+ # Dump again to ensure it's in a sane format
+ sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+
+ if signature != hmac.new(key, salt + sane_pem, sha1).hexdigest():
+ raise errors.GenericError("X509 certificate signature is invalid")
+
+ return (cert, salt)
+
+
def SafeEncode(text):
"""Return a 'safe' version of a source string.
self.assertEqual(validity, (None, None))
+class TestSignX509Certificate(unittest.TestCase):
+ KEY = "My private key!"
+ KEY_OTHER = "Another key"
+
+ def test(self):
+ # Generate certificate valid for 5 minutes
+ (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
+
+ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ cert_pem)
+
+ # No signature at all
+ self.assertRaises(errors.GenericError,
+ utils.LoadSignedX509Certificate, cert_pem, self.KEY)
+
+ # Invalid input
+ self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+ "", self.KEY)
+ self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+ "X-Ganeti-Signature: \n", self.KEY)
+ self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+ "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
+ self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+ "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
+ self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+ "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
+
+ # Invalid salt
+ for salt in list("-_@$,:;/\\ \t\n"):
+ self.assertRaises(errors.GenericError, utils.SignX509Certificate,
+ cert_pem, self.KEY, "foo%sbar" % salt)
+
+ for salt in ["HelloWorld", "salt", string.letters, string.digits,
+ utils.GenerateSecret(numbytes=4),
+ utils.GenerateSecret(numbytes=16),
+ "{123:456}".encode("hex")]:
+ signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
+
+ self._Check(cert, salt, signed_pem)
+
+ self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
+ self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
+ self._Check(cert, salt, (signed_pem + "\n\na few more\n"
+ "lines----\n------ at\nthe end!"))
+
+ def _Check(self, cert, salt, pem):
+ (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
+ self.assertEqual(salt, salt2)
+ self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
+
+ # Other key
+ self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
+ pem, self.KEY_OTHER)
+
+
if __name__ == '__main__':
testutils.GanetiTestProgram()