Move hash functions to the compat module
[ganeti-local] / lib / utils.py
index 48b427a..f2f00b5 100644 (file)
@@ -49,16 +49,19 @@ import datetime
 import calendar
 import hmac
 import collections
+import struct
+import IN
 
 from cStringIO import StringIO
 
 try:
-  from hashlib import sha1
+  import ctypes
 except ImportError:
-  import sha as sha1
+  ctypes = None
 
 from ganeti import errors
 from ganeti import constants
+from ganeti import compat
 
 
 _locksheld = []
@@ -78,6 +81,26 @@ X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
                              HEX_CHAR_RE, HEX_CHAR_RE),
                             re.S | re.I)
 
+# Structure definition for getsockopt(SOL_SOCKET, SO_PEERCRED, ...):
+# struct ucred { pid_t pid; uid_t uid; gid_t gid; };
+#
+# The GNU C Library defines gid_t and uid_t to be "unsigned int" and
+# pid_t to "int".
+#
+# IEEE Std 1003.1-2008:
+# "nlink_t, uid_t, gid_t, and id_t shall be integer types"
+# "blksize_t, pid_t, and ssize_t shall be signed integer types"
+_STRUCT_UCRED = "iII"
+_STRUCT_UCRED_SIZE = struct.calcsize(_STRUCT_UCRED)
+
+# Certificate verification results
+(CERT_WARNING,
+ CERT_ERROR) = range(1, 3)
+
+# Flags for mlockall() (from bits/mman.h)
+_MCL_CURRENT = 1
+_MCL_FUTURE = 2
+
 
 class RunResult(object):
   """Holds the result of running external programs.
@@ -543,7 +566,7 @@ def RetryOnSignal(fn, *args, **kwargs):
   while True:
     try:
       return fn(*args, **kwargs)
-    except EnvironmentError, err:
+    except (EnvironmentError, socket.error), err:
       if err.errno != errno.EINTR:
         raise
     except select.error, err:
@@ -588,6 +611,19 @@ def RunParts(dir_name, env=None, reset_env=False):
   return rr
 
 
+def GetSocketCredentials(sock):
+  """Returns the credentials of the foreign process connected to a socket.
+
+  @param sock: Unix socket
+  @rtype: tuple; (number, number, number)
+  @return: The PID, UID and GID of the connected foreign process.
+
+  """
+  peercred = sock.getsockopt(socket.SOL_SOCKET, IN.SO_PEERCRED,
+                             _STRUCT_UCRED_SIZE)
+  return struct.unpack(_STRUCT_UCRED, peercred)
+
+
 def RemoveFile(filename):
   """Remove a file ignoring some errors.
 
@@ -605,6 +641,24 @@ def RemoveFile(filename):
       raise
 
 
+def RemoveDir(dirname):
+  """Remove an empty directory.
+
+  Remove a directory, ignoring non-existing ones.
+  Other errors are passed. This includes the case,
+  where the directory is not empty, so it can't be removed.
+
+  @type dirname: str
+  @param dirname: the empty directory to be removed
+
+  """
+  try:
+    os.rmdir(dirname)
+  except OSError, err:
+    if err.errno != errno.ENOENT:
+      raise
+
+
 def RenameFile(old, new, mkdir=False, mkdir_mode=0750):
   """Renames a file.
 
@@ -690,10 +744,7 @@ def _FingerprintFile(filename):
 
   f = open(filename)
 
-  if callable(sha1):
-    fp = sha1()
-  else:
-    fp = sha1.new()
+  fp = compat.sha1_hash()
   while True:
     data = f.read(4096)
     if not data:
@@ -802,16 +853,28 @@ def IsProcessAlive(pid):
   @return: True if the process exists
 
   """
+  def _TryStat(name):
+    try:
+      os.stat(name)
+      return True
+    except EnvironmentError, err:
+      if err.errno in (errno.ENOENT, errno.ENOTDIR):
+        return False
+      elif err.errno == errno.EINVAL:
+        raise RetryAgain(err)
+      raise
+
+  assert isinstance(pid, int), "pid must be an integer"
   if pid <= 0:
     return False
 
