Implement utils.RunParts and use it for hooks
[ganeti-local] / lib / luxi.py
index 7ccc4b4..a34a237 100644 (file)
@@ -37,6 +37,7 @@ import errno
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
+from ganeti import utils
 
 
 KEY_METHOD = 'method'
@@ -56,7 +57,9 @@ REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
+REQ_QUERY_TAGS = "QueryTags"
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
@@ -154,15 +157,14 @@ class Transport:
 
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-      self.socket.settimeout(self._ctimeout)
+
+      # Try to connect
       try:
-        self.socket.connect(address)
-      except socket.timeout, err:
-        raise TimeoutError("Connect timed out: %s" % str(err))
-      except socket.error, err:
-        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
-          raise NoMasterError((address,))
-        raise
+        utils.Retry(self._Connect, 1.0, self._ctimeout,
+                    args=(self.socket, address, self._ctimeout))
+      except utils.RetryTimeout:
+        raise TimeoutError("Connect timed out")
+
       self.socket.settimeout(self._rwtimeout)
     except (socket.error, NoMasterError):
       if self.socket is not None:
@@ -170,6 +172,21 @@ class Transport:
       self.socket = None
       raise
 
+  @staticmethod
+  def _Connect(sock, address, timeout):
+    sock.settimeout(timeout)
+    try:
+      sock.connect(address)
+    except socket.timeout, err:
+      raise TimeoutError("Connect timed out: %s" % str(err))
+    except socket.error, err:
+      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+        raise NoMasterError(address)
+      if err.args[0] == errno.EAGAIN:
+        # Server's socket backlog is full at the moment
+        raise utils.RetryAgain()
+      raise
+
   def _CheckSocket(self):
     """Make sure we are connected.
 
@@ -285,7 +302,7 @@ class Client(object):
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
-    except Exception:
+    except Exception: # pylint: disable-msg=W0703
       pass
 
   def CallMethod(self, method, args):
@@ -332,6 +349,9 @@ class Client(object):
   def SetQueueDrainFlag(self, drain_flag):
     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
 
+  def SetWatcherPause(self, until):
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
@@ -352,12 +372,17 @@ class Client(object):
     timeout = (DEF_RWTO - 1) / 2
     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
 
-  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
     timeout = (DEF_RWTO - 1) / 2
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                           (job_id, fields, prev_job_info,
+                            prev_log_serial, timeout))
+
+  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     while True:
-      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                               (job_id, fields, prev_job_info,
-                                prev_log_serial, timeout))
+      result = self.WaitForJobChangeOnce(job_id, fields,
+                                         prev_job_info, prev_log_serial)
       if result != constants.JOB_NOTCHANGED:
         break
     return result
@@ -380,5 +405,5 @@ class Client(object):
   def QueryConfigValues(self, fields):
     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
 
-
-# TODO: class Server(object)
+  def QueryTags(self, kind, name):
+    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))