KVM: Check instances for actual liveness
[ganeti-local] / lib / utils.py
index 438a120..07bdc0e 100644 (file)
@@ -43,6 +43,8 @@ import resource
 import logging
 import logging.handlers
 import signal
+import datetime
+import calendar
 
 from cStringIO import StringIO
 
@@ -118,13 +120,13 @@ class RunResult(object):
   output = property(_GetOutput, None, None, "Return full output")
 
 
-def RunCmd(cmd, env=None, output=None, cwd='/'):
+def RunCmd(cmd, env=None, output=None, cwd='/', reset_env=False):
   """Execute a (shell) command.
 
   The command should not read from its standard input, as it will be
   closed.
 
-  @type  cmd: string or list
+  @type cmd: string or list
   @param cmd: Command to run
   @type env: dict
   @param env: Additional environment
@@ -135,6 +137,8 @@ def RunCmd(cmd, env=None, output=None, cwd='/'):
   @type cwd: string
   @param cwd: if specified, will be used as the working
       directory for the command; the default will be /
+  @type reset_env: boolean
+  @param reset_env: whether to reset or keep the default os environment
   @rtype: L{RunResult}
   @return: RunResult instance
   @raise errors.ProgrammerError: if we call this when forks are disabled
@@ -152,8 +156,12 @@ def RunCmd(cmd, env=None, output=None, cwd='/'):
     shell = True
   logging.debug("RunCmd '%s'", strcmd)
 
-  cmd_env = os.environ.copy()
-  cmd_env["LC_ALL"] = "C"
+  if not reset_env:
+    cmd_env = os.environ.copy()
+    cmd_env["LC_ALL"] = "C"
+  else:
+    cmd_env = {}
+
   if env is not None:
     cmd_env.update(env)
 
@@ -282,6 +290,43 @@ def _RunCmdFile(cmd, env, via_shell, output, cwd):
   return status
 
 
+def RunParts(dir_name, env=None, reset_env=False):
+  """Run Scripts or programs in a directory
+
+  @type dir_name: string
+  @param dir_name: absolute path to a directory
+  @type env: dict
+  @param env: The environment to use
+  @type reset_env: boolean
+  @param reset_env: whether to reset or keep the default os environment
+  @rtype: list of tuples
+  @return: list of (name, (one of RUNDIR_STATUS), RunResult)
+
+  """
+  rr = []
+
+  try:
+    dir_contents = ListVisibleFiles(dir_name)
+  except OSError, err:
+    logging.warning("RunParts: skipping %s (cannot list: %s)", dir_name, err)
+    return rr
+
+  for relname in sorted(dir_contents):
+    fname = PathJoin(dir_name, relname)
+    if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
+            constants.EXT_PLUGIN_MASK.match(relname) is not None):
+      rr.append((relname, constants.RUNPARTS_SKIP, None))
+    else:
+      try:
+        result = RunCmd([fname], env=env, reset_env=reset_env)
+      except Exception, err: # pylint: disable-msg=W0703
+        rr.append((relname, constants.RUNPARTS_ERR, str(err)))
+      else:
+        rr.append((relname, constants.RUNPARTS_RUN, result))
+
+  return rr
+
+
 def RemoveFile(filename):
   """Remove a file ignoring some errors.
 
@@ -334,6 +379,29 @@ def RenameFile(old, new, mkdir=False, mkdir_mode=0750):
     raise
 
 
+def ResetTempfileModule():
+  """Resets the random name generator of the tempfile module.
+
+  This function should be called after C{os.fork} in the child process to
+  ensure it creates a newly seeded random generator. Otherwise it would
+  generate the same random parts as the parent process. If several processes
+  race for the creation of a temporary file, this could lead to one not getting
+  a temporary name.
+
+  """
+  # pylint: disable-msg=W0212
+  if hasattr(tempfile, "_once_lock") and hasattr(tempfile, "_name_sequence"):
+    tempfile._once_lock.acquire()
+    try:
+      # Reset random name generator
+      tempfile._name_sequence = None
+    finally:
+      tempfile._once_lock.release()
+  else:
+    logging.critical("The tempfile module misses at least one of the"
+                     " '_once_lock' and '_name_sequence' attributes")
+
+
 def _FingerprintFile(filename):
   """Compute the fingerprint of a file.
 
@@ -549,6 +617,8 @@ class HostInfo:
   """Class implementing resolver and hostname functionality
 
   """
+  _VALID_NAME_RE = re.compile("^[a-z0-9._-]{1,255}$")
+
   def __init__(self, name=None):
     """Initialize the host name object.
 
