Fix tiny typo in cluster verify
[ganeti-local] / lib / utils.py
index 0144755..1d0750e 100644 (file)
@@ -43,6 +43,11 @@ import resource
 import logging
 import logging.handlers
 import signal
+import datetime
+import calendar
+import collections
+import struct
+import IN
 
 from cStringIO import StringIO
 
@@ -66,6 +71,18 @@ no_fork = False
 
 _RANDOM_UUID_FILE = "/proc/sys/kernel/random/uuid"
 
+# Structure definition for getsockopt(SOL_SOCKET, SO_PEERCRED, ...):
+# struct ucred { pid_t pid; uid_t uid; gid_t gid; };
+#
+# The GNU C Library defines gid_t and uid_t to be "unsigned int" and
+# pid_t to "int".
+#
+# IEEE Std 1003.1-2008:
+# "nlink_t, uid_t, gid_t, and id_t shall be integer types"
+# "blksize_t, pid_t, and ssize_t shall be signed integer types"
+_STRUCT_UCRED = "iII"
+_STRUCT_UCRED_SIZE = struct.calcsize(_STRUCT_UCRED)
+
 
 class RunResult(object):
   """Holds the result of running external programs.
@@ -325,6 +342,19 @@ def RunParts(dir_name, env=None, reset_env=False):
   return rr
 
 
+def GetSocketCredentials(sock):
+  """Returns the credentials of the foreign process connected to a socket.
+
+  @param sock: Unix socket
+  @rtype: tuple; (number, number, number)
+  @return: The PID, UID and GID of the connected foreign process.
+
+  """
+  peercred = sock.getsockopt(socket.SOL_SOCKET, IN.SO_PEERCRED,
+                             _STRUCT_UCRED_SIZE)
+  return struct.unpack(_STRUCT_UCRED, peercred)
+
+
 def RemoveFile(filename):
   """Remove a file ignoring some errors.
 
@@ -363,20 +393,29 @@ def RenameFile(old, new, mkdir=False, mkdir_mode=0750):
     # as efficient.
     if mkdir and err.errno == errno.ENOENT:
       # Create directory and try again
-      dirname = os.path.dirname(new)
-      try:
-        os.makedirs(dirname, mode=mkdir_mode)
-      except OSError, err:
-        # Ignore EEXIST. This is only handled in os.makedirs as included in
-        # Python 2.5 and above.
-        if err.errno != errno.EEXIST or not os.path.exists(dirname):
-          raise
+      Makedirs(os.path.dirname(new), mode=mkdir_mode)
 
       return os.rename(old, new)
 
     raise
 
 
+def Makedirs(path, mode=0750):
+  """Super-mkdir; create a leaf directory and all intermediate ones.
+
+  This is a wrapper around C{os.makedirs} adding error handling not implemented
+  before Python 2.5.
+
+  """
+  try:
+    os.makedirs(path, mode)
+  except OSError, err:
+    # Ignore EEXIST. This is only handled in os.makedirs as included in
+    # Python 2.5 and above.
+    if err.errno != errno.EEXIST or not os.path.exists(path):
+      raise
+
+
 def ResetTempfileModule():
   """Resets the random name generator of the tempfile module.
 
@@ -527,16 +566,28 @@ def IsProcessAlive(pid):
   @return: True if the process exists
 
   """
+  def _TryStat(name):
+    try:
+      os.stat(name)
+      return True
+    except EnvironmentError, err:
+      if err.errno in (errno.ENOENT, errno.ENOTDIR):
+        return False
+      elif err.errno == errno.EINVAL:
+        raise RetryAgain(err)
+      raise
+
+  assert isinstance(pid, int), "pid must be an integer"
   if pid <= 0:
     return False
 
+  proc_entry = "/proc/%d/status" % pid
+  # /proc in a multiprocessor environment can have strange behaviors.
+  # Retry the os.stat a few times until we get a good result.
   try:
-    os.stat("/proc/%d/status" % pid)
-    return True
-  except EnvironmentError, err:
-    if err.errno in (errno.ENOENT, errno.ENOTDIR):
-      return False
-    raise
+    return Retry(_TryStat, (0.01, 1.5, 0.1), 0.5, args=[proc_entry])
+  except RetryTimeout, err:
+    err.RaiseInner()
 
 
 def ReadPidFile(pidfile):
@@ -1117,6 +1168,16 @@ def RemoveHostFromEtcHosts(hostname):
   RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
 
 
