Export the hypervisor.ValidateParameters over RPC
[ganeti-local] / lib / utils.py
index 33bf83f..761dc0f 100644 (file)
@@ -40,6 +40,7 @@ import select
 import fcntl
 import resource
 import logging
+import signal
 
 from cStringIO import StringIO
 
@@ -51,6 +52,7 @@ _locksheld = []
 _re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
 
 debug = False
+debug_locks = False
 no_fork = False
 
 
@@ -73,13 +75,13 @@ class RunResult(object):
                "failed", "fail_reason", "cmd"]
 
 
-  def __init__(self, exit_code, signal, stdout, stderr, cmd):
+  def __init__(self, exit_code, signal_, stdout, stderr, cmd):
     self.cmd = cmd
     self.exit_code = exit_code
-    self.signal = signal
+    self.signal = signal_
     self.stdout = stdout
     self.stderr = stderr
-    self.failed = (signal is not None or exit_code != 0)
+    self.failed = (signal_ is not None or exit_code != 0)
 
     if self.signal is not None:
       self.fail_reason = "terminated by signal %s" % self.signal
@@ -101,16 +103,18 @@ class RunResult(object):
   output = property(_GetOutput, None, None, "Return full output")
 
 
-def RunCmd(cmd):
+def RunCmd(cmd, env=None):
   """Execute a (shell) command.
 
   The command should not read from its standard input, as it will be
   closed.
 
-  Args:
-    cmd: command to run. (str)
-
-  Returns: `RunResult` instance
+  @param cmd: Command to run
+  @type  cmd: string or list
+  @param env: Additional environment
+  @type env: dict
+  @return: `RunResult` instance
+  @rtype: RunResult
 
   """
   if no_fork:
@@ -124,14 +128,18 @@ def RunCmd(cmd):
     strcmd = cmd
     shell = True
   logging.debug("RunCmd '%s'", strcmd)
-  env = os.environ.copy()
-  env["LC_ALL"] = "C"
+
+  cmd_env = os.environ.copy()
+  cmd_env["LC_ALL"] = "C"
+  if env is not None:
+    cmd_env.update(env)
+
   poller = select.poll()
   child = subprocess.Popen(cmd, shell=shell,
                            stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stdin=subprocess.PIPE,
-                           close_fds=True, env=env)
+                           close_fds=True, env=cmd_env)
 
   child.stdin.close()
   poller.register(child.stdout, select.POLLIN)
@@ -167,12 +175,12 @@ def RunCmd(cmd):
   status = child.wait()
   if status >= 0:
     exitcode = status
-    signal = None
+    signal_ = None
   else:
     exitcode = None
-    signal = -status
+    signal_ = -status
 
-  return RunResult(exitcode, signal, out, err, strcmd)
+  return RunResult(exitcode, signal_, out, err, strcmd)
 
 
 def RemoveFile(filename):
@@ -267,9 +275,13 @@ def IsProcessAlive(pid):
 
   Returns: true or false, depending on if the pid exists or not
 
-  Remarks: zombie processes treated as not alive
+  Remarks: zombie processes treated as not alive, and giving a pid <=
+  0 makes the function to return False.
 
   """
+  if pid <= 0:
+    return False
+
   try:
     f = open("/proc/%d/status" % pid)
   except IOError, err:
@@ -289,6 +301,32 @@ def IsProcessAlive(pid):
   return alive
 
 
+def ReadPidFile(pidfile):
+  """Read the pid from a file.
+
+  @param pidfile: Path to a file containing the pid to be checked
+  @type  pidfile: string (filename)
+  @return: The process id, if the file exista and contains a valid PID,
+           otherwise 0
+  @rtype: int
+
+  """
+  try:
+    pf = open(pidfile, 'r')
+  except EnvironmentError, err:
+    if err.errno != errno.ENOENT:
+      logging.exception("Can't read pid file?!")
+    return 0
+
+  try:
+    pid = int(pf.read())
+  except ValueError, err:
+    logging.info("Can't parse pid file contents", exc_info=True)
+    return 0
+
+  return pid
+
+
 def MatchNameComponent(key, name_list):
   """Try to match a name against a list.
 
@@ -785,6 +823,21 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None):
   return success
 
 