@@ -599,6 +669,27 @@ class HostInfo:
 
     return result
 
+  @classmethod
+  def NormalizeName(cls, hostname):
+    """Validate and normalize the given hostname.
+
+    @attention: the validation is a bit more relaxed than the standards
+        require; most importantly, we allow underscores in names
+    @raise errors.OpPrereqError: when the name is not valid
+
+    """
+    hostname = hostname.lower()
+    if (not cls._VALID_NAME_RE.match(hostname) or
+        # double-dots, meaning empty label
+        ".." in hostname or
+        # empty initial label
+        hostname.startswith(".")):
+      raise errors.OpPrereqError("Invalid hostname '%s'" % hostname,
+                                 errors.ECODE_INVAL)
+    if hostname.endswith("."):
+      hostname = hostname.rstrip(".")
+    return hostname
+
 
 def GetHostInfo(name=None):
   """Lookup host name and raise an OpPrereqError for failures"""
@@ -1028,6 +1119,16 @@ def RemoveHostFromEtcHosts(hostname):
   RemoveEtcHostsEntry(constants.ETC_HOSTS, hi.ShortName())
 
 
+def TimestampForFilename():
+  """Returns the current time formatted for filenames.
+
+  The format doesn't contain colons as some shells and applications them as
+  separators.
+
+  """
+  return time.strftime("%Y-%m-%d_%H_%M_%S")
+
+
 def CreateBackup(file_name):
   """Creates a backup of a file.
 
@@ -1042,7 +1143,8 @@ def CreateBackup(file_name):
     raise errors.ProgrammerError("Can't make a backup of a non-file '%s'" %
                                 file_name)
 
-  prefix = '%s.backup-%d.' % (os.path.basename(file_name), int(time.time()))
+  prefix = ("%s.backup-%s." %
+            (os.path.basename(file_name), TimestampForFilename()))
   dir_name = os.path.dirname(file_name)
 
   fsrc = open(file_name, 'rb')
@@ -1050,6 +1152,7 @@ def CreateBackup(file_name):
     (fd, backup_name) = tempfile.mkstemp(prefix=prefix, dir=dir_name)
     fdst = os.fdopen(fd, 'wb')
     try:
+      logging.debug("Backing up %s at %s", file_name, backup_name)
       shutil.copyfileobj(fsrc, fdst)
     finally:
       fdst.close()
@@ -1155,8 +1258,12 @@ def ListVisibleFiles(path):
   @param path: the directory to enumerate
   @rtype: list
   @return: the list of all files not starting with a dot
+  @raise ProgrammerError: if L{path} is not an absolue and normalized path
 
   """
+  if not IsNormAbsPath(path):
+    raise errors.ProgrammerError("Path passed to ListVisibleFiles is not"
+                                 " absolute/normalized: '%s'" % path)
   files = [i for i in os.listdir(path) if not i.startswith(".")]
   files.sort()
   return files
@@ -1384,6 +1491,100 @@ def any(seq, pred=bool): # pylint: disable-msg=W0622
   return False
 
 
