Statistics
| Branch: | Tag: | Revision:

root / lib / utils / x509.py @ 86b9a385

History | View | Annotate | Download (10.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Utility functions for X509.
22

23
"""
24

    
25
import time
26
import OpenSSL
27
import re
28
import datetime
29
import calendar
30

    
31
from ganeti import errors
32
from ganeti import constants
33

    
34
from ganeti.utils import text as utils_text
35
from ganeti.utils import io as utils_io
36
from ganeti.utils import hash as utils_hash
37

    
38

    
39
HEX_CHAR_RE = r"[a-zA-Z0-9]"
40
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
41
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
42
                            (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
43
                             HEX_CHAR_RE, HEX_CHAR_RE),
44
                            re.S | re.I)
45

    
46
# Certificate verification results
47
(CERT_WARNING,
48
 CERT_ERROR) = range(1, 3)
49

    
50
#: ASN1 time regexp
51
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
52

    
53

    
54
def _ParseAsn1Generalizedtime(value):
55
  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
56

57
  @type value: string
58
  @param value: ASN1 GENERALIZEDTIME timestamp
59
  @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
60

61
  """
62
  m = _ASN1_TIME_REGEX.match(value)
63
  if m:
64
    # We have an offset
65
    asn1time = m.group(1)
66
    hours = int(m.group(2))
67
    minutes = int(m.group(3))
68
    utcoffset = (60 * hours) + minutes
69
  else:
70
    if not value.endswith("Z"):
71
      raise ValueError("Missing timezone")
72
    asn1time = value[:-1]
73
    utcoffset = 0
74

    
75
  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
76

    
77
  tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
78

    
79
  return calendar.timegm(tt.utctimetuple())
80

    
81

    
82
def GetX509CertValidity(cert):
83
  """Returns the validity period of the certificate.
84

85
  @type cert: OpenSSL.crypto.X509
86
  @param cert: X509 certificate object
87

88
  """
89
  # The get_notBefore and get_notAfter functions are only supported in
90
  # pyOpenSSL 0.7 and above.
91
  try:
92
    get_notbefore_fn = cert.get_notBefore
93
  except AttributeError:
94
    not_before = None
95
  else:
96
    not_before_asn1 = get_notbefore_fn()
97

    
98
    if not_before_asn1 is None:
99
      not_before = None
100
    else:
101
      not_before = _ParseAsn1Generalizedtime(not_before_asn1)
102

    
103
  try:
104
    get_notafter_fn = cert.get_notAfter
105
  except AttributeError:
106
    not_after = None
107
  else:
108
    not_after_asn1 = get_notafter_fn()
109

    
110
    if not_after_asn1 is None:
111
      not_after = None
112
    else:
113
      not_after = _ParseAsn1Generalizedtime(not_after_asn1)
114

    
115
  return (not_before, not_after)
116

    
117

    
118
def _VerifyCertificateInner(expired, not_before, not_after, now,
119
                            warn_days, error_days):
120
  """Verifies certificate validity.
121

122
  @type expired: bool
123
  @param expired: Whether pyOpenSSL considers the certificate as expired
124
  @type not_before: number or None
125
  @param not_before: Unix timestamp before which certificate is not valid
126
  @type not_after: number or None
127
  @param not_after: Unix timestamp after which certificate is invalid
128
  @type now: number
129
  @param now: Current time as Unix timestamp
130
  @type warn_days: number or None
131
  @param warn_days: How many days before expiration a warning should be reported
132
  @type error_days: number or None
133
  @param error_days: How many days before expiration an error should be reported
134

135
  """
136
  if expired:
137
    msg = "Certificate is expired"
138

    
139
    if not_before is not None and not_after is not None:
140
      msg += (" (valid from %s to %s)" %
141
              (utils_text.FormatTime(not_before),
142
               utils_text.FormatTime(not_after)))
143
    elif not_before is not None:
144
      msg += " (valid from %s)" % utils_text.FormatTime(not_before)
145
    elif not_after is not None:
146
      msg += " (valid until %s)" % utils_text.FormatTime(not_after)
147

    
148
    return (CERT_ERROR, msg)
149

    
150
  elif not_before is not None and not_before > now:
151
    return (CERT_WARNING,
152
            "Certificate not yet valid (valid from %s)" %
153
            utils_text.FormatTime(not_before))
154

    
155
  elif not_after is not None:
156
    remaining_days = int((not_after - now) / (24 * 3600))
157

    
158
    msg = "Certificate expires in about %d days" % remaining_days
159

    
160
    if error_days is not None and remaining_days <= error_days:
161
      return (CERT_ERROR, msg)
162

    
163
    if warn_days is not None and remaining_days <= warn_days:
164
      return (CERT_WARNING, msg)
165

    
166
  return (None, None)
167

    
168

    
169
def VerifyX509Certificate(cert, warn_days, error_days):
170
  """Verifies a certificate for LUClusterVerify.
171

172
  @type cert: OpenSSL.crypto.X509
173
  @param cert: X509 certificate object
174
  @type warn_days: number or None
175
  @param warn_days: How many days before expiration a warning should be reported
176
  @type error_days: number or None
177
  @param error_days: How many days before expiration an error should be reported
178

179
  """
180
  # Depending on the pyOpenSSL version, this can just return (None, None)
181
  (not_before, not_after) = GetX509CertValidity(cert)
182

    
183
  now = time.time() + constants.NODE_MAX_CLOCK_SKEW
184

    
185
  return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
186
                                 now, warn_days, error_days)
