Use subdirectories for job queue archive
[ganeti-local] / lib / utils.py
index 4de6a1e..76adfc5 100644 (file)
@@ -203,7 +203,18 @@ def _RunCmdPipe(cmd, env, via_shell, cwd):
     fcntl.fcntl(fd, fcntl.F_SETFL, status | os.O_NONBLOCK)
 
   while fdmap:
-    for fd, event in poller.poll():
+    try:
+      pollresult = poller.poll()
+    except EnvironmentError, eerr:
+      if eerr.errno == errno.EINTR:
+        continue
+      raise
+    except select.error, serr:
+      if serr[0] == errno.EINTR:
+        continue
+      raise
+
+    for fd, event in pollresult:
       if event & select.POLLIN or event & select.POLLPRI:
         data = fdmap[fd][1].read()
         # no data from read signifies EOF (the same as POLLHUP)
@@ -274,6 +285,32 @@ def RemoveFile(filename):
       raise
 
 
+def RenameFile(old, new, mkdir=False, mkdir_mode=0750):
+  """Renames a file.
+
+  @type old: string
+  @param old: Original path
+  @type new: string
+  @param new: New path
+  @type mkdir: bool
+  @param mkdir: Whether to create target directory if it doesn't exist
+  @type mkdir_mode: int
+  @param mkdir_mode: Mode for newly created directories
+
+  """
+  try:
+    return os.rename(old, new)
+  except OSError, err:
+    # In at least one use case of this function, the job queue, directory
+    # creation is very rare. Checking for the directory before renaming is not
+    # as efficient.
+    if mkdir and err.errno == errno.ENOENT:
+      # Create directory and try again
+      os.makedirs(os.path.dirname(new), mkdir_mode)
+      return os.rename(old, new)
+    raise
+
+
 def _FingerprintFile(filename):
   """Compute the fingerprint of a file.
 
@@ -352,8 +389,8 @@ def CheckDict(target, template, logname=None):
 def IsProcessAlive(pid):
   """Check if a given pid exists on the system.
 
-  @note: zombie processes treated as not alive, and giving a
-      pid M{<= 0} causes the function to return False.
+  @note: zombie status is not handled, so zombie processes
+      will be returned as alive
   @type pid: int
   @param pid: the process ID to check
   @rtype: boolean
@@ -364,22 +401,12 @@ def IsProcessAlive(pid):
     return False
 
   try:
-    f = open("/proc/%d/status" % pid)
-  except IOError, err:
+    os.stat("/proc/%d/status" % pid)
+    return True
+  except EnvironmentError, err:
     if err.errno in (errno.ENOENT, errno.ENOTDIR):
       return False
-
-  alive = True
-  try:
-    data = f.readlines()
-    if len(data) > 1:
-      state = data[1].split()
-      if len(state) > 1 and state[1] == "Z":
-        alive = False
-  finally:
-    f.close()
-
-  return alive
+    raise
 
 
 def ReadPidFile(pidfile):
@@ -530,6 +557,37 @@ def BridgeExists(bridge):
   return os.path.isdir("/sys/class/net/%s/bridge" % bridge)
 
 
+def CheckBEParams(beparams):
+  """Checks whether the user-supplied be-params are valid,
+  and converts them from string format where appropriate.
+
+  @type beparams: dict
+  @param beparams: new params dict
+
+  """
+  if beparams:
+    for item in beparams:
+      if item not in constants.BES_PARAMETERS:
+        raise errors.OpPrereqError("Unknown backend parameter %s" % item)
+      if item in (constants.BE_MEMORY, constants.BE_VCPUS):
+        val = beparams[item]
+        if val != constants.VALUE_DEFAULT:
+          try:
+            val = int(val)
+          except ValueError, err:
+            raise errors.OpPrereqError("Invalid %s size: %s" % (item, str(err)))
+          beparams[item] = val
+      if item in (constants.BE_AUTO_BALANCE):
+        val = beparams[item]
+        if not isinstance(val, bool):
+          if val == constants.VALUE_TRUE:
+            beparams[item] = True
+          elif val == constants.VALUE_FALSE:
+            beparams[item] = False
+          else:
+            raise errors.OpPrereqError("Invalid %s value: %s" % (item, val))
+
+
 def NiceSort(name_list):
   """Sort a list of strings based on digit and non-digit groupings.
 
@@ -648,23 +706,40 @@ def BuildShellCmd(template, *args):
   return template % args
 
 
-def FormatUnit(value):
+def FormatUnit(value, units):
   """Formats an incoming number of MiB with the appropriate unit.
 
   @type value: int
   @param value: integer representing the value in MiB (1048576)