+def TimestampForFilename():
+  """Returns the current time formatted for filenames.
+
+  The format doesn't contain colons as some shells and applications them as
+  separators.
+
+  """
+  return time.strftime("%Y-%m-%d_%H_%M_%S")
+
+
 def CreateBackup(file_name):
   """Creates a backup of a file.
 
@@ -1131,7 +1192,8 @@ def CreateBackup(file_name):
     raise errors.ProgrammerError("Can't make a backup of a non-file '%s'" %
                                 file_name)
 
-  prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time()))
+  prefix = ("%s.backup-%s." %
+            (os.path.basename(file_name), TimestampForFilename()))
   dir_name = os.path.dirname(file_name)
 
   fsrc = open(file_name, 'rb')
@@ -1139,6 +1201,7 @@ def CreateBackup(file_name):
     (fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name)
     fdst = os.fdopen(fd, 'wb')
     try:
+      logging.debug("Backing up %s at %s", file_name, backup_name)
       shutil.copyfileobj(fsrc, fdst)
     finally:
       fdst.close()
@@ -1463,24 +1526,95 @@ def FirstFree(seq, base=0):
   return None
 
 
-def all(seq, pred=bool): # pylint: disable-msg=W0622
-  "Returns True if pred(x) is True for every element in the iterable"
-  for _ in itertools.ifilterfalse(pred, seq):
-    return False
-  return True
+def SingleWaitForFdCondition(fdobj, event, timeout):
+  """Waits for a condition to occur on the socket.
+
+  Immediately returns at the first interruption.
+
+  @type fdobj: integer or object supporting a fileno() method
+  @param fdobj: entity to wait for events on
+  @type event: integer
+  @param event: ORed condition (see select module)
+  @type timeout: float or None
+  @param timeout: Timeout in seconds
+  @rtype: int or None
+  @return: None for timeout, otherwise occured conditions
+
+  """
+  check = (event | select.POLLPRI |
+           select.POLLNVAL | select.POLLHUP | select.POLLERR)
+
+  if timeout is not None:
+    # Poller object expects milliseconds
+    timeout *= 1000
+
+  poller = select.poll()
+  poller.register(fdobj, event)
+  try:
+    # TODO: If the main thread receives a signal and we have no timeout, we
+    # could wait forever. This should check a global "quit" flag or something
+    # every so often.
+    io_events = poller.poll(timeout)
+  except select.error, err:
+    if err[0] != errno.EINTR:
+      raise
+    io_events = []
+  if io_events and io_events[0][1] & check:
+    return io_events[0][1]
+  else:
+    return None
+
+
+class FdConditionWaiterHelper(object):
+  """Retry helper for WaitForFdCondition.
+
+  This class contains the retried and wait functions that make sure
+  WaitForFdCondition can continue waiting until the timeout is actually
+  expired.
+
+  """
+
+  def __init__(self, timeout):
+    self.timeout = timeout
+
+  def Poll(self, fdobj, event):
+    result = SingleWaitForFdCondition(fdobj, event, self.timeout)
+    if result is None:
+      raise RetryAgain()
+    else:
+      return result
 
+  def UpdateTimeout(self, timeout):
+    self.timeout = timeout
 
-def any(seq, pred=bool): # pylint: disable-msg=W0622
-  "Returns True if pred(x) is True for at least one element in the iterable"
-  for _ in itertools.ifilter(pred, seq):
-    return True
-  return False
 
+def WaitForFdCondition(fdobj, event, timeout):
+  """Waits for a condition to occur on the socket.
 
