+def _VerifyCertificateInner(expired, not_before, not_after, now,
+ warn_days, error_days):
+ """Verifies certificate validity.
+
+ @type expired: bool
+ @param expired: Whether pyOpenSSL considers the certificate as expired
+ @type not_before: number or None
+ @param not_before: Unix timestamp before which certificate is not valid
+ @type not_after: number or None
+ @param not_after: Unix timestamp after which certificate is invalid
+ @type now: number
+ @param now: Current time as Unix timestamp
+ @type warn_days: number or None
+ @param warn_days: How many days before expiration a warning should be reported
+ @type error_days: number or None
+ @param error_days: How many days before expiration an error should be reported
+
+ """
+ if expired:
+ msg = "Certificate is expired"
+
+ if not_before is not None and not_after is not None:
+ msg += (" (valid from %s to %s)" %
+ (FormatTime(not_before), FormatTime(not_after)))
+ elif not_before is not None:
+ msg += " (valid from %s)" % FormatTime(not_before)
+ elif not_after is not None:
+ msg += " (valid until %s)" % FormatTime(not_after)
+
+ return (CERT_ERROR, msg)
+
+ elif not_before is not None and not_before > now:
+ return (CERT_WARNING,
+ "Certificate not yet valid (valid from %s)" %
+ FormatTime(not_before))
+
+ elif not_after is not None:
+ remaining_days = int((not_after - now) / (24 * 3600))
+
+ msg = "Certificate expires in about %d days" % remaining_days
+
+ if error_days is not None and remaining_days <= error_days:
+ return (CERT_ERROR, msg)
+
+ if warn_days is not None and remaining_days <= warn_days:
+ return (CERT_WARNING, msg)
+
+ return (None, None)
+
+
def VerifyX509Certificate(cert, warn_days, error_days):
  """Checks the validity period of a certificate for LUVerifyCluster.

  The validity timestamps and expiration flag are read from the certificate
  and evaluated against the current time by L{_VerifyCertificateInner}.

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @type warn_days: number or None
  @param warn_days: How many days before expiration a warning should be reported
  @type error_days: number or None
  @param error_days: How many days before expiration an error should be reported
  @return: Whatever L{_VerifyCertificateInner} returns for this certificate

  """
  # With some pyOpenSSL versions both values can be None
  validity = GetX509CertValidity(cert)
  (valid_from, valid_until) = validity

  is_expired = cert.has_expired()
  current_time = time.time()

  return _VerifyCertificateInner(is_expired, valid_from, valid_until,
                                 current_time, warn_days, error_days)
+
+
def SignX509Certificate(cert, key, salt):
  """Serializes a X509 certificate and prepends an HMAC signature header.

  The result starts with an RFC822-like header line containing the salt and
  the signature, followed by an empty line and the certificate in PEM format.

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @type key: string
  @param key: Key for HMAC
  @type salt: string
  @param salt: Salt for HMAC
  @rtype: string
  @return: Serialized and signed certificate in PEM format
  @raise errors.GenericError: If the salt doesn't match
    C{VALID_X509_SIGNATURE_SALT}

  """
  salt_match = VALID_X509_SIGNATURE_SALT.match(salt)
  if not salt_match:
    raise errors.GenericError("Invalid salt: %r" % salt)

  # The signature is computed over the PEM data as dumped by pyOpenSSL, not
  # over whatever representation the caller started with
  pem_format = OpenSSL.crypto.FILETYPE_PEM
  pem_data = OpenSSL.crypto.dump_certificate(pem_format, cert)

  header_name = constants.X509_CERT_SIGNATURE_HEADER
  signature = Sha1Hmac(key, pem_data, salt=salt)

  return "%s: %s/%s\n\n%s" % (header_name, salt, signature, pem_data)
+
+
def _ExtractX509CertificateSignature(cert_pem):
  """Finds the signature header preceding the PEM data of a certificate.

  Only lines before the first one starting with "---" are examined.

  @type cert_pem: string
  @param cert_pem: Certificate in PEM format and with signature header
  @rtype: tuple; (string, string)
  @return: Salt and signature as matched by C{X509_SIGNATURE}
  @raise errors.GenericError: If no signature line is found

  """
  for raw_line in cert_pem.splitlines():
    # Reaching a "---" line ends the header section
    if raw_line.startswith("---"):
      break

    match = X509_SIGNATURE.match(raw_line.strip())
    if not match:
      continue

    return (match.group("salt"), match.group("sign"))

  raise errors.GenericError("X509 certificate signature is missing")
+
+
def LoadSignedX509Certificate(cert_pem, key):
  """Loads a signed X509 certificate and verifies its signature.

  @type cert_pem: string
  @param cert_pem: Certificate in PEM format and with signature header
  @type key: string
  @param key: Key for HMAC
  @rtype: tuple; (OpenSSL.crypto.X509, string)
  @return: X509 certificate object and salt
  @raise errors.GenericError: If the signature is missing or doesn't match

  """
  (salt, signature) = _ExtractX509CertificateSignature(cert_pem)

  pem_format = OpenSSL.crypto.FILETYPE_PEM

  # Parse the certificate, then serialize it again so the HMAC is checked
  # against pyOpenSSL's own PEM output rather than the input text
  cert = OpenSSL.crypto.load_certificate(pem_format, cert_pem)
  normalized_pem = OpenSSL.crypto.dump_certificate(pem_format, cert)

  if VerifySha1Hmac(key, normalized_pem, signature, salt=salt):
    return (cert, salt)

  raise errors.GenericError("X509 certificate signature is invalid")
+
+
def Sha1Hmac(key, text, salt=None):
  """Computes the hex-encoded HMAC-SHA1 digest of a text.

  HMAC is defined in RFC2104.

  @type key: string
  @param key: Secret key
  @type text: string
  @param text: Data to compute the digest for
  @type salt: string or None
  @param salt: If given and non-empty, prepended to the text before the
    digest is computed
  @rtype: string
  @return: Digest in hexadecimal notation

  """
  # An empty or missing salt leaves the text untouched
  message = (salt + text) if salt else text

  mac = hmac.new(key, message, compat.sha1)

  return mac.hexdigest()
+
+
def VerifySha1Hmac(key, text, digest, salt=None):
  """Verifies the HMAC-SHA1 digest of a text.

  HMAC is defined in RFC2104. The comparison is case-insensitive and, where
  the interpreter provides C{hmac.compare_digest}, done in constant time so
  the duration of the check doesn't reveal how many leading characters of a
  forged digest were correct.

  @type key: string
  @param key: Secret key
  @type text: string
  @param text: Data the digest was computed for
  @type digest: string
  @param digest: Expected digest
  @type salt: string or None
  @param salt: Salt used when the digest was computed, if any
  @rtype: bool
  @return: Whether HMAC-SHA1 digest matches

  """
  given = digest.lower()
  expected = Sha1Hmac(key, text, salt=salt).lower()

  # hmac.compare_digest doesn't exist on older Python versions
  constant_time_compare = getattr(hmac, "compare_digest", None)

  if constant_time_compare is not None:
    try:
      return constant_time_compare(given, expected)
    except TypeError:
      # Raised for mismatching types or non-ASCII text; the plain comparison
      # below gives the same answer as it did before for such input
      pass

  return given == expected
+
+