71ba25dc34d6e0a91739fa9cde779d9851f1d51d
[ganeti-local] / lib / utils / x509.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21 """Utility functions for X509.
22
23 """
24
25 import time
26 import OpenSSL
27 import re
28 import datetime
29 import calendar
30
31 from ganeti import errors
32 from ganeti import constants
33
34 from ganeti.utils import text as utils_text
35 from ganeti.utils import io as utils_io
36 from ganeti.utils import hash as utils_hash
37
38
39 HEX_CHAR_RE = r"[a-zA-Z0-9]"
40 VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
41 X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
42                             (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
43                              HEX_CHAR_RE, HEX_CHAR_RE),
44                             re.S | re.I)
45
46 # Certificate verification results
47 (CERT_WARNING,
48  CERT_ERROR) = range(1, 3)
49
50 #: ASN1 time regexp
51 _ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
52
53
54 def _ParseAsn1Generalizedtime(value):
55   """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
56
57   @type value: string
58   @param value: ASN1 GENERALIZEDTIME timestamp
59   @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
60
61   """
62   m = _ASN1_TIME_REGEX.match(value)
63   if m:
64     # We have an offset
65     asn1time = m.group(1)
66     hours = int(m.group(2))
67     minutes = int(m.group(3))
68     utcoffset = (60 * hours) + minutes
69   else:
70     if not value.endswith("Z"):
71       raise ValueError("Missing timezone")
72     asn1time = value[:-1]
73     utcoffset = 0
74
75   parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
76
77   tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
78
79   return calendar.timegm(tt.utctimetuple())
80
81
82 def GetX509CertValidity(cert):
83   """Returns the validity period of the certificate.
84
85   @type cert: OpenSSL.crypto.X509
86   @param cert: X509 certificate object
87
88   """
89   # The get_notBefore and get_notAfter functions are only supported in
90   # pyOpenSSL 0.7 and above.
91   try:
92     get_notbefore_fn = cert.get_notBefore
93   except AttributeError:
94     not_before = None
95   else:
96     not_before_asn1 = get_notbefore_fn()
97
98     if not_before_asn1 is None:
99       not_before = None
100     else:
101       not_before = _ParseAsn1Generalizedtime(not_before_asn1)
102
103   try:
104     get_notafter_fn = cert.get_notAfter
105   except AttributeError:
106     not_after = None
107   else:
108     not_after_asn1 = get_notafter_fn()
109
110     if not_after_asn1 is None:
111       not_after = None
112     else:
113       not_after = _ParseAsn1Generalizedtime(not_after_asn1)
114
115   return (not_before, not_after)
116
117
118 def _VerifyCertificateInner(expired, not_before, not_after, now,
119                             warn_days, error_days):
120   """Verifies certificate validity.
121
122   @type expired: bool
123   @param expired: Whether pyOpenSSL considers the certificate as expired
124   @type not_before: number or None
125   @param not_before: Unix timestamp before which certificate is not valid
126   @type not_after: number or None
127   @param not_after: Unix timestamp after which certificate is invalid
128   @type now: number
129   @param now: Current time as Unix timestamp
130   @type warn_days: number or None
131   @param warn_days: How many days before expiration a warning should be reported
132   @type error_days: number or None
133   @param error_days: How many days before expiration an error should be reported
134
135   """
136   if expired:
137     msg = "Certificate is expired"
138
139     if not_before is not None and not_after is not None:
140       msg += (" (valid from %s to %s)" %
141               (utils_text.FormatTime(not_before),
142                utils_text.FormatTime(not_after)))
143     elif not_before is not None:
144       msg += " (valid from %s)" % utils_text.FormatTime(not_before)
145     elif not_after is not None:
146       msg += " (valid until %s)" % utils_text.FormatTime(not_after)
147
148     return (CERT_ERROR, msg)
149
150   elif not_before is not None and not_before > now:
151     return (CERT_WARNING,
152             "Certificate not yet valid (valid from %s)" %
153             utils_text.FormatTime(not_before))
154
155   elif not_after is not None:
156     remaining_days = int((not_after - now) / (24 * 3600))
157
158     msg = "Certificate expires in about %d days" % remaining_days
159
160     if error_days is not None and remaining_days <= error_days:
161       return (CERT_ERROR, msg)
162
163     if warn_days is not None and remaining_days <= warn_days:
164       return (CERT_WARNING, msg)
165
166   return (None, None)
167
168
169 def VerifyX509Certificate(cert, warn_days, error_days):
170   """Verifies a certificate for LUClusterVerify.
171
172   @type cert: OpenSSL.crypto.X509
173   @param cert: X509 certificate object
174   @type warn_days: number or None
175   @param warn_days: How many days before expiration a warning should be reported
176   @type error_days: number or None
177   @param error_days: How many days before expiration an error should be reported
178
179   """
180   # Depending on the pyOpenSSL version, this can just return (None, None)
181   (not_before, not_after) = GetX509CertValidity(cert)
182
183   return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
184                                  time.time(), warn_days, error_days)
185
186
187 def SignX509Certificate(cert, key, salt):
188   """Sign a X509 certificate.
189
190   An RFC822-like signature header is added in front of the certificate.
191
192   @type cert: OpenSSL.crypto.X509
193   @param cert: X509 certificate object
194   @type key: string
195   @param key: Key for HMAC
196   @type salt: string
197   @param salt: Salt for HMAC
198   @rtype: string
199   @return: Serialized and signed certificate in PEM format
200
201   """
202   if not VALID_X509_SIGNATURE_SALT.match(salt):
203     raise errors.GenericError("Invalid salt: %r" % salt)
204
205   # Dumping as PEM here ensures the certificate is in a sane format
206   cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
207
208   return ("%s: %s/%s\n\n%s" %
209           (constants.X509_CERT_SIGNATURE_HEADER, salt,
210            utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
211            cert_pem))
212
213
214 def _ExtractX509CertificateSignature(cert_pem):
215   """Helper function to extract signature from X509 certificate.
216
217   """
218   # Extract signature from original PEM data
219   for line in cert_pem.splitlines():
220     if line.startswith("---"):
221       break
222
223     m = X509_SIGNATURE.match(line.strip())
224     if m:
225       return (m.group("salt"), m.group("sign"))
226
227   raise errors.GenericError("X509 certificate signature is missing")
228
229
230 def LoadSignedX509Certificate(cert_pem, key):
231   """Verifies a signed X509 certificate.
232
233   @type cert_pem: string
234   @param cert_pem: Certificate in PEM format and with signature header
235   @type key: string
236   @param key: Key for HMAC
237   @rtype: tuple; (OpenSSL.crypto.X509, string)
238   @return: X509 certificate object and salt
239
240   """
241   (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
242
243   # Load certificate
244   cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
245
246   # Dump again to ensure it's in a sane format
247   sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
248
249   if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
250     raise errors.GenericError("X509 certificate signature is invalid")
251
252   return (cert, salt)
253
254
255 def GenerateSelfSignedX509Cert(common_name, validity):
256   """Generates a self-signed X509 certificate.
257
258   @type common_name: string
259   @param common_name: commonName value
260   @type validity: int
261   @param validity: Validity for certificate in seconds
262
263   """
264   # Create private and public key
265   key = OpenSSL.crypto.PKey()
266   key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
267
268   # Create self-signed certificate
269   cert = OpenSSL.crypto.X509()
270   if common_name:
271     cert.get_subject().CN = common_name
272   cert.set_serial_number(1)
273   cert.gmtime_adj_notBefore(0)
274   cert.gmtime_adj_notAfter(validity)
275   cert.set_issuer(cert.get_subject())
276   cert.set_pubkey(key)
277   cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
278
279   key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
280   cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
281
282   return (key_pem, cert_pem)
283
284
285 def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
286                               validity=constants.X509_CERT_DEFAULT_VALIDITY):
287   """Legacy function to generate self-signed X509 certificate.
288
289   @type filename: str
290   @param filename: path to write certificate to
291   @type common_name: string
292   @param common_name: commonName value
293   @type validity: int
294   @param validity: validity of certificate in number of days
295
296   """
297   # TODO: Investigate using the cluster name instead of X505_CERT_CN for
298   # common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
299   # and node daemon certificates have the proper Subject/Issuer.
300   (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
301                                                    validity * 24 * 60 * 60)
302
303   utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)