Revision c50645c0
b/Makefile.am | ||
---|---|---|
221 | 221 |
lib/utils/mlock.py \ |
222 | 222 |
lib/utils/retry.py \ |
223 | 223 |
lib/utils/text.py \ |
224 |
lib/utils/wrapper.py |
|
224 |
lib/utils/wrapper.py \ |
|
225 |
lib/utils/x509.py |
|
225 | 226 |
|
226 | 227 |
docrst = \ |
227 | 228 |
doc/admin.rst \ |
... | ... | |
495 | 496 |
test/ganeti.utils.retry_unittest.py \ |
496 | 497 |
test/ganeti.utils.text_unittest.py \ |
497 | 498 |
test/ganeti.utils.wrapper_unittest.py \ |
499 |
test/ganeti.utils.x509_unittest.py \ |
|
498 | 500 |
test/ganeti.utils_unittest.py \ |
499 | 501 |
test/ganeti.workerpool_unittest.py \ |
500 | 502 |
test/cfgupgrade_unittest.py \ |
b/lib/utils/__init__.py | ||
---|---|---|
43 | 43 |
import resource |
44 | 44 |
import logging |
45 | 45 |
import signal |
46 |
import OpenSSL |
|
47 | 46 |
import datetime |
48 | 47 |
import calendar |
49 | 48 |
|
... | ... | |
62 | 61 |
from ganeti.utils.wrapper import * # pylint: disable-msg=W0401 |
63 | 62 |
from ganeti.utils.filelock import * # pylint: disable-msg=W0401 |
64 | 63 |
from ganeti.utils.io import * # pylint: disable-msg=W0401 |
64 |
from ganeti.utils.x509 import * # pylint: disable-msg=W0401 |
|
65 | 65 |
|
66 | 66 |
|
67 | 67 |
#: when set to True, L{RunCmd} is disabled |
... | ... | |
69 | 69 |
|
70 | 70 |
_RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid" |
71 | 71 |
|
72 |
HEX_CHAR_RE = r"[a-zA-Z0-9]" |
|
73 |
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S) |
|
74 |
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" % |
|
75 |
(re.escape(constants.X509_CERT_SIGNATURE_HEADER), |
|
76 |
HEX_CHAR_RE, HEX_CHAR_RE), |
|
77 |
re.S | re.I) |
|
78 |
|
|
79 | 72 |
_VALID_SERVICE_NAME_RE = re.compile("^[-_.a-zA-Z0-9]{1,128}$") |
80 | 73 |
|
81 | 74 |
UUID_RE = re.compile('^[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-' |
82 | 75 |
'[a-f0-9]{4}-[a-f0-9]{12}$') |
83 | 76 |
|
84 |
# Certificate verification results |
|
85 |
(CERT_WARNING, |
|
86 |
CERT_ERROR) = range(1, 3) |
|
87 |
|
|
88 | 77 |
(_TIMEOUT_NONE, |
89 | 78 |
_TIMEOUT_TERM, |
90 | 79 |
_TIMEOUT_KILL) = range(3) |
... | ... | |
92 | 81 |
#: Shell param checker regexp |
93 | 82 |
_SHELLPARAM_REGEX = re.compile(r"^[-a-zA-Z0-9._+/:%@]+$") |
94 | 83 |
|
95 |
#: ASN1 time regexp |
|
96 |
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$") |
|
97 |
|
|
98 | 84 |
|
99 | 85 |
def DisableFork(): |
100 | 86 |
"""Disables the use of fork(2). |
... | ... | |
1572 | 1558 |
return float(seconds) + (float(microseconds) * 0.000001) |
1573 | 1559 |
|
1574 | 1560 |
|
1575 |
def _ParseAsn1Generalizedtime(value): |
|
1576 |
"""Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL. |
|
1577 |
|
|
1578 |
@type value: string |
|
1579 |
@param value: ASN1 GENERALIZEDTIME timestamp |
|
1580 |
@return: Seconds since the Epoch (1970-01-01 00:00:00 UTC) |
|
1581 |
|
|
1582 |
""" |
|
1583 |
m = _ASN1_TIME_REGEX.match(value) |
|
1584 |
if m: |
|
1585 |
# We have an offset |
|
1586 |
asn1time = m.group(1) |
|
1587 |
hours = int(m.group(2)) |
|
1588 |
minutes = int(m.group(3)) |
|
1589 |
utcoffset = (60 * hours) + minutes |
|
1590 |
else: |
|
1591 |
if not value.endswith("Z"): |
|
1592 |
raise ValueError("Missing timezone") |
|
1593 |
asn1time = value[:-1] |
|
1594 |
utcoffset = 0 |
|
1595 |
|
|
1596 |
parsed = time.strptime(asn1time, "%Y%m%d%H%M%S") |
|
1597 |
|
|
1598 |
tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset) |
|
1599 |
|
|
1600 |
return calendar.timegm(tt.utctimetuple()) |
|
1601 |
|
|
1602 |
|
|
1603 |
def GetX509CertValidity(cert): |
|
1604 |
"""Returns the validity period of the certificate. |
|
1605 |
|
|
1606 |
@type cert: OpenSSL.crypto.X509 |
|
1607 |
@param cert: X509 certificate object |
|
1608 |
|
|
1609 |
""" |
|
1610 |
# The get_notBefore and get_notAfter functions are only supported in |
|
1611 |
# pyOpenSSL 0.7 and above. |
|
1612 |
try: |
|
1613 |
get_notbefore_fn = cert.get_notBefore |
|
1614 |
except AttributeError: |
|
1615 |
not_before = None |
|
1616 |
else: |
|
1617 |
not_before_asn1 = get_notbefore_fn() |
|
1618 |
|
|
1619 |
if not_before_asn1 is None: |
|
1620 |
not_before = None |
|
1621 |
else: |
|
1622 |
not_before = _ParseAsn1Generalizedtime(not_before_asn1) |
|
1623 |
|
|
1624 |
try: |
|
1625 |
get_notafter_fn = cert.get_notAfter |
|
1626 |
except AttributeError: |
|
1627 |
not_after = None |
|
1628 |
else: |
|
1629 |
not_after_asn1 = get_notafter_fn() |
|
1630 |
|
|
1631 |
if not_after_asn1 is None: |
|
1632 |
not_after = None |
|
1633 |
else: |
|
1634 |
not_after = _ParseAsn1Generalizedtime(not_after_asn1) |
|
1635 |
|
|
1636 |
return (not_before, not_after) |
|
1637 |
|
|
1638 |
|
|
1639 |
def _VerifyCertificateInner(expired, not_before, not_after, now, |
|
1640 |
warn_days, error_days): |
|
1641 |
"""Verifies certificate validity. |
|
1642 |
|
|
1643 |
@type expired: bool |
|
1644 |
@param expired: Whether pyOpenSSL considers the certificate as expired |
|
1645 |
@type not_before: number or None |
|
1646 |
@param not_before: Unix timestamp before which certificate is not valid |
|
1647 |
@type not_after: number or None |
|
1648 |
@param not_after: Unix timestamp after which certificate is invalid |
|
1649 |
@type now: number |
|
1650 |
@param now: Current time as Unix timestamp |
|
1651 |
@type warn_days: number or None |
|
1652 |
@param warn_days: How many days before expiration a warning should be reported |
|
1653 |
@type error_days: number or None |
|
1654 |
@param error_days: How many days before expiration an error should be reported |
|
1655 |
|
|
1656 |
""" |
|
1657 |
if expired: |
|
1658 |
msg = "Certificate is expired" |
|
1659 |
|
|
1660 |
if not_before is not None and not_after is not None: |
|
1661 |
msg += (" (valid from %s to %s)" % |
|
1662 |
(FormatTime(not_before), FormatTime(not_after))) |
|
1663 |
elif not_before is not None: |
|
1664 |
msg += " (valid from %s)" % FormatTime(not_before) |
|
1665 |
elif not_after is not None: |
|
1666 |
msg += " (valid until %s)" % FormatTime(not_after) |
|
1667 |
|
|
1668 |
return (CERT_ERROR, msg) |
|
1669 |
|
|
1670 |
elif not_before is not None and not_before > now: |
|
1671 |
return (CERT_WARNING, |
|
1672 |
"Certificate not yet valid (valid from %s)" % |
|
1673 |
FormatTime(not_before)) |
|
1674 |
|
|
1675 |
elif not_after is not None: |
|
1676 |
remaining_days = int((not_after - now) / (24 * 3600)) |
|
1677 |
|
|
1678 |
msg = "Certificate expires in about %d days" % remaining_days |
|
1679 |
|
|
1680 |
if error_days is not None and remaining_days <= error_days: |
|
1681 |
return (CERT_ERROR, msg) |
|
1682 |
|
|
1683 |
if warn_days is not None and remaining_days <= warn_days: |
|
1684 |
return (CERT_WARNING, msg) |
|
1685 |
|
|
1686 |
return (None, None) |
|
1687 |
|
|
1688 |
|
|
1689 |
def VerifyX509Certificate(cert, warn_days, error_days): |
|
1690 |
"""Verifies a certificate for LUVerifyCluster. |
|
1691 |
|
|
1692 |
@type cert: OpenSSL.crypto.X509 |
|
1693 |
@param cert: X509 certificate object |
|
1694 |
@type warn_days: number or None |
|
1695 |
@param warn_days: How many days before expiration a warning should be reported |
|
1696 |
@type error_days: number or None |
|
1697 |
@param error_days: How many days before expiration an error should be reported |
|
1698 |
|
|
1699 |
""" |
|
1700 |
# Depending on the pyOpenSSL version, this can just return (None, None) |
|
1701 |
(not_before, not_after) = GetX509CertValidity(cert) |
|
1702 |
|
|
1703 |
return _VerifyCertificateInner(cert.has_expired(), not_before, not_after, |
|
1704 |
time.time(), warn_days, error_days) |
|
1705 |
|
|
1706 |
|
|
1707 |
def SignX509Certificate(cert, key, salt): |
|
1708 |
"""Sign a X509 certificate. |
|
1709 |
|
|
1710 |
An RFC822-like signature header is added in front of the certificate. |
|
1711 |
|
|
1712 |
@type cert: OpenSSL.crypto.X509 |
|
1713 |
@param cert: X509 certificate object |
|
1714 |
@type key: string |
|
1715 |
@param key: Key for HMAC |
|
1716 |
@type salt: string |
|
1717 |
@param salt: Salt for HMAC |
|
1718 |
@rtype: string |
|
1719 |
@return: Serialized and signed certificate in PEM format |
|
1720 |
|
|
1721 |
""" |
|
1722 |
if not VALID_X509_SIGNATURE_SALT.match(salt): |
|
1723 |
raise errors.GenericError("Invalid salt: %r" % salt) |
|
1724 |
|
|
1725 |
# Dumping as PEM here ensures the certificate is in a sane format |
|
1726 |
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) |
|
1727 |
|
|
1728 |
return ("%s: %s/%s\n\n%s" % |
|
1729 |
(constants.X509_CERT_SIGNATURE_HEADER, salt, |
|
1730 |
Sha1Hmac(key, cert_pem, salt=salt), |
|
1731 |
cert_pem)) |
|
1732 |
|
|
1733 |
|
|
1734 |
def _ExtractX509CertificateSignature(cert_pem): |
|
1735 |
"""Helper function to extract signature from X509 certificate. |
|
1736 |
|
|
1737 |
""" |
|
1738 |
# Extract signature from original PEM data |
|
1739 |
for line in cert_pem.splitlines(): |
|
1740 |
if line.startswith("---"): |
|
1741 |
break |
|
1742 |
|
|
1743 |
m = X509_SIGNATURE.match(line.strip()) |
|
1744 |
if m: |
|
1745 |
return (m.group("salt"), m.group("sign")) |
|
1746 |
|
|
1747 |
raise errors.GenericError("X509 certificate signature is missing") |
|
1748 |
|
|
1749 |
|
|
1750 |
def LoadSignedX509Certificate(cert_pem, key): |
|
1751 |
"""Verifies a signed X509 certificate. |
|
1752 |
|
|
1753 |
@type cert_pem: string |
|
1754 |
@param cert_pem: Certificate in PEM format and with signature header |
|
1755 |
@type key: string |
|
1756 |
@param key: Key for HMAC |
|
1757 |
@rtype: tuple; (OpenSSL.crypto.X509, string) |
|
1758 |
@return: X509 certificate object and salt |
|
1759 |
|
|
1760 |
""" |
|
1761 |
(salt, signature) = _ExtractX509CertificateSignature(cert_pem) |
|
1762 |
|
|
1763 |
# Load certificate |
|
1764 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem) |
|
1765 |
|
|
1766 |
# Dump again to ensure it's in a sane format |
|
1767 |
sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) |
|
1768 |
|
|
1769 |
if not VerifySha1Hmac(key, sane_pem, signature, salt=salt): |
|
1770 |
raise errors.GenericError("X509 certificate signature is invalid") |
|
1771 |
|
|
1772 |
return (cert, salt) |
|
1773 |
|
|
1774 |
|
|
1775 | 1561 |
def FindMatch(data, name): |
1776 | 1562 |
"""Tries to find an item in a dictionary matching a name. |
1777 | 1563 |
|
... | ... | |
1866 | 1652 |
return bool(exitcode) |
1867 | 1653 |
|
1868 | 1654 |
|
1869 |
def GenerateSelfSignedX509Cert(common_name, validity): |
|
1870 |
"""Generates a self-signed X509 certificate. |
|
1871 |
|
|
1872 |
@type common_name: string |
|
1873 |
@param common_name: commonName value |
|
1874 |
@type validity: int |
|
1875 |
@param validity: Validity for certificate in seconds |
|
1876 |
|
|
1877 |
""" |
|
1878 |
# Create private and public key |
|
1879 |
key = OpenSSL.crypto.PKey() |
|
1880 |
key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS) |
|
1881 |
|
|
1882 |
# Create self-signed certificate |
|
1883 |
cert = OpenSSL.crypto.X509() |
|
1884 |
if common_name: |
|
1885 |
cert.get_subject().CN = common_name |
|
1886 |
cert.set_serial_number(1) |
|
1887 |
cert.gmtime_adj_notBefore(0) |
|
1888 |
cert.gmtime_adj_notAfter(validity) |
|
1889 |
cert.set_issuer(cert.get_subject()) |
|
1890 |
cert.set_pubkey(key) |
|
1891 |
cert.sign(key, constants.X509_CERT_SIGN_DIGEST) |
|
1892 |
|
|
1893 |
key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key) |
|
1894 |
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) |
|
1895 |
|
|
1896 |
return (key_pem, cert_pem) |
|
1897 |
|
|
1898 |
|
|
1899 |
def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN, |
|
1900 |
validity=constants.X509_CERT_DEFAULT_VALIDITY): |
|
1901 |
"""Legacy function to generate self-signed X509 certificate. |
|
1902 |
|
|
1903 |
@type filename: str |
|
1904 |
@param filename: path to write certificate to |
|
1905 |
@type common_name: string |
|
1906 |
@param common_name: commonName value |
|
1907 |
@type validity: int |
|
1908 |
@param validity: validity of certificate in number of days |
|
1909 |
|
|
1910 |
""" |
|
1911 |
# TODO: Investigate using the cluster name instead of X505_CERT_CN for |
|
1912 |
# common_name, as cluster-renames are very seldom, and it'd be nice if RAPI |
|
1913 |
# and node daemon certificates have the proper Subject/Issuer. |
|
1914 |
(key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name, |
|
1915 |
validity * 24 * 60 * 60) |
|
1916 |
|
|
1917 |
WriteFile(filename, mode=0400, data=key_pem + cert_pem) |
|
1918 |
|
|
1919 |
|
|
1920 | 1655 |
def SignalHandled(signums): |
1921 | 1656 |
"""Signal Handled decoration. |
1922 | 1657 |
|
b/lib/utils/x509.py | ||
---|---|---|
1 |
# |
|
2 |
# |
|
3 |
|
|
4 |
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. |
|
5 |
# |
|
6 |
# This program is free software; you can redistribute it and/or modify |
|
7 |
# it under the terms of the GNU General Public License as published by |
|
8 |
# the Free Software Foundation; either version 2 of the License, or |
|
9 |
# (at your option) any later version. |
|
10 |
# |
|
11 |
# This program is distributed in the hope that it will be useful, but |
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
14 |
# General Public License for more details. |
|
15 |
# |
|
16 |
# You should have received a copy of the GNU General Public License |
|
17 |
# along with this program; if not, write to the Free Software |
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
19 |
# 02110-1301, USA. |
|
20 |
|
|
21 |
"""Utility functions for X509. |
|
22 |
|
|
23 |
""" |
|
24 |
|
|
25 |
import time |
|
26 |
import OpenSSL |
|
27 |
import re |
|
28 |
import datetime |
|
29 |
import calendar |
|
30 |
|
|
31 |
from ganeti import errors |
|
32 |
from ganeti import constants |
|
33 |
|
|
34 |
from ganeti.utils import text as utils_text |
|
35 |
from ganeti.utils import io as utils_io |
|
36 |
from ganeti.utils import hash as utils_hash |
|
37 |
|
|
38 |
|
|
39 |
HEX_CHAR_RE = r"[a-zA-Z0-9]" |
|
40 |
VALID_X509_SIGNATURE_SALT = re.compile("^%s+$" % HEX_CHAR_RE, re.S) |
|
41 |
X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" % |
|
42 |
(re.escape(constants.X509_CERT_SIGNATURE_HEADER), |
|
43 |
HEX_CHAR_RE, HEX_CHAR_RE), |
|
44 |
re.S | re.I) |
|
45 |
|
|
46 |
# Certificate verification results |
|
47 |
(CERT_WARNING, |
|
48 |
CERT_ERROR) = range(1, 3) |
|
49 |
|
|
50 |
#: ASN1 time regexp |
|
51 |
_ASN1_TIME_REGEX = re.compile(r"^(\d+)([-+]\d\d)(\d\d)$") |
|
52 |
|
|
53 |
|
|
54 |
def _ParseAsn1Generalizedtime(value): |
|
55 |
"""Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL. |
|
56 |
|
|
57 |
@type value: string |
|
58 |
@param value: ASN1 GENERALIZEDTIME timestamp |
|
59 |
@return: Seconds since the Epoch (1970-01-01 00:00:00 UTC) |
|
60 |
|
|
61 |
""" |
|
62 |
m = _ASN1_TIME_REGEX.match(value) |
|
63 |
if m: |
|
64 |
# We have an offset |
|
65 |
asn1time = m.group(1) |
|
66 |
hours = int(m.group(2)) |
|
67 |
minutes = int(m.group(3)) |
|
68 |
utcoffset = (60 * hours) + minutes |
|
69 |
else: |
|
70 |
if not value.endswith("Z"): |
|
71 |
raise ValueError("Missing timezone") |
|
72 |
asn1time = value[:-1] |
|
73 |
utcoffset = 0 |
|
74 |
|
|
75 |
parsed = time.strptime(asn1time, "%Y%m%d%H%M%S") |
|
76 |
|
|
77 |
tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset) |
|
78 |
|
|
79 |
return calendar.timegm(tt.utctimetuple()) |
|
80 |
|
|
81 |
|
|
82 |
def GetX509CertValidity(cert): |
|
83 |
"""Returns the validity period of the certificate. |
|
84 |
|
|
85 |
@type cert: OpenSSL.crypto.X509 |
|
86 |
@param cert: X509 certificate object |
|
87 |
|
|
88 |
""" |
|
89 |
# The get_notBefore and get_notAfter functions are only supported in |
|
90 |
# pyOpenSSL 0.7 and above. |
|
91 |
try: |
|
92 |
get_notbefore_fn = cert.get_notBefore |
|
93 |
except AttributeError: |
|
94 |
not_before = None |
|
95 |
else: |
|
96 |
not_before_asn1 = get_notbefore_fn() |
|
97 |
|
|
98 |
if not_before_asn1 is None: |
|
99 |
not_before = None |
|
100 |
else: |
|
101 |
not_before = _ParseAsn1Generalizedtime(not_before_asn1) |
|
102 |
|
|
103 |
try: |
|
104 |
get_notafter_fn = cert.get_notAfter |
|
105 |
except AttributeError: |
|
106 |
not_after = None |
|
107 |
else: |
|
108 |
not_after_asn1 = get_notafter_fn() |
|
109 |
|
|
110 |
if not_after_asn1 is None: |
|
111 |
not_after = None |
|
112 |
else: |
|
113 |
not_after = _ParseAsn1Generalizedtime(not_after_asn1) |
|
114 |
|
|
115 |
return (not_before, not_after) |
|
116 |
|
|
117 |
|
|
118 |
def _VerifyCertificateInner(expired, not_before, not_after, now, |
|
119 |
warn_days, error_days): |
|
120 |
"""Verifies certificate validity. |
|
121 |
|
|
122 |
@type expired: bool |
|
123 |
@param expired: Whether pyOpenSSL considers the certificate as expired |
|
124 |
@type not_before: number or None |
|
125 |
@param not_before: Unix timestamp before which certificate is not valid |
|
126 |
@type not_after: number or None |
|
127 |
@param not_after: Unix timestamp after which certificate is invalid |
|
128 |
@type now: number |
|
129 |
@param now: Current time as Unix timestamp |
|
130 |
@type warn_days: number or None |
|
131 |
@param warn_days: How many days before expiration a warning should be reported |
|
132 |
@type error_days: number or None |
|
133 |
@param error_days: How many days before expiration an error should be reported |
|
134 |
|
|
135 |
""" |
|
136 |
if expired: |
|
137 |
msg = "Certificate is expired" |
|
138 |
|
|
139 |
if not_before is not None and not_after is not None: |
|
140 |
msg += (" (valid from %s to %s)" % |
|
141 |
(utils_text.FormatTime(not_before), |
|
142 |
utils_text.FormatTime(not_after))) |
|
143 |
elif not_before is not None: |
|
144 |
msg += " (valid from %s)" % utils_text.FormatTime(not_before) |
|
145 |
elif not_after is not None: |
|
146 |
msg += " (valid until %s)" % utils_text.FormatTime(not_after) |
|
147 |
|
|
148 |
return (CERT_ERROR, msg) |
|
149 |
|
|
150 |
elif not_before is not None and not_before > now: |
|
151 |
return (CERT_WARNING, |
|
152 |
"Certificate not yet valid (valid from %s)" % |
|
153 |
utils_text.FormatTime(not_before)) |
|
154 |
|
|
155 |
elif not_after is not None: |
|
156 |
remaining_days = int((not_after - now) / (24 * 3600)) |
|
157 |
|
|
158 |
msg = "Certificate expires in about %d days" % remaining_days |
|
159 |
|
|
160 |
if error_days is not None and remaining_days <= error_days: |
|
161 |
return (CERT_ERROR, msg) |
|
162 |
|
|
163 |
if warn_days is not None and remaining_days <= warn_days: |
|
164 |
return (CERT_WARNING, msg) |
|
165 |
|
|
166 |
return (None, None) |
|
167 |
|
|
168 |
|
|
169 |
def VerifyX509Certificate(cert, warn_days, error_days): |
|
170 |
"""Verifies a certificate for LUVerifyCluster. |
|
171 |
|
|
172 |
@type cert: OpenSSL.crypto.X509 |
|
173 |
@param cert: X509 certificate object |
|
174 |
@type warn_days: number or None |
|
175 |
@param warn_days: How many days before expiration a warning should be reported |
|
176 |
@type error_days: number or None |
|
177 |
@param error_days: How many days before expiration an error should be reported |
|
178 |
|
|
179 |
""" |
|
180 |
# Depending on the pyOpenSSL version, this can just return (None, None) |
|
181 |
(not_before, not_after) = GetX509CertValidity(cert) |
|
182 |
|
|
183 |
return _VerifyCertificateInner(cert.has_expired(), not_before, not_after, |
|
184 |
time.time(), warn_days, error_days) |
|
185 |
|
|
186 |
|
|
187 |
def SignX509Certificate(cert, key, salt): |
|
188 |
"""Sign a X509 certificate. |
|
189 |
|
|
190 |
An RFC822-like signature header is added in front of the certificate. |
|
191 |
|
|
192 |
@type cert: OpenSSL.crypto.X509 |
|
193 |
@param cert: X509 certificate object |
|
194 |
@type key: string |
|
195 |
@param key: Key for HMAC |
|
196 |
@type salt: string |
|
197 |
@param salt: Salt for HMAC |
|
198 |
@rtype: string |
|
199 |
@return: Serialized and signed certificate in PEM format |
|
200 |
|
|
201 |
""" |
|
202 |
if not VALID_X509_SIGNATURE_SALT.match(salt): |
|
203 |
raise errors.GenericError("Invalid salt: %r" % salt) |
|
204 |
|
|
205 |
# Dumping as PEM here ensures the certificate is in a sane format |
|
206 |
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) |
|
207 |
|
|
208 |
return ("%s: %s/%s\n\n%s" % |
|
209 |
(constants.X509_CERT_SIGNATURE_HEADER, salt, |
|
210 |
utils_hash.Sha1Hmac(key, cert_pem, salt=salt), |
|
211 |
cert_pem)) |
|
212 |
|
|
213 |
|
|
214 |
def _ExtractX509CertificateSignature(cert_pem): |
|
215 |
"""Helper function to extract signature from X509 certificate. |
|
216 |
|
|
217 |
""" |
|
218 |
# Extract signature from original PEM data |
|
219 |
for line in cert_pem.splitlines(): |
|
220 |
if line.startswith("---"): |
|
221 |
break |
|
222 |
|
|
223 |
m = X509_SIGNATURE.match(line.strip()) |
|
224 |
if m: |
|
225 |
return (m.group("salt"), m.group("sign")) |
|
226 |
|
|
227 |
raise errors.GenericError("X509 certificate signature is missing") |
|
228 |
|
|
229 |
|
|
230 |
def LoadSignedX509Certificate(cert_pem, key): |
|
231 |
"""Verifies a signed X509 certificate. |
|
232 |
|
|
233 |
@type cert_pem: string |
|
234 |
@param cert_pem: Certificate in PEM format and with signature header |
|
235 |
@type key: string |
|
236 |
@param key: Key for HMAC |
|
237 |
@rtype: tuple; (OpenSSL.crypto.X509, string) |
|
238 |
@return: X509 certificate object and salt |
|
239 |
|
|
240 |
""" |
|
241 |
(salt, signature) = _ExtractX509CertificateSignature(cert_pem) |
|
242 |
|
|
243 |
# Load certificate |
|
244 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem) |
|
245 |
|
|
246 |
# Dump again to ensure it's in a sane format |
|
247 |
sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) |
|
248 |
|
|
249 |
if not utils_hash.VerifySha1Hmac(key, sane_pem, signature, salt=salt): |
|
250 |
raise errors.GenericError("X509 certificate signature is invalid") |
|
251 |
|
|
252 |
return (cert, salt) |
|
253 |
|
|
254 |
|
|
255 |
def GenerateSelfSignedX509Cert(common_name, validity): |
|
256 |
"""Generates a self-signed X509 certificate. |
|
257 |
|
|
258 |
@type common_name: string |
|
259 |
@param common_name: commonName value |
|
260 |
@type validity: int |
|
261 |
@param validity: Validity for certificate in seconds |
|
262 |
|
|
263 |
""" |
|
264 |
# Create private and public key |
|
265 |
key = OpenSSL.crypto.PKey() |
|
266 |
key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS) |
|
267 |
|
|
268 |
# Create self-signed certificate |
|
269 |
cert = OpenSSL.crypto.X509() |
|
270 |
if common_name: |
|
271 |
cert.get_subject().CN = common_name |
|
272 |
cert.set_serial_number(1) |
|
273 |
cert.gmtime_adj_notBefore(0) |
|
274 |
cert.gmtime_adj_notAfter(validity) |
|
275 |
cert.set_issuer(cert.get_subject()) |
|
276 |
cert.set_pubkey(key) |
|
277 |
cert.sign(key, constants.X509_CERT_SIGN_DIGEST) |
|
278 |
|
|
279 |
key_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, key) |
|
280 |
cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) |
|
281 |
|
|
282 |
return (key_pem, cert_pem) |
|
283 |
|
|
284 |
|
|
285 |
def GenerateSelfSignedSslCert(filename, common_name=constants.X509_CERT_CN, |
|
286 |
validity=constants.X509_CERT_DEFAULT_VALIDITY): |
|
287 |
"""Legacy function to generate self-signed X509 certificate. |
|
288 |
|
|
289 |
@type filename: str |
|
290 |
@param filename: path to write certificate to |
|
291 |
@type common_name: string |
|
292 |
@param common_name: commonName value |
|
293 |
@type validity: int |
|
294 |
@param validity: validity of certificate in number of days |
|
295 |
|
|
296 |
""" |
|
297 |
# TODO: Investigate using the cluster name instead of X505_CERT_CN for |
|
298 |
# common_name, as cluster-renames are very seldom, and it'd be nice if RAPI |
|
299 |
# and node daemon certificates have the proper Subject/Issuer. |
|
300 |
(key_pem, cert_pem) = GenerateSelfSignedX509Cert(common_name, |
|
301 |
validity * 24 * 60 * 60) |
|
302 |
|
|
303 |
utils_io.WriteFile(filename, mode=0400, data=key_pem + cert_pem) |
b/test/ganeti.utils.x509_unittest.py | ||
---|---|---|
1 |
#!/usr/bin/python |
|
2 |
# |
|
3 |
|
|
4 |
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc. |
|
5 |
# |
|
6 |
# This program is free software; you can redistribute it and/or modify |
|
7 |
# it under the terms of the GNU General Public License as published by |
|
8 |
# the Free Software Foundation; either version 2 of the License, or |
|
9 |
# (at your option) any later version. |
|
10 |
# |
|
11 |
# This program is distributed in the hope that it will be useful, but |
|
12 |
# WITHOUT ANY WARRANTY; without even the implied warranty of |
|
13 |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
|
14 |
# General Public License for more details. |
|
15 |
# |
|
16 |
# You should have received a copy of the GNU General Public License |
|
17 |
# along with this program; if not, write to the Free Software |
|
18 |
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
|
19 |
# 02110-1301, USA. |
|
20 |
|
|
21 |
|
|
22 |
"""Script for testing ganeti.utils.x509""" |
|
23 |
|
|
24 |
import os |
|
25 |
import tempfile |
|
26 |
import unittest |
|
27 |
import shutil |
|
28 |
import time |
|
29 |
import OpenSSL |
|
30 |
import distutils.version |
|
31 |
import string |
|
32 |
|
|
33 |
from ganeti import constants |
|
34 |
from ganeti import utils |
|
35 |
from ganeti import compat |
|
36 |
from ganeti import errors |
|
37 |
|
|
38 |
import testutils |
|
39 |
|
|
40 |
|
|
41 |
class TestParseAsn1Generalizedtime(unittest.TestCase): |
|
42 |
def setUp(self): |
|
43 |
self._Parse = utils.x509._ParseAsn1Generalizedtime |
|
44 |
|
|
45 |
def test(self): |
|
46 |
# UTC |
|
47 |
self.assertEqual(self._Parse("19700101000000Z"), 0) |
|
48 |
self.assertEqual(self._Parse("20100222174152Z"), 1266860512) |
|
49 |
self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1) |
|
50 |
|
|
51 |
# With offset |
|
52 |
self.assertEqual(self._Parse("20100222174152+0000"), 1266860512) |
|
53 |
self.assertEqual(self._Parse("20100223131652+0000"), 1266931012) |
|
54 |
self.assertEqual(self._Parse("20100223051808-0800"), 1266931088) |
|
55 |
self.assertEqual(self._Parse("20100224002135+1100"), 1266931295) |
|
56 |
self.assertEqual(self._Parse("19700101000000-0100"), 3600) |
|
57 |
|
|
58 |
# Leap seconds are not supported by datetime.datetime |
|
59 |
self.assertRaises(ValueError, self._Parse, "19841231235960+0000") |
|
60 |
self.assertRaises(ValueError, self._Parse, "19920630235960+0000") |
|
61 |
|
|
62 |
# Errors |
|
63 |
self.assertRaises(ValueError, self._Parse, "") |
|
64 |
self.assertRaises(ValueError, self._Parse, "invalid") |
|
65 |
self.assertRaises(ValueError, self._Parse, "20100222174152") |
|
66 |
self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010") |
|
67 |
self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02") |
|
68 |
|
|
69 |
|
|
70 |
class TestGetX509CertValidity(testutils.GanetiTestCase): |
|
71 |
def setUp(self): |
|
72 |
testutils.GanetiTestCase.setUp(self) |
|
73 |
|
|
74 |
pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__) |
|
75 |
|
|
76 |
# Test whether we have pyOpenSSL 0.7 or above |
|
77 |
self.pyopenssl0_7 = (pyopenssl_version >= "0.7") |
|
78 |
|
|
79 |
if not self.pyopenssl0_7: |
|
80 |
warnings.warn("This test requires pyOpenSSL 0.7 or above to" |
|
81 |
" function correctly") |
|
82 |
|
|
83 |
def _LoadCert(self, name): |
|
84 |
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
85 |
self._ReadTestData(name)) |
|
86 |
|
|
87 |
def test(self): |
|
88 |
validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem")) |
|
89 |
if self.pyopenssl0_7: |
|
90 |
self.assertEqual(validity, (1266919967, 1267524767)) |
|
91 |
else: |
|
92 |
self.assertEqual(validity, (None, None)) |
|
93 |
|
|
94 |
|
|
95 |
class TestSignX509Certificate(unittest.TestCase): |
|
96 |
KEY = "My private key!" |
|
97 |
KEY_OTHER = "Another key" |
|
98 |
|
|
99 |
def test(self): |
|
100 |
# Generate certificate valid for 5 minutes |
|
101 |
(_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300) |
|
102 |
|
|
103 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
104 |
cert_pem) |
|
105 |
|
|
106 |
# No signature at all |
|
107 |
self.assertRaises(errors.GenericError, |
|
108 |
utils.LoadSignedX509Certificate, cert_pem, self.KEY) |
|
109 |
|
|
110 |
# Invalid input |
|
111 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
112 |
"", self.KEY) |
|
113 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
114 |
"X-Ganeti-Signature: \n", self.KEY) |
|
115 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
116 |
"X-Ganeti-Sign: $1234$abcdef\n", self.KEY) |
|
117 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
118 |
"X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY) |
|
119 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
120 |
"X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY) |
|
121 |
|
|
122 |
# Invalid salt |
|
123 |
for salt in list("-_@$,:;/\\ \t\n"): |
|
124 |
self.assertRaises(errors.GenericError, utils.SignX509Certificate, |
|
125 |
cert_pem, self.KEY, "foo%sbar" % salt) |
|
126 |
|
|
127 |
for salt in ["HelloWorld", "salt", string.letters, string.digits, |
|
128 |
utils.GenerateSecret(numbytes=4), |
|
129 |
utils.GenerateSecret(numbytes=16), |
|
130 |
"{123:456}".encode("hex")]: |
|
131 |
signed_pem = utils.SignX509Certificate(cert, self.KEY, salt) |
|
132 |
|
|
133 |
self._Check(cert, salt, signed_pem) |
|
134 |
|
|
135 |
self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem) |
|
136 |
self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem) |
|
137 |
self._Check(cert, salt, (signed_pem + "\n\na few more\n" |
|
138 |
"lines----\n------ at\nthe end!")) |
|
139 |
|
|
140 |
def _Check(self, cert, salt, pem): |
|
141 |
(cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY) |
|
142 |
self.assertEqual(salt, salt2) |
|
143 |
self.assertEqual(cert.digest("sha1"), cert2.digest("sha1")) |
|
144 |
|
|
145 |
# Other key |
|
146 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
147 |
pem, self.KEY_OTHER) |
|
148 |
|
|
149 |
|
|
150 |
class TestCertVerification(testutils.GanetiTestCase): |
|
151 |
def setUp(self): |
|
152 |
testutils.GanetiTestCase.setUp(self) |
|
153 |
|
|
154 |
self.tmpdir = tempfile.mkdtemp() |
|
155 |
|
|
156 |
def tearDown(self): |
|
157 |
shutil.rmtree(self.tmpdir) |
|
158 |
|
|
159 |
def testVerifyCertificate(self): |
|
160 |
cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem")) |
|
161 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
162 |
cert_pem) |
|
163 |
|
|
164 |
# Not checking return value as this certificate is expired |
|
165 |
utils.VerifyX509Certificate(cert, 30, 7) |
|
166 |
|
|
167 |
|
|
168 |
class TestVerifyCertificateInner(unittest.TestCase): |
|
169 |
def test(self): |
|
170 |
vci = utils.x509._VerifyCertificateInner |
|
171 |
|
|
172 |
# Valid |
|
173 |
self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7), |
|
174 |
(None, None)) |
|
175 |
|
|
176 |
# Not yet valid |
|
177 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7) |
|
178 |
self.assertEqual(errcode, utils.CERT_WARNING) |
|
179 |
|
|
180 |
# Expiring soon |
|
181 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7) |
|
182 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
183 |
|
|
184 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1) |
|
185 |
self.assertEqual(errcode, utils.CERT_WARNING) |
|
186 |
|
|
187 |
(errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7) |
|
188 |
self.assertEqual(errcode, None) |
|
189 |
|
|
190 |
# Expired |
|
191 |
(errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7) |
|
192 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
193 |
|
|
194 |
(errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7) |
|
195 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
196 |
|
|
197 |
(errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7) |
|
198 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
199 |
|
|
200 |
(errcode, msg) = vci(True, None, None, 1266939600, 30, 7) |
|
201 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
202 |
|
|
203 |
|
|
204 |
class TestGenerateSelfSignedX509Cert(unittest.TestCase): |
|
205 |
def setUp(self): |
|
206 |
self.tmpdir = tempfile.mkdtemp() |
|
207 |
|
|
208 |
def tearDown(self): |
|
209 |
shutil.rmtree(self.tmpdir) |
|
210 |
|
|
211 |
def _checkRsaPrivateKey(self, key): |
|
212 |
lines = key.splitlines() |
|
213 |
return ("-----BEGIN RSA PRIVATE KEY-----" in lines and |
|
214 |
"-----END RSA PRIVATE KEY-----" in lines) |
|
215 |
|
|
216 |
def _checkCertificate(self, cert): |
|
217 |
lines = cert.splitlines() |
|
218 |
return ("-----BEGIN CERTIFICATE-----" in lines and |
|
219 |
"-----END CERTIFICATE-----" in lines) |
|
220 |
|
|
221 |
def test(self): |
|
222 |
for common_name in [None, ".", "Ganeti", "node1.example.com"]: |
|
223 |
(key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300) |
|
224 |
self._checkRsaPrivateKey(key_pem) |
|
225 |
self._checkCertificate(cert_pem) |
|
226 |
|
|
227 |
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, |
|
228 |
key_pem) |
|
229 |
self.assert_(key.bits() >= 1024) |
|
230 |
self.assertEqual(key.bits(), constants.RSA_KEY_BITS) |
|
231 |
self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA) |
|
232 |
|
|
233 |
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
234 |
cert_pem) |
|
235 |
self.failIf(x509.has_expired()) |
|
236 |
self.assertEqual(x509.get_issuer().CN, common_name) |
|
237 |
self.assertEqual(x509.get_subject().CN, common_name) |
|
238 |
self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS) |
|
239 |
|
|
240 |
def testLegacy(self): |
|
241 |
cert1_filename = os.path.join(self.tmpdir, "cert1.pem") |
|
242 |
|
|
243 |
utils.GenerateSelfSignedSslCert(cert1_filename, validity=1) |
|
244 |
|
|
245 |
cert1 = utils.ReadFile(cert1_filename) |
|
246 |
|
|
247 |
self.assert_(self._checkRsaPrivateKey(cert1)) |
|
248 |
self.assert_(self._checkCertificate(cert1)) |
|
249 |
|
|
250 |
|
|
251 |
if __name__ == "__main__": |
|
252 |
testutils.GanetiTestProgram() |
b/test/ganeti.utils_unittest.py | ||
---|---|---|
21 | 21 |
|
22 | 22 |
"""Script for unittesting the utils module""" |
23 | 23 |
|
24 |
import distutils.version |
|
25 | 24 |
import errno |
26 | 25 |
import fcntl |
27 | 26 |
import glob |
... | ... | |
32 | 31 |
import signal |
33 | 32 |
import socket |
34 | 33 |
import stat |
35 |
import string |
|
36 | 34 |
import tempfile |
37 | 35 |
import time |
38 | 36 |
import unittest |
39 | 37 |
import warnings |
40 |
import OpenSSL |
|
41 | 38 |
import random |
42 | 39 |
import operator |
43 | 40 |
|
... | ... | |
837 | 834 |
utils.RunInSeparateProcess, _exc) |
838 | 835 |
|
839 | 836 |
|
840 |
class TestGenerateSelfSignedX509Cert(unittest.TestCase): |
|
841 |
def setUp(self): |
|
842 |
self.tmpdir = tempfile.mkdtemp() |
|
843 |
|
|
844 |
def tearDown(self): |
|
845 |
shutil.rmtree(self.tmpdir) |
|
846 |
|
|
847 |
def _checkRsaPrivateKey(self, key): |
|
848 |
lines = key.splitlines() |
|
849 |
return ("-----BEGIN RSA PRIVATE KEY-----" in lines and |
|
850 |
"-----END RSA PRIVATE KEY-----" in lines) |
|
851 |
|
|
852 |
def _checkCertificate(self, cert): |
|
853 |
lines = cert.splitlines() |
|
854 |
return ("-----BEGIN CERTIFICATE-----" in lines and |
|
855 |
"-----END CERTIFICATE-----" in lines) |
|
856 |
|
|
857 |
def test(self): |
|
858 |
for common_name in [None, ".", "Ganeti", "node1.example.com"]: |
|
859 |
(key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300) |
|
860 |
self._checkRsaPrivateKey(key_pem) |
|
861 |
self._checkCertificate(cert_pem) |
|
862 |
|
|
863 |
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, |
|
864 |
key_pem) |
|
865 |
self.assert_(key.bits() >= 1024) |
|
866 |
self.assertEqual(key.bits(), constants.RSA_KEY_BITS) |
|
867 |
self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA) |
|
868 |
|
|
869 |
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
870 |
cert_pem) |
|
871 |
self.failIf(x509.has_expired()) |
|
872 |
self.assertEqual(x509.get_issuer().CN, common_name) |
|
873 |
self.assertEqual(x509.get_subject().CN, common_name) |
|
874 |
self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS) |
|
875 |
|
|
876 |
def testLegacy(self): |
|
877 |
cert1_filename = os.path.join(self.tmpdir, "cert1.pem") |
|
878 |
|
|
879 |
utils.GenerateSelfSignedSslCert(cert1_filename, validity=1) |
|
880 |
|
|
881 |
cert1 = utils.ReadFile(cert1_filename) |
|
882 |
|
|
883 |
self.assert_(self._checkRsaPrivateKey(cert1)) |
|
884 |
self.assert_(self._checkCertificate(cert1)) |
|
885 |
|
|
886 |
|
|
887 | 837 |
class TestValidateServiceName(unittest.TestCase): |
888 | 838 |
def testValid(self): |
889 | 839 |
testnames = [ |
... | ... | |
910 | 860 |
self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name) |
911 | 861 |
|
912 | 862 |
|
913 |
class TestParseAsn1Generalizedtime(unittest.TestCase): |
|
914 |
def test(self): |
|
915 |
# UTC |
|
916 |
self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0) |
|
917 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"), |
|
918 |
1266860512) |
|
919 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"), |
|
920 |
(2**31) - 1) |
|
921 |
|
|
922 |
# With offset |
|
923 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"), |
|
924 |
1266860512) |
|
925 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"), |
|
926 |
1266931012) |
|
927 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"), |
|
928 |
1266931088) |
|
929 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"), |
|
930 |
1266931295) |
|
931 |
self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"), |
|
932 |
3600) |
|
933 |
|
|
934 |
# Leap seconds are not supported by datetime.datetime |
|
935 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
936 |
"19841231235960+0000") |
|
937 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
938 |
"19920630235960+0000") |
|
939 |
|
|
940 |
# Errors |
|
941 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "") |
|
942 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid") |
|
943 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
944 |
"20100222174152") |
|
945 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
946 |
"Mon Feb 22 17:47:02 UTC 2010") |
|
947 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
948 |
"2010-02-22 17:42:02") |
|
949 |
|
|
950 |
|
|
951 |
class TestGetX509CertValidity(testutils.GanetiTestCase): |
|
952 |
def setUp(self): |
|
953 |
testutils.GanetiTestCase.setUp(self) |
|
954 |
|
|
955 |
pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__) |
|
956 |
|
|
957 |
# Test whether we have pyOpenSSL 0.7 or above |
|
958 |
self.pyopenssl0_7 = (pyopenssl_version >= "0.7") |
|
959 |
|
|
960 |
if not self.pyopenssl0_7: |
|
961 |
warnings.warn("This test requires pyOpenSSL 0.7 or above to" |
|
962 |
" function correctly") |
|
963 |
|
|
964 |
def _LoadCert(self, name): |
|
965 |
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
966 |
self._ReadTestData(name)) |
|
967 |
|
|
968 |
def test(self): |
|
969 |
validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem")) |
|
970 |
if self.pyopenssl0_7: |
|
971 |
self.assertEqual(validity, (1266919967, 1267524767)) |
|
972 |
else: |
|
973 |
self.assertEqual(validity, (None, None)) |
|
974 |
|
|
975 |
|
|
976 |
class TestSignX509Certificate(unittest.TestCase): |
|
977 |
KEY = "My private key!" |
|
978 |
KEY_OTHER = "Another key" |
|
979 |
|
|
980 |
def test(self): |
|
981 |
# Generate certificate valid for 5 minutes |
|
982 |
(_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300) |
|
983 |
|
|
984 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
985 |
cert_pem) |
|
986 |
|
|
987 |
# No signature at all |
|
988 |
self.assertRaises(errors.GenericError, |
|
989 |
utils.LoadSignedX509Certificate, cert_pem, self.KEY) |
|
990 |
|
|
991 |
# Invalid input |
|
992 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
993 |
"", self.KEY) |
|
994 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
995 |
"X-Ganeti-Signature: \n", self.KEY) |
|
996 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
997 |
"X-Ganeti-Sign: $1234$abcdef\n", self.KEY) |
|
998 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
999 |
"X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY) |
|
1000 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
1001 |
"X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY) |
|
1002 |
|
|
1003 |
# Invalid salt |
|
1004 |
for salt in list("-_@$,:;/\\ \t\n"): |
|
1005 |
self.assertRaises(errors.GenericError, utils.SignX509Certificate, |
|
1006 |
cert_pem, self.KEY, "foo%sbar" % salt) |
|
1007 |
|
|
1008 |
for salt in ["HelloWorld", "salt", string.letters, string.digits, |
|
1009 |
utils.GenerateSecret(numbytes=4), |
|
1010 |
utils.GenerateSecret(numbytes=16), |
|
1011 |
"{123:456}".encode("hex")]: |
|
1012 |
signed_pem = utils.SignX509Certificate(cert, self.KEY, salt) |
|
1013 |
|
|
1014 |
self._Check(cert, salt, signed_pem) |
|
1015 |
|
|
1016 |
self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem) |
|
1017 |
self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem) |
|
1018 |
self._Check(cert, salt, (signed_pem + "\n\na few more\n" |
|
1019 |
"lines----\n------ at\nthe end!")) |
|
1020 |
|
|
1021 |
def _Check(self, cert, salt, pem): |
|
1022 |
(cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY) |
|
1023 |
self.assertEqual(salt, salt2) |
|
1024 |
self.assertEqual(cert.digest("sha1"), cert2.digest("sha1")) |
|
1025 |
|
|
1026 |
# Other key |
|
1027 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
1028 |
pem, self.KEY_OTHER) |
|
1029 |
|
|
1030 |
|
|
1031 | 863 |
class TestReadLockedPidFile(unittest.TestCase): |
1032 | 864 |
def setUp(self): |
1033 | 865 |
self.tmpdir = tempfile.mkdtemp() |
... | ... | |
1065 | 897 |
self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path) |
1066 | 898 |
|
1067 | 899 |
|
1068 |
class TestCertVerification(testutils.GanetiTestCase): |
|
1069 |
def setUp(self): |
|
1070 |
testutils.GanetiTestCase.setUp(self) |
|
1071 |
|
|
1072 |
self.tmpdir = tempfile.mkdtemp() |
|
1073 |
|
|
1074 |
def tearDown(self): |
|
1075 |
shutil.rmtree(self.tmpdir) |
|
1076 |
|
|
1077 |
def testVerifyCertificate(self): |
|
1078 |
cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem")) |
|
1079 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
1080 |
cert_pem) |
|
1081 |
|
|
1082 |
# Not checking return value as this certificate is expired |
|
1083 |
utils.VerifyX509Certificate(cert, 30, 7) |
|
1084 |
|
|
1085 |
|
|
1086 |
class TestVerifyCertificateInner(unittest.TestCase): |
|
1087 |
def test(self): |
|
1088 |
vci = utils._VerifyCertificateInner |
|
1089 |
|
|
1090 |
# Valid |
|
1091 |
self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7), |
|
1092 |
(None, None)) |
|
1093 |
|
|
1094 |
# Not yet valid |
|
1095 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7) |
|
1096 |
self.assertEqual(errcode, utils.CERT_WARNING) |
|
1097 |
|
|
1098 |
# Expiring soon |
|
1099 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7) |
|
1100 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1101 |
|
|
1102 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1) |
|
1103 |
self.assertEqual(errcode, utils.CERT_WARNING) |
|
1104 |
|
|
1105 |
(errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7) |
|
1106 |
self.assertEqual(errcode, None) |
|
1107 |
|
|
1108 |
# Expired |
|
1109 |
(errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7) |
|
1110 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1111 |
|
|
1112 |
(errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7) |
|
1113 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1114 |
|
|
1115 |
(errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7) |
|
1116 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1117 |
|
|
1118 |
(errcode, msg) = vci(True, None, None, 1266939600, 30, 7) |
|
1119 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1120 |
|
|
1121 |
|
|
1122 | 900 |
class TestFindMatch(unittest.TestCase): |
1123 | 901 |
def test(self): |
1124 | 902 |
data = { |
Also available in: Unified diff