+def SingleWaitForFdCondition(fdobj, event, timeout):
+  """Waits for a condition to occur on the socket.
+
+  Immediately returns at the first interruption.
+
+  @type fdobj: integer or object supporting a fileno() method
+  @param fdobj: entity to wait for events on
+  @type event: integer
+  @param event: ORed condition (see select module)
+  @type timeout: float or None
+  @param timeout: Timeout in seconds
+  @rtype: int or None
+  @return: None for timeout, otherwise occured conditions
+
+  """
+  check = (event | select.POLLPRI |
+           select.POLLNVAL | select.POLLHUP | select.POLLERR)
+
+  if timeout is not None:
+    # Poller object expects milliseconds
+    timeout *= 1000
+
+  poller = select.poll()
+  poller.register(fdobj, event)
+  try:
+    # TODO: If the main thread receives a signal and we have no timeout, we
+    # could wait forever. This should check a global "quit" flag or something
+    # every so often.
+    io_events = poller.poll(timeout)
+  except select.error, err:
+    if err[0] != errno.EINTR:
+      raise
+    io_events = []
+  if io_events and io_events[0][1] & check:
+    return io_events[0][1]
+  else:
+    return None
+
+
+class FdConditionWaiterHelper(object):
+  """Retry helper for WaitForFdCondition.
+
+  This class contains the retried and wait functions that make sure
+  WaitForFdCondition can continue waiting until the timeout is actually
+  expired.
+
+  """
+
+  def __init__(self, timeout):
+    self.timeout = timeout
+
+  def Poll(self, fdobj, event):
+    result = SingleWaitForFdCondition(fdobj, event, self.timeout)
+    if result is None:
+      raise RetryAgain()
+    else:
+      return result
+
+  def UpdateTimeout(self, timeout):
+    self.timeout = timeout
+
+
+def WaitForFdCondition(fdobj, event, timeout):
+  """Waits for a condition to occur on the socket.
+
+  Retries until the timeout is expired, even if interrupted.
+
+  @type fdobj: integer or object supporting a fileno() method
+  @param fdobj: entity to wait for events on
+  @type event: integer
+  @param event: ORed condition (see select module)
+  @type timeout: float or None
+  @param timeout: Timeout in seconds
+  @rtype: int or None
+  @return: None for timeout, otherwise occured conditions
+
+  """
+  if timeout is not None:
+    retrywaiter = FdConditionWaiterHelper(timeout)
+    result = Retry(retrywaiter.Poll, RETRY_REMAINING_TIME, timeout,
+                   args=(fdobj, event), wait_fn=retrywaiter.UpdateTimeout)
+  else:
+    result = None
+    while result is None:
+      result = SingleWaitForFdCondition(fdobj, event, timeout)
+  return result
+
+
+def partition(seq, pred=bool): # # pylint: disable-msg=W0622
+  "Partition a list in two, based on the given predicate"
+  return (list(itertools.ifilter(pred, seq)),
+          list(itertools.ifilterfalse(pred, seq)))
+
+
 def UniqueSequence(seq):
   """Returns a list with unique elements.
 
@@ -1541,7 +1742,20 @@ def DaemonPidFileName(name):
       daemon name
 
   """
-  return os.path.join(constants.RUN_GANETI_DIR, "%s.pid" % name)
+  return PathJoin(constants.RUN_GANETI_DIR, "%s.pid" % name)
+
+
+def EnsureDaemon(name):
+  """Check for and start daemon if not alive.
+
+  """
+  result = RunCmd([constants.DAEMON_UTIL, "check-and-start", name])
+  if result.failed:
+    logging.error("Can't start daemon '%s', failure %s, output: %s",
+                  name, result.fail_reason, result.output)
+    return False
+
+  return True
 
 
 def WritePidFile(name):
@@ -1669,6 +1883,7 @@ def FindFile(name, search_path, test=os.path.exists):
     return None
 
   for dir_name in search_path:
+    # FIXME: investigate switch to PathJoin
     item_name = os.path.sep.join([dir_name, name])
     # check the user test and that we're indeed resolving to the given
     # basename
@@ -1762,14 +1977,14 @@ def GetDaemonPort(daemon_name):
   return port
 
 