+def OwnIpAddress(address):
+  """Check if the current host has the the given IP address.
+
+  Currently this is done by tcp-pinging the address from the loopback
+  address.
+
+  @type address: string
+  @param address: the addres to check
+  @rtype: bool
+
+  """
+  return TcpPing(address, constants.DEFAULT_NODED_PORT,
+                 source=constants.LOCALHOST_IP_ADDRESS)
+
+
 def ListVisibleFiles(path):
   """Returns a list of all visible files in a directory.
 
@@ -826,6 +879,33 @@ def NewUUID():
     f.close()
 
 
+def GenerateSecret():
+  """Generates a random secret.
+
+  This will generate a pseudo-random secret, and return its sha digest
+  (so that it can be used where an ASCII string is needed).
+
+  """
+  return sha.new(os.urandom(64)).hexdigest()
+
+
+def ReadFile(file_name, size=None):
+  """Reads a file.
+
+  @type size: None or int
+  @param size: Read at most size bytes
+
+  """
+  f = open(file_name, "r")
+  try:
+    if size is None:
+      return f.read()
+    else:
+      return f.read(size)
+  finally:
+    f.close()
+
+
 def WriteFile(file_name, fn=None, data=None,
               mode=None, uid=-1, gid=-1,
               atime=None, mtime=None, close=True,
@@ -1025,6 +1105,71 @@ def Daemonize(logfile, noclose_fds=None):
   return 0
 
 
+def DaemonPidFileName(name):
+  """Compute a ganeti pid file absolute path, given the daemon name.
+
+  """
+  return os.path.join(constants.RUN_GANETI_DIR, "%s.pid" % name)
+
+
+def WritePidFile(name):
+  """Write the current process pidfile.
+
+  The file will be written to constants.RUN_GANETI_DIR/name.pid
+
+  """
+  pid = os.getpid()
+  pidfilename = DaemonPidFileName(name)
+  if IsProcessAlive(ReadPidFile(pidfilename)):
+    raise errors.GenericError("%s contains a live process" % pidfilename)
+
+  WriteFile(pidfilename, data="%d\n" % pid)
+
+
+def RemovePidFile(name):
+  """Remove the current process pidfile.
+
+  Any errors are ignored.
+
+  """
+  pid = os.getpid()
+  pidfilename = DaemonPidFileName(name)
+  # TODO: we could check here that the file contains our pid
+  try:
+    RemoveFile(pidfilename)
+  except:
+    pass
+
+
+def KillProcess(pid, signal_=signal.SIGTERM, timeout=30):
+  """Kill a process given by its pid.
+
+  @type pid: int
+  @param pid: The PID to terminate.
+  @type signal_: int
+  @param signal_: The signal to send, by default SIGTERM
+  @type timeout: int
+  @param timeout: The timeout after which, if the process is still alive,
+                  a SIGKILL will be sent. If not positive, no such checking
+                  will be done
+
+  """
+  if pid <= 0:
+    # kill with pid=0 == suicide
+    raise errors.ProgrammerError("Invalid pid given '%s'" % pid)
+
+  if not IsProcessAlive(pid):
+    return
+  os.kill(pid, signal_)
+  if timeout <= 0:
+    return
+  end = time.time() + timeout
+  while time.time() < end and IsProcessAlive(pid):
+    time.sleep(0.1)
+  if IsProcessAlive(pid):
+    os.kill(pid, signal.SIGKILL)
+
+
 def FindFile(name, search_path, test=os.path.exists):
   """Look for a filesystem object in a given path.
 
@@ -1065,6 +1210,65 @@ def CheckVolumeGroupSize(vglist, vgname, minsize):
   return None
 
 
+def SplitTime(value):
+  """Splits time as floating point number into a tuple.
+
+  @param value: Time in seconds
+  @type value: int or float
+  @return: Tuple containing (seconds, microseconds)
+
+  """
+  (seconds, microseconds) = divmod(int(value * 1000000), 1000000)
+
+  assert 0 <= seconds, \
+    "Seconds must be larger than or equal to 0, but are %s" % seconds
+  assert 0 <= microseconds <= 999999, \
+    "Microseconds must be 0-999999, but are %s" % microseconds
+
+  return (int(seconds), int(microseconds))
+
+
+def MergeTime(timetuple):
+  """Merges a tuple into time as a floating point number.
+
+  @param timetuple: Time as tuple, (seconds, microseconds)
+  @type timetuple: tuple
+  @return: Time as a floating point number expressed in seconds
+
+  """
+  (seconds, microseconds) = timetuple
+
+  assert 0 <= seconds, \
+    "Seconds must be larger than or equal to 0, but are %s" % seconds
+  assert 0 <= microseconds <= 999999, \
+    "Microseconds must be 0-999999, but are %s" % microseconds
+
+  return float(seconds) + (float(microseconds) * 0.000001)
+
+
+def GetNodeDaemonPort():
+  """Get the node daemon port for this cluster.
+
+  Note that this routine does not read a ganeti-specific file, but
+  instead uses socket.getservbyname to allow pre-customization of
+  this parameter outside of Ganeti.
+
+  """
+  try:
+    port = socket.getservbyname("ganeti-noded", "tcp")
+  except socket.error:
+    port = constants.DEFAULT_NODED_PORT
+
+  return port
+
+
+def GetNodeDaemonPassword():
+  """Get the node password for the cluster.
+
+  """
+  return ReadFile(constants.CLUSTER_PASSWORD_FILE)
+
+
 def LockedMethod(fn):
   """Synchronized object access decorator.
 
@@ -1072,13 +1276,186 @@ def LockedMethod(fn):
   object's own lock which is hardcoded to '_lock'.
 
   """
+  def _LockDebug(*args, **kwargs):
+    if debug_locks:
+      logging.debug(*args, **kwargs)
+
   def wrapper(self, *args, **kwargs):
     assert hasattr(self, '_lock')
     lock = self._lock
