RAPI: Export beparams as dict. The patch also enables LUQueryInstances to accept...
[ganeti-local] / lib / utils.py
index 54df210..cb4e0ce 100644 (file)
@@ -52,6 +52,7 @@ _locksheld = []
 _re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
 
 debug = False
 _re_shell_unquoted = re.compile('^[-.,=:/_+@A-Za-z0-9]+$')
 
 debug = False
+debug_locks = False
 no_fork = False
 
 
 no_fork = False
 
 
@@ -102,7 +103,7 @@ class RunResult(object):
   output = property(_GetOutput, None, None, "Return full output")
 
 
   output = property(_GetOutput, None, None, "Return full output")
 
 
-def RunCmd(cmd):
+def RunCmd(cmd, env=None):
   """Execute a (shell) command.
 
   The command should not read from its standard input, as it will be
   """Execute a (shell) command.
 
   The command should not read from its standard input, as it will be
@@ -110,6 +111,8 @@ def RunCmd(cmd):
 
   @param cmd: Command to run
   @type  cmd: string or list
 
   @param cmd: Command to run
   @type  cmd: string or list
+  @param env: Additional environment
+  @type env: dict
   @return: `RunResult` instance
   @rtype: RunResult
 
   @return: `RunResult` instance
   @rtype: RunResult
 
@@ -125,14 +128,18 @@ def RunCmd(cmd):
     strcmd = cmd
     shell = True
   logging.debug("RunCmd '%s'", strcmd)
     strcmd = cmd
     shell = True
   logging.debug("RunCmd '%s'", strcmd)
-  env = os.environ.copy()
-  env["LC_ALL"] = "C"
+
+  cmd_env = os.environ.copy()
+  cmd_env["LC_ALL"] = "C"
+  if env is not None:
+    cmd_env.update(env)
+
   poller = select.poll()
   child = subprocess.Popen(cmd, shell=shell,
                            stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stdin=subprocess.PIPE,
   poller = select.poll()
   child = subprocess.Popen(cmd, shell=shell,
                            stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stdin=subprocess.PIPE,
-                           close_fds=True, env=env)
+                           close_fds=True, env=cmd_env)
 
   child.stdin.close()
   poller.register(child.stdout, select.POLLIN)
 
   child.stdin.close()
   poller.register(child.stdout, select.POLLIN)
@@ -816,6 +823,21 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None):
   return success
 
 
   return success
 
 
+def OwnIpAddress(address):
+  """Check if the current host has the the given IP address.
+
+  Currently this is done by tcp-pinging the address from the loopback
+  address.
+
+  @type address: string
+  @param address: the addres to check
+  @rtype: bool
+
+  """
+  return TcpPing(address, constants.DEFAULT_NODED_PORT,
+                 source=constants.LOCALHOST_IP_ADDRESS)
+
+
 def ListVisibleFiles(path):
   """Returns a list of all visible files in a directory.
 
 def ListVisibleFiles(path):
   """Returns a list of all visible files in a directory.
 
@@ -857,6 +879,33 @@ def NewUUID():
     f.close()
 
 
     f.close()
 
 
