# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Utility functions for X509.

"""

import calendar
import datetime
import re
import stat
import time

import OpenSSL

from ganeti import errors
from ganeti import constants

from ganeti.utils import text as utils_text
from ganeti.utils import io as utils_io
from ganeti.utils import hash as utils_hash

# Characters accepted in signature salts and digests. Despite the name, this
# class matches every ASCII letter and digit, not only hexadecimal digits.
HEX_CHAR_RE = r"[a-zA-Z0-9]"

# A salt is valid when it consists solely of HEX_CHAR_RE characters. "\Z" is
# used instead of "$" because "$" also matches in front of a trailing newline,
# which would let a salt such as "abc\n" through and break the header line.
VALID_X509_SIGNATURE_SALT = re.compile(r"^%s+\Z" % HEX_CHAR_RE, re.S)

# Matches the "<header>: <salt>/<signature>" line placed in front of a signed
# certificate.
# NOTE(review): the flags argument was not visible in the reviewed listing;
# "re.S | re.I" is assumed -- confirm against version control.
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
                            (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
                             HEX_CHAR_RE, HEX_CHAR_RE),
                            re.S | re.I)

# Certificate verification results
(CERT_WARNING,
 CERT_ERROR) = range(1, 3)

51 _ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
54 def _ParseAsn1Generalizedtime(value):
55 """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
58 @param value: ASN1 GENERALIZEDTIME timestamp
59 @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
62 m = _ASN1_TIME_REGEX.match(value)
66 hours = int(m.group(2))
67 minutes = int(m.group(3))
68 utcoffset = (60 * hours) + minutes
70 if not value.endswith("Z"):
71 raise ValueError("Missing timezone")
75 parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
77 tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
79 return calendar.timegm(tt.utctimetuple())
def GetX509CertValidity(cert):
  """Returns the validity period of the certificate.

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @rtype: tuple; (number or None, number or None)
  @return: Start and end of the validity period as Unix timestamps; an entry
    is None if the certificate object lacks the accessor or the accessor
    returns None

  """
  validity = []

  # The get_notBefore and get_notAfter functions are only supported in
  # pyOpenSSL 0.7 and above.
  for getter_name in ("get_notBefore", "get_notAfter"):
    try:
      getter_fn = getattr(cert, getter_name)
    except AttributeError:
      validity.append(None)
      continue

    asn1_value = getter_fn()

    if asn1_value is None:
      validity.append(None)
    else:
      validity.append(_ParseAsn1Generalizedtime(asn1_value))

  return tuple(validity)