+  @type units: char
+  @param units: the type of formatting we should do:
+      - 'h' for automatic scaling
+      - 'm' for MiBs
+      - 'g' for GiBs
+      - 't' for TiBs
   @rtype: str
   @return: the formatted value (with suffix)
 
   """
-  if value < 1024:
-    return "%dM" % round(value, 0)
+  if units not in ('m', 'g', 't', 'h'):
+    raise errors.ProgrammerError("Invalid unit specified '%s'" % str(units))
+
+  suffix = ''
+
+  if units == 'm' or (units == 'h' and value < 1024):
+    if units == 'h':
+      suffix = 'M'
+    return "%d%s" % (round(value, 0), suffix)
 
-  elif value < (1024 * 1024):
-    return "%0.1fG" % round(float(value) / 1024, 1)
+  elif units == 'g' or (units == 'h' and value < (1024 * 1024)):
+    if units == 'h':
+      suffix = 'G'
+    return "%0.1f%s" % (round(float(value) / 1024, 1), suffix)
 
   else:
-    return "%0.1fT" % round(float(value) / 1024 / 1024, 1)
+    if units == 'h':
+      suffix = 'T'
+    return "%0.1f%s" % (round(float(value) / 1024 / 1024, 1), suffix)
 
 
 def ParseUnit(input_string):
@@ -796,7 +871,6 @@ def SetEtcHostsEntry(file_name, ip, hostname, aliases):
     try:
       f = open(file_name, 'r')
       try:
-        written = False
         for line in f:
           fields = line.split()
           if fields and not fields[0].startswith('#') and ip == fields[0]:
@@ -969,7 +1043,7 @@ def TcpPing(target, port, timeout=10, live_port_needed=False, source=None):
   """
   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
-  sucess = False
+  success = False
 
   if source is not None:
     try:
@@ -1421,11 +1495,25 @@ def KillProcess(pid, signal_=signal.SIGTERM, timeout=30,
   _helper(pid, signal_, waitpid)
   if timeout <= 0:
     return
+
+  # Wait up to $timeout seconds
   end = time.time() + timeout
+  wait = 0.01
   while time.time() < end and IsProcessAlive(pid):
-    time.sleep(0.1)
+    try:
+      (result_pid, _) = os.waitpid(pid, os.WNOHANG)
+      if result_pid > 0:
+        break
+    except OSError:
+      pass
+    time.sleep(wait)
+    # Make wait time longer for next try
+    if wait < 0.1:
+      wait *= 1.5
+
   if IsProcessAlive(pid):
-    _helper(pid, signal.SIGKILL, wait)
+    # Kill process if it's still alive
+    _helper(pid, signal.SIGKILL, waitpid)
 
 
 def FindFile(name, search_path, test=os.path.exists):
@@ -1532,15 +1620,6 @@ def GetNodeDaemonPort():
   return port
 
 
-def GetNodeDaemonPassword():
-  """Get the node password for the cluster.
-
-  @rtype: str
-
-  """
-  return ReadFile(constants.CLUSTER_PASSWORD_FILE)
-
-
 def SetupLogging(logfile, debug=False, stderr_logging=False, program=""):
   """Configures the logging module.
 
@@ -1568,6 +1647,10 @@ def SetupLogging(logfile, debug=False, stderr_logging=False, program=""):
   root_logger = logging.getLogger("")
   root_logger.setLevel(logging.NOTSET)
 
+  # Remove all previously setup handlers
+  for handler in root_logger.handlers:
+    root_logger.removeHandler(handler)
+
   if stderr_logging:
     stderr_handler = logging.StreamHandler()
     stderr_handler.setFormatter(formatter)
@@ -1833,3 +1916,45 @@ class SignalHandler(object):
     # This is not nice and not absolutely atomic, but it appears to be the only
     # solution in Python -- there are no atomic types.
     self.called = True
+
+
+class FieldSet(object):
+  """A simple field set.
+
+  Among the features are:
+    - checking if a string is among a list of static string or regex objects
+    - checking if a whole list of string matches
+    - returning the matching groups from a regex match
+
+  Internally, all fields are held as regular expression objects.
+
+  """
+  def __init__(self, *items):
+    self.items = [re.compile("^%s$" % value) for value in items]
+
+  def Extend(self, other_set):
+    """Extend the field set with the items from another one"""
+    self.items.extend(other_set.items)
+
+  def Matches(self, field):
+    """Checks if a field matches the current set
+
+    @type field: str
+    @param field: the string to match
+    @return: either False or a regular expression match object
+
+    """
+    for m in itertools.ifilter(None, (val.match(field) for val in self.items)):
+      return m
+    return False
+
+  def NonMatching(self, items):
+    """Returns the list of fields not matching the current set
+
+    @type items: list
+    @param items: the list of fields to check
+    @rtype: list
+    @return: list of non-matching fields
+
+    """
+    return [val for val in items if not self.Matches(val)]