Statistics
| Branch: | Tag: | Revision:

root / lib / utils / x509.py @ 81f7ea25

History | View | Annotate | Download (9.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Utility functions for X509.
22

23
"""
24

    
25
import time
26
import OpenSSL
27
import re
28
import datetime
29
import calendar
30

    
31
from ganeti import errors
32
from ganeti import constants
33

    
34
from ganeti.utils import text as utils_text
35
from ganeti.utils import io as utils_io
36
from ganeti.utils import hash as utils_hash
37

    
38

    
39
HEX_CHAR_RE = r"[a-zA-Z0-9]"
40
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S)
41
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
42
                            (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
43
                             HEX_CHAR_RE, HEX_CHAR_RE),
44
                            re.S | re.I)
45

    
46
# Certificate verification results
47
(CERT_WARNING,
48
 CERT_ERROR) = range(1, 3)
49

    
50
#: ASN1 time regexp
51
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$")
52

    
53

    
54
def _ParseAsn1Generalizedtime(value):
55
  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
56

57
  @type value: string
58
  @param value: ASN1 GENERALIZEDTIME timestamp
59
  @return: Seconds since the Epoch (1970-01-01 00:00:00 UTC)
60

61
  """
62
  m = _ASN1_TIME_REGEX.match(value)
63
  if m:
64
    # We have an offset
65
    asn1time = m.group(1)
66
    hours = int(m.group(2))
67
    minutes = int(m.group(3))
68
    utcoffset = (60 * hours) + minutes
69
  else:
70
    if not value.endswith("Z"):
71
      raise ValueError("Missing timezone")
72
    asn1time = value[:-1]
73
    utcoffset = 0
74

    
75
  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
76

    
77
  tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
78

    
79
  return calendar.timegm(tt.utctimetuple())
80

    
81

    
82
def GetX509CertValidity(cert):
83
  """Returns the validity period of the certificate.
84

85
  @type cert: OpenSSL.crypto.X509
86
  @param cert: X509 certificate object
87

88
  """
89
  # The get_notBefore and get_notAfter functions are only supported in
90
  # pyOpenSSL 0.7 and above.
91
  try:
92
    get_notbefore_fn = cert.get_notBefore
93
  except AttributeError:
94
    not_before = None
95
  else:
96
    not_before_asn1 = get_notbefore_fn()
97

    
98
    if not_before_asn1 is None:
99
      not_before = None
100
    else:
101
      not_before = _ParseAsn1Generalizedtime(not_before_asn1)
102

    
103
  try:
104
    get_notafter_fn = cert.get_notAfter
105
  except AttributeError:
106
    not_after = None
107
  else:
108
    not_after_asn1 = get_notafter_fn()
109

    
110
    if not_after_asn1 is None:
111
      not_after = None
112
    else:
113
      not_after = _ParseAsn1Generalizedtime(not_after_asn1)
114

    
115
  return (not_before, not_after)
116

    
117

    
118
def _VerifyCertificateInner(expired, not_before, not_after, now,
119
                            warn_days, error_days):
120
  """Verifies certificate validity.
121

122
  @type expired: bool
123
  @param expired: Whether pyOpenSSL considers the certificate as expired
124
  @type not_before: number or None
125
  @param not_before: Unix timestamp before which certificate is not valid
126
  @type not_after: number or None
127
  @param not_after: Unix timestamp after which certificate is invalid
128
  @type now: number
129
  @param now: Current time as Unix timestamp
130
  @type warn_days: number or None
131
  @param warn_days: How many days before expiration a warning should be reported
132
  @type error_days: number or None
133
  @param error_days: How many days before expiration an error should be reported
134

135
  """
136
  if expired:
137
    msg = "Certificate is expired"
138

    
139
    if not_before is not None and not_after is not None:
140
      msg += (" (valid from %s to %s)" %
141
              (utils_text.FormatTime(not_before),
142
               utils_text.FormatTime(not_after)))
143
    elif not_before is not None:
144
      msg += " (valid from %s)" % utils_text.FormatTime(not_before)
145
    elif not_after is not None:
146
      msg += " (valid until %s)" % utils_text.FormatTime(not_after)
147

    
148
    return (CERT_ERROR, msg)
149

    
150
  elif not_before is not None and not_before > now:
151
    return (CERT_WARNING,
152
            "Certificate not yet valid (valid from %s)" %
153
            utils_text.FormatTime(not_before))
154

    
155
  elif not_after is not None:
156
    remaining_days = int((not_after - now) / (24 * 3600))
157

    
158
    msg = "Certificate expires in about %d days" % remaining_days
159

    
160
    if error_days is not None and remaining_days <= error_days:
161
      return (CERT_ERROR, msg)
162

    
163
    if warn_days is not None and remaining_days <= warn_days:
164
      return (CERT_WARNING, msg)
165

    
166
  return (None, None)
167

    
168

    
169
def VerifyX509Certificate(cert, warn_days, error_days):
170
  """Verifies a certificate for LUClusterVerify.
171

172
  @type cert: OpenSSL.crypto.X509
173
  @param cert: X509 certificate object
174
  @type warn_days: number or None
175
  @param warn_days: How many days before expiration a warning should be reported
176
  @type error_days: number or None
177
  @param error_days: How many days before expiration an error should be reported
178

179
  """
180
  # Depending on the pyOpenSSL version, this can just return (None, None)
181
  (not_before, not_after) = GetX509CertValidity(cert)
182

    
183
  return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
184
                                 time.time(), warn_days, error_days)