+def GenerateSecret():
+  """Generates a random secret.
+
+  This will generate a pseudo-random secret, and return its sha digest
+  (so that it can be used where an ASCII string is needed).
+
+  """
+  return sha.new(os.urandom(64)).hexdigest()
+
+
+def ReadFile(file_name, size=None):
+  """Reads a file.
+
+  @type size: None or int
+  @param size: Read at most size bytes
+
+  """
+  f = open(file_name, "r")
+  try:
+    if size is None:
+      return f.read()
+    else:
+      return f.read(size)
+  finally:
+    f.close()
+
+
 def WriteFile(file_name, fn=None, data=None,
               mode=None, uid=-1, gid=-1,
               atime=None, mtime=None, close=True,
 def WriteFile(file_name, fn=None, data=None,
               mode=None, uid=-1, gid=-1,
               atime=None, mtime=None, close=True,
@@ -1161,32 +1210,107 @@ def CheckVolumeGroupSize(vglist, vgname, minsize):
   return None
 
 
   return None
 
 
-def SplitTime(seconds):
+def SplitTime(value):
   """Splits time as floating point number into a tuple.
 
   """Splits time as floating point number into a tuple.
 
-  @param seconds: Time in seconds
-  @type seconds: int or float
-  @return: Tuple containing (seconds, milliseconds)
+  @param value: Time in seconds
+  @type value: int or float
+  @return: Tuple containing (seconds, microseconds)
 
   """
 
   """
-  (seconds, fraction) = divmod(seconds, 1.0)
-  return (int(seconds), int(round(fraction * 1000, 0)))
+  (seconds, microseconds) = divmod(int(value * 1000000), 1000000)
+
+  assert 0 <= seconds, \
+    "Seconds must be larger than or equal to 0, but are %s" % seconds
+  assert 0 <= microseconds <= 999999, \
+    "Microseconds must be 0-999999, but are %s" % microseconds
+
+  return (int(seconds), int(microseconds))
 
 
 def MergeTime(timetuple):
   """Merges a tuple into time as a floating point number.
 
 
 
 def MergeTime(timetuple):
   """Merges a tuple into time as a floating point number.
 
-  @param timetuple: Time as tuple, (seconds, milliseconds)
+  @param timetuple: Time as tuple, (seconds, microseconds)
   @type timetuple: tuple
   @return: Time as a floating point number expressed in seconds
 
   """
   @type timetuple: tuple
   @return: Time as a floating point number expressed in seconds
 
   """
-  (seconds, milliseconds) = timetuple
+  (seconds, microseconds) = timetuple
+
+  assert 0 <= seconds, \
+    "Seconds must be larger than or equal to 0, but are %s" % seconds
+  assert 0 <= microseconds <= 999999, \
+    "Microseconds must be 0-999999, but are %s" % microseconds
+
+  return float(seconds) + (float(microseconds) * 0.000001)
+
 
 
-  assert 0 <= seconds, "Seconds must be larger than 0"
-  assert 0 <= milliseconds <= 999, "Milliseconds must be 0-999"
+def GetNodeDaemonPort():
+  """Get the node daemon port for this cluster.
 
 
-  return float(seconds) + (float(1) / 1000 * milliseconds)
+  Note that this routine does not read a ganeti-specific file, but
+  instead uses socket.getservbyname to allow pre-customization of
+  this parameter outside of Ganeti.
+
+  """
+  try:
+    port = socket.getservbyname("ganeti-noded", "tcp")
+  except socket.error:
+    port = constants.DEFAULT_NODED_PORT
+
+  return port
+
+
+def GetNodeDaemonPassword():
+  """Get the node password for the cluster.
+
+  """
+  return ReadFile(constants.CLUSTER_PASSWORD_FILE)
+
+
+def SetupLogging(logfile, debug=False, stderr_logging=False, program=""):
+  """Configures the logging module.
+
+  """
+  fmt = "%(asctime)s: " + program + " "
+  if debug:
+    fmt += ("pid=%(process)d/%(threadName)s %(levelname)s"
+           " %(module)s:%(lineno)s %(message)s")
+  else:
+    fmt += "pid=%(process)d %(levelname)s %(message)s"
+  formatter = logging.Formatter(fmt)
+
+  root_logger = logging.getLogger("")
+  root_logger.setLevel(logging.NOTSET)
+
+  if stderr_logging:
+    stderr_handler = logging.StreamHandler()
+    stderr_handler.setFormatter(formatter)
+    if debug:
+      stderr_handler.setLevel(logging.NOTSET)
+    else:
+      stderr_handler.setLevel(logging.CRITICAL)
+    root_logger.addHandler(stderr_handler)
+
+  # this can fail, if the logging directories are not setup or we have
+  # a permisssion problem; in this case, it's best to log but ignore
+  # the error if stderr_logging is True, and if false we re-raise the
+  # exception since otherwise we could run but without any logs at all
+  try:
+    logfile_handler = logging.FileHandler(logfile)
+    logfile_handler.setFormatter(formatter)
+    if debug:
+      logfile_handler.setLevel(logging.DEBUG)
+    else:
+      logfile_handler.setLevel(logging.INFO)
+    root_logger.addHandler(logfile_handler)
+  except EnvironmentError, err:
+    if stderr_logging:
+      logging.exception("Failed to enable logging to file '%s'", logfile)
+    else:
+      # we need to re-raise the exception
+      raise
 
 
 def LockedMethod(fn):
 
 
 def LockedMethod(fn):
@@ -1196,14 +1320,22 @@ def LockedMethod(fn):
   object's own lock which is hardcoded to '_lock'.
 
   """
   object's own lock which is hardcoded to '_lock'.
 
   """
+  def _LockDebug(*args, **kwargs):
+    if debug_locks:
+      logging.debug(*args, **kwargs)
+
   def wrapper(self, *args, **kwargs):
     assert hasattr(self, '_lock')
     lock = self._lock
   def wrapper(self, *args, **kwargs):
     assert hasattr(self, '_lock')
     lock = self._lock
+    _LockDebug("Waiting for %s", lock)
     lock.acquire()
     try:
     lock.acquire()
     try:
+      _LockDebug("Acquired %s", lock)
       result = fn(self, *args, **kwargs)
     finally:
       result = fn(self, *args, **kwargs)
     finally:
+      _LockDebug("Releasing %s", lock)
       lock.release()
       lock.release()
+      _LockDebug("Released %s", lock)
     return result
   return wrapper
 
     return result
   return wrapper
 
@@ -1236,36 +1368,64 @@ class FileLock(object):
       self.fd.close()
       self.fd = None
 
       self.fd.close()
       self.fd = None
 
-  def _flock(self, flag, blocking, errmsg):
+  def _flock(self, flag, blocking, timeout, errmsg):
+    """Wrapper for fcntl.flock.
+
+    @type flag: int
+    @param flag: Operation flag
+    @type blocking: bool
+    @param blocking: Whether the operation should be done in blocking mode.
+    @type timeout: None or float
+    @param timeout: For how long the operation should be retried (implies
+                    non-blocking mode).
+    @type errmsg: string
+    @param errmsg: Error message in case operation fails.
+
+    """
     assert self.fd, "Lock was closed"
     assert self.fd, "Lock was closed"
+    assert timeout is None or timeout >= 0, \
+      "If specified, timeout must be positive"
 
 
-    if not blocking:
+    if timeout is not None:
       flag |= fcntl.LOCK_NB
       flag |= fcntl.LOCK_NB
+      timeout_end = time.time() + timeout
 
 
-    try:
-      fcntl.flock(self.fd, flag)
-    except IOError, err:
-      if err.errno in (errno.EAGAIN, ):
-        raise errors.LockError(errmsg)
-      else:
-        logging.exception("fcntl.flock failed")
-        raise
-
-  def Exclusive(self, blocking=False):
+    # Blocking doesn't have effect with timeout
+    elif not blocking:
+      flag |= fcntl.LOCK_NB
+      timeout_end = None
+
+    retry = True
+    while retry:
+      try:
+        fcntl.flock(self.fd, flag)
+        retry = False
+      except IOError, err:
+        if err.errno in (errno.EAGAIN, ):
+          if timeout_end is not None and time.time() < timeout_end:
+            # Wait before trying again
+            time.sleep(max(0.1, min(1.0, timeout)))
+          else:
+            raise errors.LockError(errmsg)
+        else:
+          logging.exception("fcntl.flock failed")
+          raise
+
+  def Exclusive(self, blocking=False, timeout=None):
     """Locks the file in exclusive mode.
 
     """
     """Locks the file in exclusive mode.
 
     """
-    self._flock(fcntl.LOCK_EX, blocking,
+    self._flock(fcntl.LOCK_EX, blocking, timeout,
                 "Failed to lock %s in exclusive mode" % self.filename)
 
                 "Failed to lock %s in exclusive mode" % self.filename)
 
-  def Shared(self, blocking=False):
+  def Shared(self, blocking=False, timeout=None):
     """Locks the file in shared mode.
 
     """
     """Locks the file in shared mode.
 
     """
-    self._flock(fcntl.LOCK_SH, blocking,
+    self._flock(fcntl.LOCK_SH, blocking, timeout,
                 "Failed to lock %s in shared mode" % self.filename)
 
                 "Failed to lock %s in shared mode" % self.filename)
 
-  def Unlock(self, blocking=True):
+  def Unlock(self, blocking=True, timeout=None):
     """Unlocks the file.
 
     According to "man flock", unlocking can also be a nonblocking operation:
     """Unlocks the file.
 
     According to "man flock", unlocking can also be a nonblocking operation:
@@ -1273,7 +1433,7 @@ class FileLock(object):
     operations"
 
     """
     operations"
 
     """
-    self._flock(fcntl.LOCK_UN, blocking,
+    self._flock(fcntl.LOCK_UN, blocking, timeout,
                 "Failed to unlock %s" % self.filename)
 
 
                 "Failed to unlock %s" % self.filename)