+    _LockDebug("Waiting for %s", lock)
     lock.acquire()
     try:
+      _LockDebug("Acquired %s", lock)
       result = fn(self, *args, **kwargs)
     finally:
+      _LockDebug("Releasing %s", lock)
       lock.release()
+      _LockDebug("Released %s", lock)
     return result
   return wrapper
+
+
+def LockFile(fd):
+  """Locks a file using POSIX locks.
+
+  """
+  try:
+    fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
+  except IOError, err:
+    if err.errno == errno.EAGAIN:
+      raise errors.LockError("File already locked")
+    raise
+
+
+class FileLock(object):
+  """Utility class for file locks.
+
+  """
+  def __init__(self, filename):
+    self.filename = filename
+    self.fd = open(self.filename, "w")
+
+  def __del__(self):
+    self.Close()
+
+  def Close(self):
+    if self.fd:
+      self.fd.close()
+      self.fd = None
+
+  def _flock(self, flag, blocking, timeout, errmsg):
+    """Wrapper for fcntl.flock.
+
+    @type flag: int
+    @param flag: Operation flag
+    @type blocking: bool
+    @param blocking: Whether the operation should be done in blocking mode.
+    @type timeout: None or float
+    @param timeout: For how long the operation should be retried (implies
+                    non-blocking mode).
+    @type errmsg: string
+    @param errmsg: Error message in case operation fails.
+
+    """
+    assert self.fd, "Lock was closed"
+    assert timeout is None or timeout >= 0, \
+      "If specified, timeout must be positive"
+
+    if timeout is not None:
+      flag |= fcntl.LOCK_NB
+      timeout_end = time.time() + timeout
+
+    # Blocking doesn't have effect with timeout
+    elif not blocking:
+      flag |= fcntl.LOCK_NB
+      timeout_end = None
+
+    retry = True
+    while retry:
+      try:
+        fcntl.flock(self.fd, flag)
+        retry = False
+      except IOError, err:
+        if err.errno in (errno.EAGAIN, ):
+          if timeout_end is not None and time.time() < timeout_end:
+            # Wait before trying again
+            time.sleep(max(0.1, min(1.0, timeout)))
+          else:
+            raise errors.LockError(errmsg)
+        else:
+          logging.exception("fcntl.flock failed")
+          raise
+
+  def Exclusive(self, blocking=False, timeout=None):
+    """Locks the file in exclusive mode.
+
+    """
+    self._flock(fcntl.LOCK_EX, blocking, timeout,
+                "Failed to lock %s in exclusive mode" % self.filename)
+
+  def Shared(self, blocking=False, timeout=None):
+    """Locks the file in shared mode.
+
+    """
+    self._flock(fcntl.LOCK_SH, blocking, timeout,
+                "Failed to lock %s in shared mode" % self.filename)
+
+  def Unlock(self, blocking=True, timeout=None):
+    """Unlocks the file.
+
+    According to "man flock", unlocking can also be a nonblocking operation:
+    "To make a non-blocking request, include LOCK_NB with any of the above
+    operations"
+
+    """
+    self._flock(fcntl.LOCK_UN, blocking, timeout,
+                "Failed to unlock %s" % self.filename)
+
+
+class SignalHandler(object):
+  """Generic signal handler class.
+
+  It automatically restores the original handler when deconstructed or when
+  Reset() is called. You can either pass your own handler function in or query
+  the "called" attribute to detect whether the signal was sent.
+
+  """
+  def __init__(self, signum):
+    """Constructs a new SignalHandler instance.
+
+    @param signum: Single signal number or set of signal numbers
+
+    """
+    if isinstance(signum, (int, long)):
+      self.signum = set([signum])
+    else:
+      self.signum = set(signum)
+
+    self.called = False
+
+    self._previous = {}
+    try:
+      for signum in self.signum:
+        # Setup handler
+        prev_handler = signal.signal(signum, self._HandleSignal)
+        try:
+          self._previous[signum] = prev_handler
+        except:
+          # Restore previous handler
+          signal.signal(signum, prev_handler)
+          raise
+    except:
+      # Reset all handlers
+      self.Reset()
+      # Here we have a race condition: a handler may have already been called,
+      # but there's not much we can do about it at this point.
+      raise
+
+  def __del__(self):
+    self.Reset()
+
+  def Reset(self):
+    """Restore previous handler.
+
+    """
+    for signum, prev_handler in self._previous.items():
+      signal.signal(signum, prev_handler)
+      # If successful, remove from dict
+      del self._previous[signum]
+
+  def Clear(self):
+    """Unsets "called" flag.
+
+    This function can be used in case a signal may arrive several times.
+
+    """
+    self.called = False
+
+  def _HandleSignal(self, signum, frame):
+    """Actual signal handling function.
+
+    """
+    # This is not nice and not absolutely atomic, but it appears to be the only
+    # solution in Python -- there are no atomic types.
+    self.called = True