Support arguments in utils.RunInSeparateProcess
[ganeti-local] / lib / luxi.py
index f062816..a34a237 100644 (file)
@@ -37,6 +37,7 @@ import errno
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
+from ganeti import utils
 
 
 KEY_METHOD = 'method'
@@ -156,15 +157,14 @@ class Transport:
 
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-      self.socket.settimeout(self._ctimeout)
+
+      # Try to connect
       try:
-        self.socket.connect(address)
-      except socket.timeout, err:
-        raise TimeoutError("Connect timed out: %s" % str(err))
-      except socket.error, err:
-        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
-          raise NoMasterError(address)
-        raise
+        utils.Retry(self._Connect, 1.0, self._ctimeout,
+                    args=(self.socket, address, self._ctimeout))
+      except utils.RetryTimeout:
+        raise TimeoutError("Connect timed out")
+
       self.socket.settimeout(self._rwtimeout)
     except (socket.error, NoMasterError):
       if self.socket is not None:
@@ -172,6 +172,21 @@ class Transport:
       self.socket = None
       raise
 
+  @staticmethod
+  def _Connect(sock, address, timeout):  # single attempt, run via utils.Retry
+    sock.settimeout(timeout)  # connect timeout for this attempt
+    try:
+      sock.connect(address)
+    except socket.timeout, err:
+      raise TimeoutError("Connect timed out: %s" % str(err))
+    except socket.error, err:
+      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):  # no listener
+        raise NoMasterError(address)
+      if err.args[0] == errno.EAGAIN:
+        # Server's socket backlog is full at the moment
+        raise utils.RetryAgain()  # presumably makes utils.Retry call us again
+      raise  # any other socket error propagates unchanged
+
   def _CheckSocket(self):
     """Make sure we are connected.
 
@@ -357,12 +372,17 @@ class Client(object):
     timeout = (DEF_RWTO - 1) / 2
     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
 
-  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):  # one call, no loop
     timeout = (DEF_RWTO - 1) / 2
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,  # result returned as-is
+                           (job_id, fields, prev_job_info,
+                            prev_log_serial, timeout))
+
+  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     while True:
-      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                               (job_id, fields, prev_job_info,
-                                prev_log_serial, timeout))
+      result = self.WaitForJobChangeOnce(job_id, fields,  # repeat while unchanged
+                                         prev_job_info, prev_log_serial)
       if result != constants.JOB_NOTCHANGED:
         break
     return result
@@ -387,6 +407,3 @@ class Client(object):
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
-
-
-# TODO: class Server(object)