4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21 """Utility functions for X509.
33 from ganeti import errors
34 from ganeti import constants
35 from ganeti import pathutils
37 from ganeti.utils import text as utils_text
38 from ganeti.utils import io as utils_io
39 from ganeti.utils import hash as utils_hash
42 HEX_CHAR_RE = r"[a-zA-Z0-9]"
43 VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
44 X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
45 (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
46 HEX_CHAR_RE, HEX_CHAR_RE),
49 # Certificate verification results
51 CERT_ERROR) = range(1, 3)
54 _ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
57 def _ParseAsn1Generalizedtime(value):
58 """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
61 @param value: ASN1 GENERALIZEDTIME timestamp
62 @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
65 m = _ASN1_TIME_REGEX.match(value)
69 hours = int(m.group(2))
70 minutes = int(m.group(3))
71 utcoffset = (60 * hours) + minutes
73 if not value.endswith("Z"):
74 raise ValueError("Missing timezone")
78 parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
80 tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
82 return calendar.timegm(tt.utctimetuple())
85 def GetX509CertValidity(cert):
86 """Returns the validity period of the certificate.
88 @type cert: OpenSSL.crypto.X509
89 @param cert: X509 certificate object
92 # The get_notBefore and get_notAfter functions are only supported in
93 # pyOpenSSL 0.7 and above.
95 get_notbefore_fn = cert.get_notBefore
96 except AttributeError:
99 not_before_asn1 = get_notbefore_fn()
101 if not_before_asn1 is None:
104 not_before = _ParseAsn1Generalizedtime(not_before_asn1)
107 get_notafter_fn = cert.get_notAfter
108 except AttributeError:
111 not_after_asn1 = get_notafter_fn()
113 if not_after_asn1 is None:
116 not_after = _ParseAsn1Generalizedtime(not_after_asn1)
118 return (not_before, not_after)
121 def _VerifyCertificateInner(expired, not_before, not_after, now,
122 warn_days, error_days):
123 """Verifies certificate validity.
126 @param expired: Whether pyOpenSSL considers the certificate as expired
127 @type not_before: number or None
128 @param not_before: Unix timestamp before which certificate is not valid
129 @type not_after: number or None
130 @param not_after: Unix timestamp after which certificate is invalid
132 @param now: Current time as Unix timestamp
133 @type warn_days: number or None
134 @param warn_days: How many days before expiration a warning should be reported
135 @type error_days: number or None
136 @param error_days: How many days before expiration an error should be reported
140 msg = "Certificate is expired"
142 if not_before is not None and not_after is not None:
143 msg += (" (valid from %s to %s)" %
144 (utils_text.FormatTime(not_before),
145 utils_text.FormatTime(not_after)))
146 elif not_before is not None:
147 msg += " (valid from %s)" % utils_text.FormatTime(not_before)
148 elif not_after is not None:
149 msg += " (valid until %s)" % utils_text.FormatTime(not_after)
151 return (CERT_ERROR, msg)
153 elif not_before is not None and not_before > now:
154 return (CERT_WARNING,
155 "Certificate not yet valid (valid from %s)" %
156 utils_text.FormatTime(not_before))
158 elif not_after is not None:
159 remaining_days = int((not_after - now) / (24 * 3600))
161 msg = "Certificate expires in about %d days" % remaining_days
163 if error_days is not None and remaining_days <= error_days:
164 return (CERT_ERROR, msg)
166 if warn_days is not None and remaining_days <= warn_days:
167 return (CERT_WARNING, msg)
172 def VerifyX509Certificate(cert, warn_days, error_days):
173 """Verifies a certificate for LUClusterVerify.
175 @type cert: OpenSSL.crypto.X509
176 @param cert: X509 certificate object
177 @type warn_days: number or None
178 @param warn_days: How many days before expiration a warning should be reported
179 @type error_days: number or None
180 @param error_days: How many days before expiration an error should be reported
183 # Depending on the pyOpenSSL version, this can just return (None, None)
184 (not_before, not_after) = GetX509CertValidity(cert)
186 now = time.time() + constants.NODE_MAX_CLOCK_SKEW
188 return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
189 now, warn_days, error_days)
192 def SignX509Certificate(cert, key, salt):
193 """Sign a X509 certificate.
195 An RFC822-like signature header is added in front of the certificate.
197 @type cert: OpenSSL.crypto.X509
198 @param cert: X509 certificate object
200 @param key: Key for HMAC
202 @param salt: Salt for HMAC
204 @return: Serialized and signed certificate in PEM format
207 if not VALID_X509_SIGNATURE_SALT.match(salt):
208 raise errors.GenericError("Invalid salt: %r" % salt)
210 # Dumping as PEM here ensures the certificate is in a sane format
211 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
213 return ("%s: %s/%s\n\n%s" %
214 (constants.X509_CERT_SIGNATURE_HEADER, salt,
215 utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
219 def _ExtractX509CertificateSignature(cert_pem):
220 """Helper function to extract signature from X509 certificate.
223 # Extract signature from original PEM data
224 for line in cert_pem.splitlines():
225 if line.startswith("---"):
228 m = X509_SIGNATURE.match(line.strip())
230 return (m.group("salt"), m.group("sign"))
232 raise errors.GenericError("X509 certificate signature is missing")
235 def LoadSignedX509Certificate(cert_pem, key):
236 """Verifies a signed X509 certificate.
238 @type cert_pem: string
239 @param cert_pem: Certificate in PEM format and with signature header
241 @param key: Key for HMAC
242 @rtype: tuple; (OpenSSL.crypto.X509, string)
243 @return: X509 certificate object and salt
246 (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
248 # Load and dump certificate to ensure it's in a sane format
249 (cert, sane_pem) = ExtractX509Certificate(cert_pem)
251 if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
252 raise errors.GenericError("X509 certificate signature is invalid")
257 def GenerateSelfSignedX509Cert(common_name, validity):
258 """Generates a self-signed X509 certificate.
260 @type common_name: string
261 @param common_name: commonName value
263 @param validity: Validity for certificate in seconds
264 @return: a tuple of strings containing the PEM-encoded private key and
268 # Create private and public key
269 key = OpenSSL.crypto.PKey()
270 key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
272 # Create self-signed certificate
273 cert = OpenSSL.crypto.X509()
275 cert.get_subject().CN = common_name
276 cert.set_serial_number(1)
277 cert.gmtime_adj_notBefore(0)
278 cert.gmtime_adj_notAfter(validity)
279 cert.set_issuer(cert.get_subject())
281 cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
283 key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
284 cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
286 return (key_pem, cert_pem)
289 def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
290 validity=constants.X509_CERT_DEFAULT_VALIDITY):
291 """Legacy function to generate self-signed X509 certificate.
294 @param filename: path to write certificate to
295 @type common_name: string
296 @param common_name: commonName value
298 @param validity: validity of certificate in number of days
299 @return: a tuple of strings containing the PEM-encoded private key and
303 # TODO: Investigate using the cluster name instead of X505_CERT_CN for
304 # common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
305 # and node daemon certificates have the proper Subject/Issuer.
306 (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
307 validity * 24 * 60 * 60)
309 utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)
310 return (key_pem, cert_pem)
313 def ExtractX509Certificate(pem):
314 """Extracts the certificate from a PEM-formatted string.
317 @rtype: tuple; (OpenSSL.X509 object, string)
318 @return: Certificate object and PEM-formatted certificate
321 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
324 OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
327 def PrepareX509CertKeyCheck(cert, key):
328 """Get function for verifying certificate with a certain private key.
330 @type key: OpenSSL.crypto.PKey
331 @param key: Private key object
332 @type cert: OpenSSL.crypto.X509
333 @param cert: X509 certificate object
335 @return: Callable doing the actual check; will raise C{OpenSSL.SSL.Error} if
336 certificate is not signed by given private key
339 ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
340 ctx.use_privatekey(key)
341 ctx.use_certificate(cert)
343 return ctx.check_privatekey
346 def CheckNodeCertificate(cert, _noded_cert_file=pathutils.NODED_CERT_FILE):
347 """Checks the local node daemon certificate against given certificate.
349 Both certificates must be signed with the same key (as stored in the local
350 L{pathutils.NODED_CERT_FILE} file). No error is raised if no local
351 certificate can be found.
353 @type cert: OpenSSL.crypto.X509
354 @param cert: X509 certificate object
355 @raise errors.X509CertError: When an error related to X509 occurred
356 @raise errors.GenericError: When the verification failed
360 noded_pem = utils_io.ReadFile(_noded_cert_file)
361 except EnvironmentError, err:
362 if err.errno != errno.ENOENT:
365 logging.debug("Node certificate file '%s' was not found", _noded_cert_file)
370 OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, noded_pem)
371 except Exception, err:
372 raise errors.X509CertError(_noded_cert_file,
373 "Unable to load certificate: %s" % err)
377 OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, noded_pem)
378 except Exception, err:
379 raise errors.X509CertError(_noded_cert_file,
380 "Unable to load private key: %s" % err)
382 # Check consistency of server.pem file
383 check_fn = PrepareX509CertKeyCheck(noded_cert, noded_key)
386 except OpenSSL.SSL.Error:
387 # This should never happen as it would mean the certificate in server.pem
388 # is out of sync with the private key stored in the same file
389 raise errors.X509CertError(_noded_cert_file,
390 "Certificate does not match with private key")
392 # Check with supplied certificate with local key
393 check_fn = PrepareX509CertKeyCheck(cert, noded_key)
396 except OpenSSL.SSL.Error:
397 raise errors.GenericError("Given cluster certificate does not match"