+  proc_entry = "/proc/%d/status" % pid
+  # /proc in a multiprocessor environment can have strange behaviors.
+  # Retry the os.stat a few times until we get a good result.
   try:
-    os.stat("/proc/%d/status" % pid)
-    return True
-  except EnvironmentError, err:
-    if err.errno in (errno.ENOENT, errno.ENOTDIR):
-      return False
-    raise
+    return Retry(_TryStat, (0.01, 1.5, 0.1), 0.5, args=[proc_entry])
+  except RetryTimeout, err:
+    err.RaiseInner()
 
 
 def ReadPidFile(pidfile):
@@ -825,7 +888,7 @@ def ReadPidFile(pidfile):
 
   """
   try:
-    raw_data = ReadFile(pidfile)
+    raw_data = ReadOneLineFile(pidfile)
   except EnvironmentError, err:
     if err.errno != errno.ENOENT:
       logging.exception("Can't read pid file")
@@ -1753,6 +1816,24 @@ def WriteFile(file_name, fn=None, data=None,
   return result
 
 
+def ReadOneLineFile(file_name, strict=False):
+  """Return the first non-empty line from a file.
+
+  @type strict: boolean
+  @param strict: if True, abort if the file has more than one
+      non-empty line
+
+  """
+  file_lines = ReadFile(file_name).splitlines()
+  full_lines = filter(bool, file_lines)
+  if not file_lines or not full_lines:
+    raise errors.GenericError("No data in one-liner file %s" % file_name)
+  elif strict and len(full_lines) > 1:
+    raise errors.GenericError("Too many lines in one-liner file %s" %
+                              file_name)
+  return full_lines[0]
+
+
 def FirstFree(seq, base=0):
   """Returns the first non-existing integer from seq.
 
@@ -1781,20 +1862,6 @@ def FirstFree(seq, base=0):
   return None
 
 
-def all(seq, pred=bool): # pylint: disable-msg=W0622
-  "Returns True if pred(x) is True for every element in the iterable"
-  for _ in itertools.ifilterfalse(pred, seq):
-    return False
-  return True
-
-
-def any(seq, pred=bool): # pylint: disable-msg=W0622
-  "Returns True if pred(x) is True for at least one element in the iterable"
-  for _ in itertools.ifilter(pred, seq):
-    return True
-  return False
-
-
 def SingleWaitForFdCondition(fdobj, event, timeout):
   """Waits for a condition to occur on the socket.
 
@@ -1886,12 +1953,6 @@ def WaitForFdCondition(fdobj, event, timeout):
   return result
 
 
-def partition(seq, pred=bool): # # pylint: disable-msg=W0622
-  "Partition a list in two, based on the given predicate"
-  return (list(itertools.ifilter(pred, seq)),
-          list(itertools.ifilterfalse(pred, seq)))
-
-
 def UniqueSequence(seq):
   """Returns a list with unique elements.
 
@@ -1996,6 +2057,39 @@ def CloseFDs(noclose_fds=None):
     _CloseFDNoErr(fd)
 
 