-def SetupLogging(logfile, debug=False, stderr_logging=False, program="",
+def SetupLogging(logfile, debug=0, stderr_logging=False, program="",
                  multithreaded=False, syslog=constants.SYSLOG_USAGE):
   """Configures the logging module.
 
   @type logfile: str
   @param logfile: the filename to which we should log
-  @type debug: boolean
-  @param debug: whether to enable debug messages too or
+  @type debug: integer
+  @param debug: if greater than zero, enable debug messages, otherwise
       only those at C{INFO} and above level
   @type stderr_logging: boolean
   @param stderr_logging: whether we should also log to the standard error
@@ -1857,6 +2072,36 @@ def IsNormAbsPath(path):
   return os.path.normpath(path) == path and os.path.isabs(path)
 
 
+def PathJoin(*args):
+  """Safe-join a list of path components.
+
+  Requirements:
+      - the first argument must be an absolute path
+      - no component in the path must have backtracking (e.g. /../),
+        since we check for normalization at the end
+
+  @param args: the path components to be joined
+  @raise ValueError: for invalid paths
+
+  """
+  # ensure we're having at least one path passed in
+  assert args
+  # ensure the first component is an absolute and normalized path name
+  root = args[0]
+  if not IsNormAbsPath(root):
+    raise ValueError("Invalid parameter to PathJoin: '%s'" % str(args[0]))
+  result = os.path.join(*args)
+  # ensure that the whole path is normalized
+  if not IsNormAbsPath(result):
+    raise ValueError("Invalid parameters to PathJoin: '%s'" % str(args))
+  # check that we're still under the original prefix
+  prefix = os.path.commonprefix([root, result])
+  if prefix != root:
+    raise ValueError("Error: path joining resulted in different prefix"
+                     " (%s != %s)" % (prefix, root))
+  return result
+
+
 def TailFile(fname, lines=20):
   """Return the last lines from a file.
 
@@ -1883,6 +2128,69 @@ def TailFile(fname, lines=20):
   return rows[-lines:]
 
 
+def _ParseAsn1Generalizedtime(value):
+  """Parses an ASN1 GENERALIZEDTIME timestamp as used by pyOpenSSL.
+
+  @type value: string
+  @param value: ASN1 GENERALIZEDTIME timestamp
+
+  """
+  m = re.match(r"^(\d+)([-+]\d\d)(\d\d)$", value)
+  if m:
+    # We have an offset
+    asn1time = m.group(1)
+    hours = int(m.group(2))
+    minutes = int(m.group(3))
+    utcoffset = (60 * hours) + minutes
+  else:
+    if not value.endswith("Z"):
+      raise ValueError("Missing timezone")
+    asn1time = value[:-1]
+    utcoffset = 0
+
+  parsed = time.strptime(asn1time, "%Y%m%d%H%M%S")
+
+  tt = datetime.datetime(*(parsed[:7])) - datetime.timedelta(minutes=utcoffset)
+
+  return calendar.timegm(tt.utctimetuple())
+
+
+def GetX509CertValidity(cert):
+  """Returns the validity period of the certificate.
+
+  @type cert: OpenSSL.crypto.X509
+  @param cert: X509 certificate object
+
+  """
+  # The get_notBefore and get_notAfter functions are only supported in
+  # pyOpenSSL 0.7 and above.
+  try:
+    get_notbefore_fn = cert.get_notBefore
+  except AttributeError:
+    not_before = None
+  else:
+    not_before_asn1 = get_notbefore_fn()
+
+    if not_before_asn1 is None:
+      not_before = None
+    else:
+      not_before = _ParseAsn1Generalizedtime(not_before_asn1)
+
+  try:
+    get_notafter_fn = cert.get_notAfter
+  except AttributeError:
+    not_after = None
+  else:
+    not_after_asn1 = get_notafter_fn()
+
+    if not_after_asn1 is None:
+      not_after = None
+    else:
+      not_after = _ParseAsn1Generalizedtime(not_after_asn1)
+
+  return (not_before, not_after)
+
+
 def SafeEncode(text):
   """Return a 'safe' version of a source string.
 
@@ -1997,7 +2305,7 @@ def CalculateDirectorySize(path):
 
   for (curpath, _, files) in os.walk(path):
     for filename in files:
-      st = os.lstat(os.path.join(curpath, filename))
+      st = os.lstat(PathJoin(curpath, filename))
       size += st.st_size
 
   return BytesToMebibyte(size)
@@ -2019,6 +2327,53 @@ def GetFilesystemStats(path):
   return (tsize, fsize)
 
 
+def RunInSeparateProcess(fn, *args):
+  """Runs a function in a separate process.
+
+  Note: Only boolean return values are supported.
+
+  @type fn: callable
+  @param fn: Function to be called
+  @rtype: bool
+  @return: Function's result
+
+  """
+  pid = os.fork()
+  if pid == 0:
+    # Child process
+    try:
+      # In case the function uses temporary files
+      ResetTempfileModule()
+
+      # Call function
+      result = int(bool(fn(*args)))
+      assert result in (0, 1)
+    except: # pylint: disable-msg=W0702
+      logging.exception("Error while calling function in separate process")
+      # 0 and 1 are reserved for the return value
+      result = 33
+
+    os._exit(result) # pylint: disable-msg=W0212
+
+  # Parent process
+
+  # Avoid zombies and check exit code
+  (_, status) = os.waitpid(pid, 0)
+
+  if os.WIFSIGNALED(status):
+    exitcode = None
+    signum = os.WTERMSIG(status)
+  else:
+    exitcode = os.WEXITSTATUS(status)
+    signum = None
+
+  if not (exitcode in (0, 1) and signum is None):
+    raise errors.GenericError("Child program failed (code=%s, signal=%s)" %
+                              (exitcode, signum))
+
+  return bool(exitcode)
+
+
 def LockedMethod(fn):
   """Synchronized object access decorator.
 
@@ -2172,7 +2527,7 @@ class _RetryDelayCalculator(object):
 
     # Update for next run
     if self._limit is None or self._next < self._limit:
-      self._next = max(self._limit, self._next * self._factor)
+      self._next = min(self._limit, self._next * self._factor)
 
     return current
 
@@ -2262,17 +2617,31 @@ class FileLock(object):
   """Utility class for file locks.
 
   """
-  def __init__(self, filename):
+  def __init__(self, fd, filename):
     """Constructor for FileLock.
 
-    This will open the file denoted by the I{filename} argument.
-
+    @type fd: file
+    @param fd: File object
     @type filename: str
-    @param filename: path to the file to be locked
+    @param filename: Path of the file opened at I{fd}
 
     """
+    self.fd = fd
     self.filename = filename
-    self.fd = open(self.filename, "w")
+
+  @classmethod
+  def Open(cls, filename):
+    """Creates and opens a file to be used as a file-based lock.
+
+    @type filename: string
+    @param filename: path to the file to be locked
+
+    """
+    # Using "os.open" is necessary to allow both opening existing file
+    # read/write and creating if not existing. Vanilla "open" will truncate an
+    # existing file -or- allow creating if not existing.
+    return cls(os.fdopen(os.open(filename, os.O_RDWR | os.O_CREAT), "w+"),
+               filename)
 
   def __del__(self):
     self.Close()
@@ -2302,33 +2671,31 @@ class FileLock(object):
     assert self.fd, "Lock was closed"
     assert timeout is None or timeout >= 0, \
       "If specified, timeout must be positive"
+    assert not (flag & fcntl.LOCK_NB), "LOCK_NB must not be set"
 
-    if timeout is not None:
+    # When a timeout is used, LOCK_NB must always be set
+    if not (timeout is None and blocking):
       flag |= fcntl.LOCK_NB
-      timeout_end = time.time() + timeout
 
-    # Blocking doesn't have effect with timeout
-    elif not blocking:
-      flag |= fcntl.LOCK_NB
-      timeout_end = None
+    if timeout is None:
+      self._Lock(self.fd, flag, timeout)
+    else:
+      try:
+        Retry(self._Lock, (0.1, 1.2, 1.0), timeout,
+              args=(self.fd, flag, timeout))
+      except RetryTimeout:
+        raise errors.LockError(errmsg)
 
-    # TODO: Convert to utils.Retry
+  @staticmethod
+  def _Lock(fd, flag, timeout):
+    try:
+      fcntl.flock(fd, flag)
+    except IOError, err:
+      if timeout is not None and err.errno == errno.EAGAIN:
+        raise RetryAgain()
 
-    retry = True
-    while retry:
-      try:
-        fcntl.flock(self.fd, flag)
-        retry = False
-      except IOError, err:
-        if err.errno in (errno.EAGAIN, ):
-          if timeout_end is not None and time.time() < timeout_end:
-            # Wait before trying again
-            time.sleep(max(0.1, min(1.0, timeout)))
-          else:
-            raise errors.LockError(errmsg)
-        else:
-          logging.exception("fcntl.flock failed")
-          raise
+      logging.exception("fcntl.flock failed")
+      raise
 
   def Exclusive(self, blocking=False, timeout=None):
     """Locks the file in exclusive mode.