185

    
186

    
187
def SignX509Certificate(cert, key, salt):
188
  """Sign a X509 certificate.
189

190
  An RFC822-like signature header is added in front of the certificate.
191

192
  @type cert: OpenSSL.crypto.X509
193
  @param cert: X509 certificate object
194
  @type key: string
195
  @param key: Key for HMAC
196
  @type salt: string
197
  @param salt: Salt for HMAC
198
  @rtype: string
199
  @return: Serialized and signed certificate in PEM format
200

201
  """
202
  if not VALID_X509_SIGNATURE_SALT.match(salt):
203
    raise errors.GenericError("Invalid salt: %r" % salt)
204

    
205
  # Dumping as PEM here ensures the certificate is in a sane format
206
  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
207

    
208
  return ("%s: %s/%s\n\n%s" %
209
          (constants.X509_CERT_SIGNATURE_HEADER, salt,
210
           utils_hash.Sha1Hmac(key, cert_pem, salt=salt),
211
           cert_pem))
212

    
213

    
214
def _ExtractX509CertificateSignature(cert_pem):
215
  """Helper function to extract signature from X509 certificate.
216

217
  """
218
  # Extract signature from original PEM data
219
  for line in cert_pem.splitlines():
220
    if line.startswith("---"):
221
      break
222

    
223
    m = X509_SIGNATURE.match(line.strip())
224
    if m:
225
      return (m.group("salt"), m.group("sign"))
226

    
227
  raise errors.GenericError("X509 certificate signature is missing")
228

    
229

    
230
def LoadSignedX509Certificate(cert_pem, key):
231
  """Verifies a signed X509 certificate.
232

233
  @type cert_pem: string
234
  @param cert_pem: Certificate in PEM format and with signature header
235
  @type key: string
236
  @param key: Key for HMAC
237
  @rtype: tuple; (OpenSSL.crypto.X509, string)
238
  @return: X509 certificate object and salt
239

240
  """
241
  (salt, signature) = _ExtractX509CertificateSignature(cert_pem)
242

    
243
  # Load certificate
244
  cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
245

    
246
  # Dump again to ensure it's in a sane format
247
  sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
248

    
249
  if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt):
250
    raise errors.GenericError("X509 certificate signature is invalid")
251

    
252
  return (cert, salt)
253

    
254

    
255
def GenerateSelfSignedX509Cert(common_name, validity):
256
  """Generates a self-signed X509 certificate.
257

258
  @type common_name: string
259
  @param common_name: commonName value
260
  @type validity: int
261
  @param validity: Validity for certificate in seconds
262
  @return: a tuple of strings containing the PEM-encoded private key and
263
           certificate
264

265
  """
266
  # Create private and public key
267
  key = OpenSSL.crypto.PKey()
268
  key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
269

    
270
  # Create self-signed certificate
271
  cert = OpenSSL.crypto.X509()
272
  if common_name:
273
    cert.get_subject().CN = common_name
274
  cert.set_serial_number(1)
275
  cert.gmtime_adj_notBefore(0)
276
  cert.gmtime_adj_notAfter(validity)
277
  cert.set_issuer(cert.get_subject())
278
  cert.set_pubkey(key)
279
  cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
280

    
281
  key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key)
282
  cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
283

    
284
  return (key_pem, cert_pem)
285

    
286

    
287
def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN,
288
                              validity=constants.X509_CERT_DEFAULT_VALIDITY):
289
  """Legacy function to generate self-signed X509 certificate.
290

291
  @type filename: str
292
  @param filename: path to write certificate to
293
  @type common_name: string
294
  @param common_name: commonName value
295
  @type validity: int
296
  @param validity: validity of certificate in number of days
297
  @return: a tuple of strings containing the PEM-encoded private key and
298
           certificate
299

300
  """
301
  # TODO: Investigate using the cluster name instead of X505_CERT_CN for
302
  # common_name, as cluster-renames are very seldom, and it'd be nice if RAPI
303
  # and node daemon certificates have the proper Subject/Issuer.
304
  (key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name,
305
                                                   validity * 24 * 60 * 60)
306

    
307
  utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem)
308
  return (key_pem, cert_pem)