+def Mlockall():
+  """Lock current process' virtual address space into RAM.
+
+  This is equivalent to the C call mlockall(MCL_CURRENT|MCL_FUTURE),
+  see mlock(2) for more details. This function requires ctypes module.
+
+  """
+  if ctypes is None:
+    logging.warning("Cannot set memory lock, ctypes module not found")
+    return
+
+  libc = ctypes.cdll.LoadLibrary("libc.so.6")
+  if libc is None:
+    logging.error("Cannot set memory lock, ctypes cannot load libc")
+    return
+
+  # Some older version of the ctypes module don't have built-in functionality
+  # to access the errno global variable, where function error codes are stored.
+  # By declaring this variable as a pointer to an integer we can then access
+  # its value correctly, should the mlockall call fail, in order to see what
+  # the actual error code was.
+  # pylint: disable-msg=W0212
+  libc.__errno_location.restype = ctypes.POINTER(ctypes.c_int)
+
+  if libc.mlockall(_MCL_CURRENT | _MCL_FUTURE):
+    # pylint: disable-msg=W0212
+    logging.error("Cannot set memory lock: %s",
+                  os.strerror(libc.__errno_location().contents.value))
+    return
+
+  logging.debug("Memory lock set")
+
+
 def Daemonize(logfile):
   """Daemonize the current process.
 
@@ -2284,8 +2378,43 @@ def GetDaemonPort(daemon_name):
   return port
 
 
+class LogFileHandler(logging.FileHandler):
+  """Log handler that doesn't fallback to stderr.
+
+  When an error occurs while writing on the logfile, logging.FileHandler tries
+  to log on stderr. This doesn't work in ganeti since stderr is redirected to
+  the logfile. This class avoids failures reporting errors to /dev/console.
+
+  """
+  def __init__(self, filename, mode="a", encoding=None):
+    """Open the specified file and use it as the stream for logging.
+
+    Also open /dev/console to report errors while logging.
+
+    """
+    logging.FileHandler.__init__(self, filename, mode, encoding)
+    self.console = open(constants.DEV_CONSOLE, "a")
+
+  def handleError(self, record): # pylint: disable-msg=C0103
+    """Handle errors which occur during an emit() call.
+
+    Try to handle errors with FileHandler method, if it fails write to
+    /dev/console.
+
+    """
+    try:
+      logging.FileHandler.handleError(self, record)
+    except Exception: # pylint: disable-msg=W0703
+      try:
+        self.console.write("Cannot log message:\n%s\n" % self.format(record))
+      except Exception: # pylint: disable-msg=W0703
+        # Log handler tried everything it could, now just give up
+        pass
+
+
 def SetupLogging(logfile, debug=0, stderr_logging=False, program="",
-                 multithreaded=False, syslog=constants.SYSLOG_USAGE):
+                 multithreaded=False, syslog=constants.SYSLOG_USAGE,
+                 console_logging=False):
   """Configures the logging module.
 
   @type logfile: str