187

    
188

    
189
def SignX509Certificate(cert, key, salt):
190
  """Sign a X509 certificate.
191

192
  An RFC822-like signature header is added in front of the certificate.
193

194
  @type cert: OpenSSL.crypto.X509
195
  @param cert: X509 certificate object
196
  @type key: string
197
  @param key: Key for HMAC
198
  @type salt: string
199
  @param salt: Salt for HMAC
200
  @rtype: string
201
  @return: Serialized and signed certificate in PEM format
202

203
  """
204
  if not VALID_X509_SIGNATURE_SALT.match(salt):
205
    raise errors.GenericError("Invalid salt: %r" % salt)
206

    
207
  # Dumping as PEM here ensures the certificate is in a sane format
208
  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
209

    
210
  return ("%s: %s/%s\n\n%s" %
211
          (constants.X509_CERT_SIGNATURE_HEADER, salt,
212
           utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
213
           cert_pem))
214

    
215

    
216
def _ExtractX509CertificateSignature(cert_pem):
217
  """Helper function to extract signature from X509 certificate.
218

219
  """
220
  # Extract signature from original PEM data
221
  for line in cert_pem.splitlines():
222
    if line.startswith("---"):
223
      break
224

    
225
    m = X509_SIGNATURE.match(line.strip())
226
    if m:
227
      return (m.group("salt"), m.group("sign"))
228

    
229
  raise errors.GenericError("X509 certificate signature is missing")
230

    
231

    
232
def LoadSignedX509Certificate(cert_pem, key):
233
  """Verifies a signed X509 certificate.
234

235
  @type cert_pem: string
236
  @param cert_pem: Certificate in PEM format and with signature header
237
  @type key: string
238
  @param key: Key for HMAC
239
  @rtype: tuple; (OpenSSL.crypto.X509, string)
240
  @return: X509 certificate object and salt
241

242
  """
243
  (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
244

    
245
  # Load and dump certificate to ensure it's in a sane format
246
  (cert, sane_pem) = ExtractX509Certificate(cert_pem)
247

    
248
  if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
249
    raise errors.GenericError("X509 certificate signature is invalid")
250

    
251
  return (cert, salt)
252

    
253

    
254
def GenerateSelfSignedX509Cert(common_name, validity):
255
  """Generates a self-signed X509 certificate.
256

257
  @type common_name: string
258
  @param common_name: commonName value
259
  @type validity: int
260
  @param validity: Validity for certificate in seconds
261
  @return: a tuple of strings containing the PEM-encoded private key and
262
           certificate
263

264
  """
265
  # Create private and public key
266
  key = OpenSSL.crypto.PKey()
267
  key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
268

    
269
  # Create self-signed certificate
270
  cert = OpenSSL.crypto.X509()
271
  if common_name:
272
    cert.get_subject().CN = common_name
273
  cert.set_serial_number(1)
274
  cert.gmtime_adj_notBefore(0)
275
  cert.gmtime_adj_notAfter(validity)
276
  cert.set_issuer(cert.get_subject())
277
  cert.set_pubkey(key)
278
  cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
279

    
280
  key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
281
  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
282

    
283
  return (key_pem, cert_pem)
284

    
285

    
286
def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
287
                              validity=constants.X509_CERT_DEFAULT_VALIDITY):
288
  """Legacy function to generate self-signed X509 certificate.
289

290
  @type filename: str
291
  @param filename: path to write certificate to
292
  @type common_name: string
293
  @param common_name: commonName value
294
  @type validity: int
295
  @param validity: validity of certificate in number of days
296
  @return: a tuple of strings containing the PEM-encoded private key and
297
           certificate
298

299
  """
300
  # TODO: Investigate using the cluster name instead of X505_CERT_CN for
301
  # common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
302
  # and node daemon certificates have the proper Subject/Issuer.
303
  (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
304
                                                   validity * 24 * 60 * 60)
305

    
306
  utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)
307
  return (key_pem, cert_pem)
308

    
309

    
310
def ExtractX509Certificate(pem):
311
  """Extracts the certificate from a PEM-formatted string.
312

313
  @type pem: string
314
  @rtype: tuple; (OpenSSL.X509 object, string)
315
  @return: Certificate object and PEM-formatted certificate
316

317
  """
318
  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
319

    
320
  return (cert,
321
          OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert))
322

    
323

    
324
def PrepareX509CertKeyCheck(cert, key):
325
  """Get function for verifying certificate with a certain private key.
326

327
  @type key: OpenSSL.crypto.PKey
328
  @param key: Private key object
329
  @type cert: OpenSSL.crypto.X509
330
  @param cert: X509 certificate object
331
  @rtype: callable
332
  @return: Callable doing the actual check; will raise C{OpenSSL.SSL.Error} if
333
    certificate is not signed by given private key
334

335
  """
336
  ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
337
  ctx.use_privatekey(key)
338
  ctx.use_certificate(cert)
339

    
340
  return ctx.check_privatekey