import optparse
import sys
import logging
-import errno
import OpenSSL
from ganeti import cli
return opts
-def _VerifyCertificate(cert, _noded_cert_file=pathutils.NODED_CERT_FILE):
+def _VerifyCertificate(cert_pem, _check_fn=utils.CheckNodeCertificate):
"""Verifies a certificate against the local node daemon certificate.
- @type cert: string
- @param cert: Certificate in PEM format (no key)
+ @type cert_pem: string
+ @param cert_pem: Certificate in PEM format (no key)
"""
try:
- OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, cert)
+ OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
except OpenSSL.crypto.Error, err:
pass
else:
raise JoinError("No private key may be given")
try:
- cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
+ cert = \
+ OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
except Exception, err:
raise errors.X509CertError("(stdin)",
"Unable to load certificate: %s" % err)
- try:
- noded_pem = utils.ReadFile(_noded_cert_file)
- except EnvironmentError, err:
- if err.errno != errno.ENOENT:
- raise
-
- logging.debug("Local node certificate was not found (file %s)",
- _noded_cert_file)
- return
-
- try:
- key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, noded_pem)
- except Exception, err:
- raise errors.X509CertError(_noded_cert_file,
- "Unable to load private key: %s" % err)
-
- ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
- ctx.use_privatekey(key)
- ctx.use_certificate(cert)
- try:
- ctx.check_privatekey()
- except OpenSSL.SSL.Error:
- raise JoinError("Given cluster certificate does not match local key")
+ _check_fn(cert)
def VerifyCertificate(data, _verify_fn=_VerifyCertificate):
_verify_fn(cert)
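Review bot: The new `_check_fn(cert)` line replaces a check that ended in `raise JoinError("Given cluster certificate does not match local key")`, and nothing in the hunk shows what `utils.CheckNodeCertificate` raises on a mismatch or when the local certificate is absent, so callers that catch JoinError around `VerifyCertificate` may now see a different exception type (or no longer get the silent skip the removed ENOENT branch provided) — worth confirming against the helper and, if it raises a different type, translating it, e.g. `except errors.GenericError, err:` `raise JoinError(str(err))` around the call (exception type to be confirmed). The docstring was updated for `cert_pem` but still says nothing about the new `_check_fn` parameter; a line such as `@param _check_fn: Callable used to verify the loaded certificate against the local node key (for tests)` would document the injection point. The body of `VerifyCertificate`, which calls `_verify_fn(cert)`, is only partially visible here.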
-def _VerifyClusterName(name, _ss_cluster_name_file=None):
- """Verifies cluster name against a local cluster name.
-
- @type name: string
- @param name: Cluster name
-
- """
- if _ss_cluster_name_file is None:
- _ss_cluster_name_file = \
- ssconf.SimpleStore().KeyToFilename(constants.SS_CLUSTER_NAME)
-
- try:
- local_name = utils.ReadOneLineFile(_ss_cluster_name_file)
- except EnvironmentError, err:
- if err.errno != errno.ENOENT:
- raise
-
- logging.debug("Local cluster name was not found (file %s)",
- _ss_cluster_name_file)
- else:
- if name != local_name:
- raise JoinError("Current cluster name is '%s'" % local_name)
-
-
-def VerifyClusterName(data, _verify_fn=_VerifyClusterName):
+def VerifyClusterName(data, _verify_fn=ssconf.VerifyClusterName):
"""Verifies cluster name.
@type data: dict