@@ -2304,6 +2433,9 @@ def SetupLogging(logfile, debug=0, stderr_logging=False, program="",
       - if no, syslog is not used
       - if yes, syslog is used (in addition to file-logging)
       - if only, only syslog is used
+  @type console_logging: boolean
+  @param console_logging: if True, will use a FileHandler which falls back to
+      the system console if logging fails
   @raise EnvironmentError: if we can't open the log file and
       syslog/stderr logging is disabled
 
@@ -2355,7 +2487,10 @@ def SetupLogging(logfile, debug=0, stderr_logging=False, program="",
     # the error if stderr_logging is True, and if false we re-raise the
     # exception since otherwise we could run but without any logs at all
     try:
-      logfile_handler = logging.FileHandler(logfile)
+      if console_logging:
+        logfile_handler = LogFileHandler(logfile)
+      else:
+        logfile_handler = logging.FileHandler(logfile)
       logfile_handler.setFormatter(formatter)
       if debug:
         logfile_handler.setLevel(logging.DEBUG)
@@ -2435,6 +2570,13 @@ def TailFile(fname, lines=20):
   return rows[-lines:]
 
 
+def FormatTimestampWithTZ(secs):
+  """Formats a Unix timestamp with the local timezone.
+
+  """
+  return time.strftime("%F %T %Z", time.gmtime(secs))
+
+
 def _ParseAsn1Generalizedtime(value):
   """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
 
@@ -2498,6 +2640,75 @@ def GetX509CertValidity(cert):
   return (not_before, not_after)
 
 
+def _VerifyCertificateInner(expired, not_before, not_after, now,
+                            warn_days, error_days):
+  """Verifies certificate validity.
+
+  @type expired: bool
+  @param expired: Whether pyOpenSSL considers the certificate as expired
+  @type not_before: number or None
+  @param not_before: Unix timestamp before which certificate is not valid
+  @type not_after: number or None
+  @param not_after: Unix timestamp after which certificate is invalid
+  @type now: number
+  @param now: Current time as Unix timestamp
+  @type warn_days: number or None
+  @param warn_days: How many days before expiration a warning should be reported
+  @type error_days: number or None
+  @param error_days: How many days before expiration an error should be reported
+
+  """
+  if expired:
+    msg = "Certificate is expired"
+
+    if not_before is not None and not_after is not None:
+      msg += (" (valid from %s to %s)" %
+              (FormatTimestampWithTZ(not_before),
+               FormatTimestampWithTZ(not_after)))
+    elif not_before is not None:
+      msg += " (valid from %s)" % FormatTimestampWithTZ(not_before)
+    elif not_after is not None:
+      msg += " (valid until %s)" % FormatTimestampWithTZ(not_after)
+
+    return (CERT_ERROR, msg)
+
+  elif not_before is not None and not_before > now:
+    return (CERT_WARNING,
+            "Certificate not yet valid (valid from %s)" %
+            FormatTimestampWithTZ(not_before))
+
+  elif not_after is not None:
+    remaining_days = int((not_after - now) / (24 * 3600))
+
+    msg = "Certificate expires in about %d days" % remaining_days
+
+    if error_days is not None and remaining_days <= error_days:
+      return (CERT_ERROR, msg)
+
+    if warn_days is not None and remaining_days <= warn_days:
+      return (CERT_WARNING, msg)
+
+  return (None, None)
+
+
+def VerifyX509Certificate(cert, warn_days, error_days):
+  """Verifies a certificate for LUVerifyCluster.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+  @type warn_days: number or None
+  @param warn_days: How many days before expiration a warning should be reported
+  @type error_days: number or None
+  @param error_days: How many days before expiration an error should be reported
+
+  """
+  # Depending on the pyOpenSSL version, this can just return (None, None)
+  (not_before, not_after) = GetX509CertValidity(cert)
+
+  return _VerifyCertificateInner(cert.has_expired(), not_before, not_after,
+                                 time.time(), warn_days, error_days)
+
+
 def SignX509Certificate(cert, key, salt):
   """Sign a X509 certificate.
 
@@ -2521,7 +2732,7 @@ def SignX509Certificate(cert, key, salt):
 
   return ("%s: %s/%s\n\n%s" %
           (constants.X509_CERT_SIGNATURE_HEADER, salt,
-           hmac.new(key, salt + cert_pem, sha1).hexdigest(),
+           Sha1Hmac(key, cert_pem, salt=salt),
            cert_pem))
 
 
@@ -2560,12 +2771,47 @@ def LoadSignedX509Certificate(cert_pem, key):
   # Dump again to ensure it's in a sane format
   sane_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
 
-  if signature != hmac.new(key, salt + sane_pem, sha1).hexdigest():
+  if not VerifySha1Hmac(key, sane_pem, signature, salt=salt):
     raise errors.GenericError("X509 certificate signature is invalid")
 
   return (cert, salt)
 
 
+def Sha1Hmac(key, text, salt=None):
+  """Calculates the HMAC-SHA1 digest of a text.
+
+  HMAC is defined in RFC2104.
+
+  @type key: string
+  @param key: Secret key
+  @type text: string
+
+  """
+  if salt:
+    salted_text = salt + text
+  else:
+    salted_text = text
+
+  return hmac.new(key, salted_text, compat.sha1).hexdigest()
+
+
+def VerifySha1Hmac(key, text, digest, salt=None):
+  """Verifies the HMAC-SHA1 digest of a text.
+
+  HMAC is defined in RFC2104.
+
+  @type key: string
+  @param key: Secret key
+  @type text: string
+  @type digest: string
+  @param digest: Expected digest
+  @rtype: bool
+  @return: Whether HMAC-SHA1 digest matches
+
+  """
+  return digest.lower() == Sha1Hmac(key, text, salt=salt).lower()
+
+
 def SafeEncode(text):
   """Return a 'safe' version of a source string.
 
@@ -2749,6 +2995,20 @@ def RunInSeparateProcess(fn, *args):
   return bool(exitcode)
 
 
+def IgnoreSignals(fn, *args, **kwargs):
+  """Tries to call a function ignoring failures due to EINTR.
+
+  """
+  try:
+    return fn(*args, **kwargs)
+  except (EnvironmentError, socket.error), err:
+    if err.errno != errno.EINTR:
+      raise
+  except select.error, err:
+    if not (err.args and err.args[0] == errno.EINTR):
+      raise
+
+
 def LockedMethod(fn):
   """Synchronized object access decorator.
 
@@ -2853,12 +3113,25 @@ def ReadWatcherPauseFile(filename, now=None, remove_after=3600):
 class RetryTimeout(Exception):
   """Retry loop timed out.
 
+  Any arguments which was passed by the retried function to RetryAgain will be
+  preserved in RetryTimeout, if it is raised. If such argument was an exception
+  the RaiseInner helper method will reraise it.
+
   """
+  def RaiseInner(self):
+    if self.args and isinstance(self.args[0], Exception):
+      raise self.args[0]
+    else:
+      raise RetryTimeout(*self.args)
 
 
 class RetryAgain(Exception):
   """Retry again.
 
+  Any arguments passed to RetryAgain will be preserved, if a timeout occurs, as
+  arguments to RetryTimeout. If an exception is passed, the RaiseInner() method
+  of the RetryTimeout() method can be used to reraise it.
+
   """
 
 
@@ -2967,11 +3240,12 @@ def Retry(fn, delay, timeout, args=None, wait_fn=time.sleep,
   assert calc_delay is None or callable(calc_delay)
 
   while True:
+    retry_args = []
     try:
       # pylint: disable-msg=W0142
       return fn(*args)
-    except RetryAgain:
-      pass
+    except RetryAgain, err:
+      retry_args = err.args
     except RetryTimeout:
       raise errors.ProgrammerError("Nested retry loop detected that didn't"
                                    " handle RetryTimeout")
@@ -2979,7 +3253,8 @@ def Retry(fn, delay, timeout, args=None, wait_fn=time.sleep,
     remaining_time = end_time - _time_fn()
 
     if remaining_time < 0.0:
-      raise RetryTimeout()
+      # pylint: disable-msg=W0142
+      raise RetryTimeout(*retry_args)
 
     assert remaining_time >= 0.0
 
@@ -3251,6 +3526,58 @@ def SignalHandled(signums):
   return wrap
 
 
+class SignalWakeupFd(object):
+  try:
+    # This is only supported in Python 2.5 and above (some distributions
+    # backported it to Python 2.4)
+    _set_wakeup_fd_fn = signal.set_wakeup_fd
+  except AttributeError:
+    # Not supported
+    def _SetWakeupFd(self, _): # pylint: disable-msg=R0201
+      return -1
+  else:
+    def _SetWakeupFd(self, fd):
+      return self._set_wakeup_fd_fn(fd)
+
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    (read_fd, write_fd) = os.pipe()
+
+    # Once these succeeded, the file descriptors will be closed automatically.
+    # Buffer size 0 is important, otherwise .read() with a specified length
+    # might buffer data and the file descriptors won't be marked readable.
+    self._read_fh = os.fdopen(read_fd, "r", 0)
+    self._write_fh = os.fdopen(write_fd, "w", 0)
+
+    self._previous = self._SetWakeupFd(self._write_fh.fileno())
+
+    # Utility functions
+    self.fileno = self._read_fh.fileno
+    self.read = self._read_fh.read
+
+  def Reset(self):
+    """Restores the previous wakeup file descriptor.
+
+    """
+    if hasattr(self, "_previous") and self._previous is not None:
+      self._SetWakeupFd(self._previous)
+      self._previous = None
+
+  def Notify(self):
+    """Notifies the wakeup file descriptor.
+
+    """
+    self._write_fh.write("\0")
+
+  def __del__(self):
+    """Called before object deletion.
+
+    """
+    self.Reset()
+
+
 class SignalHandler(object):
   """Generic signal handler class.
 
@@ -3265,7 +3592,7 @@ class SignalHandler(object):
   @ivar called: tracks whether any of the signals have been raised
 
   """
-  def __init__(self, signum, handler_fn=None):
+  def __init__(self, signum, handler_fn=None, wakeup=None):
     """Constructs a new SignalHandler instance.
 
     @type signum: int or list of ints
@@ -3280,6 +3607,7 @@ class SignalHandler(object):
     self.called = False
 
     self._handler_fn = handler_fn
+    self._wakeup = wakeup
 
     self._previous = {}
     try:
@@ -3329,6 +3657,10 @@ class SignalHandler(object):
     # solution in Python -- there are no atomic types.
     self.called = True
 
+    if self._wakeup:
+      # Notify whoever is interested in signals
+      self._wakeup.Notify()
+
     if self._handler_fn:
       self._handler_fn(signum, frame)