118 def _VerifyCertificateInner(expired, not_before, not_after, now,
119 warn_days, error_days):
120 """Verifies certificate validity.
123 @param expired: Whether pyOpenSSL considers the certificate as expired
124 @type not_before: number or None
125 @param not_before: Unix timestamp before which certificate is not valid
126 @type not_after: number or None
127 @param not_after: Unix timestamp after which certificate is invalid
129 @param now: Current time as Unix timestamp
130 @type warn_days: number or None
131 @param warn_days: How many days before expiration a warning should be reported
132 @type error_days: number or None
133 @param error_days: How many days before expiration an error should be reported
137 msg = "Certificate is expired"
139 if not_before is not None and not_after is not None:
140 msg += (" (valid from %s to %s)" %
141 (utils_text.FormatTime(not_before),
142 utils_text.FormatTime(not_after)))
143 elif not_before is not None:
144 msg += " (valid from %s)" % utils_text.FormatTime(not_before)
145 elif not_after is not None:
146 msg += " (valid until %s)" % utils_text.FormatTime(not_after)
148 return (CERT_ERROR, msg)
150 elif not_before is not None and not_before > now:
151 return (CERT_WARNING,
152 "Certificate not yet valid (valid from %s)" %
153 utils_text.FormatTime(not_before))
155 elif not_after is not None:
156 remaining_days = int((not_after - now) / (24 * 3600))
158 msg = "Certificate expires in about %d days" % remaining_days
160 if error_days is not None and remaining_days <= error_days:
161 return (CERT_ERROR, msg)
163 if warn_days is not None and remaining_days <= warn_days:
164 return (CERT_WARNING, msg)
def VerifyX509Certificate(cert, warn_days, error_days):
  """Verifies a certificate for LUClusterVerify.

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @type warn_days: number or None
  @param warn_days: How many days before expiration a warning should be reported
  @type error_days: number or None
  @param error_days: How many days before expiration an error should be reported
  @return: Result of L{_VerifyCertificateInner} for the certificate's validity
    period, evaluated against the current time

  """
  # Depending on the pyOpenSSL version, this can just return (None, None)
  (not_before, not_after) = GetX509CertValidity(cert)

  return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
                                 time.time(), warn_days, error_days)

def SignX509Certificate(cert, key, salt):
  """Sign a X509 certificate.

  An RFC822-like signature header is added in front of the certificate.

  @type cert: OpenSSL.crypto.X509
  @param cert: X509 certificate object
  @type key: string
  @param key: Key for HMAC
  @type salt: string
  @param salt: Salt for HMAC
  @rtype: string
  @return: Serialized and signed certificate in PEM format
  @raise errors.GenericError: If the salt does not match
    L{VALID_X509_SIGNATURE_SALT}

  """
  if not VALID_X509_SIGNATURE_SALT.match(salt):
    raise errors.GenericError("Invalid salt: %r" % salt)

  # Dumping as PEM here ensures the certificate is in a sane format
  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)

  # Header line, an empty line, then the PEM data the HMAC was computed over
  return ("%s: %s/%s\n\n%s" %
          (constants.X509_CERT_SIGNATURE_HEADER, salt,
           utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
           cert_pem))

def _ExtractX509CertificateSignature(cert_pem):
  """Helper function to extract signature from X509 certificate.

  @type cert_pem: string
  @param cert_pem: Certificate in PEM format and with signature header
  @rtype: tuple; (string, string)
  @return: Salt and signature taken from the first line matching
    L{X509_SIGNATURE}
  @raise errors.GenericError: If no signature line is found

  """
  # Extract signature from original PEM data
  for line in cert_pem.splitlines():
    # The signature header sits in front of the certificate, so the search
    # ends at the first PEM delimiter
    # NOTE(review): the statement under this check was not visible in the
    # reviewed listing; "break" is assumed -- confirm against version control.
    if line.startswith("---"):
      break

    m = X509_SIGNATURE.match(line.strip())
    if m:
      return (m.group("salt"), m.group("sign"))

  raise errors.GenericError("X509 certificate signature is missing")

def LoadSignedX509Certificate(cert_pem, key):
  """Verifies a signed X509 certificate.

  @type cert_pem: string
  @param cert_pem: Certificate in PEM format and with signature header
  @type key: string
  @param key: Key for HMAC
  @rtype: tuple; (OpenSSL.crypto.X509, string)
  @return: X509 certificate object and salt
  @raise errors.GenericError: If the signature header is missing or the
    signature does not verify

  """
  (salt, signature) = _ExtractX509CertificateSignature(cert_pem)

  # Load certificate
  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)

  # Dump again to ensure it's in a sane format
  sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)

  if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
    raise errors.GenericError("X509 certificate signature is invalid")

  return (cert, salt)

def GenerateSelfSignedX509Cert(common_name, validity):
  """Generates a self-signed X509 certificate.

  @type common_name: string
  @param common_name: commonName value
  @type validity: int
  @param validity: Validity for certificate in seconds
  @rtype: tuple; (string, string)
  @return: Private key and certificate, both in PEM format

  """
  # Create private and public key
  key = OpenSSL.crypto.PKey()
  key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)

  # Create self-signed certificate
  cert = OpenSSL.crypto.X509()
  # NOTE(review): a short statement in front of the CN assignment was not
  # visible in the reviewed listing; a guard against an empty common name is
  # assumed -- confirm against version control.
  if common_name:
    cert.get_subject().CN = common_name
  cert.set_serial_number(1)
  cert.gmtime_adj_notBefore(0)
  cert.gmtime_adj_notAfter(validity)
  # Self-signed: the issuer is the subject
  cert.set_issuer(cert.get_subject())
  # NOTE(review): this call was not visible in the reviewed listing either;
  # without it the certificate would carry no public key.
  cert.set_pubkey(key)
  cert.sign(key, constants.X509_CERT_SIGN_DIGEST)

  key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)

  return (key_pem, cert_pem)

def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
                              validity=constants.X509_CERT_DEFAULT_VALIDITY):
  """Legacy function to generate self-signed X509 certificate.

  The private key and the certificate are written to the same file, which is
  created readable by its owner only.

  @type filename: str
  @param filename: path to write certificate to
  @type common_name: string
  @param common_name: commonName value
  @type validity: int
  @param validity: validity of certificate in number of days

  """
  # TODO: Investigate using the cluster name instead of X509_CERT_CN for
  # common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
  # and node daemon certificates have the proper Subject/Issuer.
  (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
                                                   validity * 24 * 60 * 60)

  # stat.S_IRUSR equals octal 400; the bare "0400" literal is a syntax error
  # on Python 3
  utils_io.WriteFile(filename, mode=stat.S_IRUSR, data=key_pem + cert_pem)