-def partition(seq, pred=bool): # # pylint: disable-msg=W0622
-  "Partition a list in two, based on the given predicate"
-  return (list(itertools.ifilter(pred, seq)),
-          list(itertools.ifilterfalse(pred, seq)))
+  Retries until the timeout is expired, even if interrupted.
+
+  @type fdobj: integer or object supporting a fileno() method
+  @param fdobj: entity to wait for events on
+  @type event: integer
+  @param event: ORed condition (see select module)
+  @type timeout: float or None
+  @param timeout: Timeout in seconds
+  @rtype: int or None
+  @return: None for timeout, otherwise occured conditions
+
+  """
+  if timeout is not None:
+    retrywaiter = FdConditionWaiterHelper(timeout)
+    try:
+      result = Retry(retrywaiter.Poll, RETRY_REMAINING_TIME, timeout,
+                     args=(fdobj, event), wait_fn=retrywaiter.UpdateTimeout)
+    except RetryTimeout:
+      result = None
+  else:
+    result = None
+    while result is None:
+      result = SingleWaitForFdCondition(fdobj, event, timeout)
+  return result
 
 
 def UniqueSequence(seq):
@@ -2026,6 +2160,69 @@ def TailFile(fname, lines=20):
   return rows[-lines:]
 
 
+def _ParseAsn1Generalizedtime(value):
+  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
+
+  @type value: string
+  @param value: ASN1 GENERALIZEDTIME timestamp
+
+  """
+  m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
+  if m:
+    # We have an offset
+    asn1time = m.group(1)
+    hours = int(m.group(2))
+    minutes = int(m.group(3))
+    utcoffset = (60 * hours) + minutes
+  else:
+    if not value.endswith("Z"):
+      raise ValueError("Missing timezone")
+    asn1time = value[:-1]
+    utcoffset = 0
+
+  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
+
+  tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
+
+  return calendar.timegm(tt.utctimetuple())
+
+
+def GetX509CertValidity(cert):
+  """Returns the validity period of the certificate.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+
+  """
+  # The get_notBefore and get_notAfter functions are only supported in
+  # pyOpenSSL 0.7 and above.
+  try:
+    get_notbefore_fn = cert.get_notBefore
+  except AttributeError:
+    not_before = None
+  else:
+    not_before_asn1 = get_notbefore_fn()
+
+    if not_before_asn1 is None:
+      not_before = None
+    else:
+      not_before = _ParseAsn1Generalizedtime(not_before_asn1)
+
+  try:
+    get_notafter_fn = cert.get_notAfter
+  except AttributeError:
+    not_after = None
+  else:
+    not_after_asn1 = get_notafter_fn()
+
+    if not_after_asn1 is None:
+      not_after = None
+    else:
+      not_after = _ParseAsn1Generalizedtime(not_after_asn1)
+
+  return (not_before, not_after)
+
+
 def SafeEncode(text):
   """Return a 'safe' version of a source string.
 
@@ -2313,12 +2510,25 @@ def ReadWatcherPauseFile(filename, now=None, remove_after=3600):
 class RetryTimeout(Exception):
   """Retry loop timed out.
 
+  Any arguments which was passed by the retried function to RetryAgain will be
+  preserved in RetryTimeout, if it is raised. If such argument was an exception
+  the RaiseInner helper method will reraise it.
+
   """
+  def RaiseInner(self):
+    if self.args and isinstance(self.args[0], Exception):
+      raise self.args[0]
+    else:
+      raise RetryTimeout(*self.args)
 
 
 class RetryAgain(Exception):
   """Retry again.
 
+  Any arguments passed to RetryAgain will be preserved, if a timeout occurs, as
+  arguments to RetryTimeout. If an exception is passed, the RaiseInner() method
+  of the RetryTimeout() method can be used to reraise it.
+
   """
 
 
@@ -2427,16 +2637,21 @@ def Retry(fn, delay, timeout, args=None, wait_fn=time.sleep,
   assert calc_delay is None or callable(calc_delay)
 
   while True:
+    retry_args = []
     try:
       # pylint: disable-msg=W0142
       return fn(*args)
-    except RetryAgain:
-      pass
+    except RetryAgain, err:
+      retry_args = err.args
+    except RetryTimeout:
+      raise errors.ProgrammerError("Nested retry loop detected that didn't"
+                                   " handle RetryTimeout")
 
     remaining_time = end_time - _time_fn()
 
     if remaining_time < 0.0:
-      raise RetryTimeout()
+      # pylint: disable-msg=W0142
+      raise RetryTimeout(*retry_args)
 
     assert remaining_time >= 0.0
 
@@ -2581,6 +2796,47 @@ class FileLock(object):
                 "Failed to unlock %s" % self.filename)
 
 
+class LineSplitter:
+  """Splits data chunks into lines separated by newline.
+
+  Instances provide a file-like interface.
+
+  """
+  def __init__(self, line_fn, *args):
+    """Initializes this class.
+
+    @type line_fn: callable
+    @param line_fn: Function called for each line, first parameter is line
+    @param args: Extra arguments for L{line_fn}
+
+    """
+    assert callable(line_fn)
+
+    if args:
+      # Python 2.4 doesn't have functools.partial yet
+      self._line_fn = \
+        lambda line: line_fn(line, *args) # pylint: disable-msg=W0142
+    else:
+      self._line_fn = line_fn
+
+    self._lines = collections.deque()
+    self._buffer = ""
+
+  def write(self, data):
+    parts = (self._buffer + data).split("\n")
+    self._buffer = parts.pop()
+    self._lines.extend(parts)
+
+  def flush(self):
+    while self._lines:
+      self._line_fn(self._lines.popleft().rstrip("\r\n"))
+
+  def close(self):
+    self.flush()
+    if self._buffer:
+      self._line_fn(self._buffer)
+
+
 def SignalHandled(signums):
   """Signal Handled decoration.