Adding '--no-ssh-init' option to 'gnt-cluster init'.
[ganeti-local] / lib / luxi.py
index 1c3ca6d..753b005 100644 (file)
@@ -57,6 +57,7 @@ REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
@@ -161,7 +162,7 @@ class Transport:
         raise TimeoutError("Connect timed out: %s" % str(err))
       except socket.error, err:
         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
-          raise NoMasterError((address,))
+          raise NoMasterError(address)
         raise
       self.socket.settimeout(self._rwtimeout)
     except (socket.error, NoMasterError):
@@ -187,12 +188,13 @@ class Transport:
       raise EncodingError("Message terminator found in payload")
     self._CheckSocket()
     try:
+      # TODO: sendall is not guaranteed to send everything
       self.socket.sendall(msg + self.eom)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
   def Recv(self):
-    """Try to receive a messae from the socket.
+    """Try to receive a message from the socket.
 
     In case we already have messages queued, we just return from the
     queue. Otherwise, we try to read data with a _rwtimeout network
@@ -205,10 +207,16 @@ class Transport:
     while not self._msgs:
       if time.time() > etime:
         raise TimeoutError("Extended receive timeout")
-      try:
-        data = self.socket.recv(4096)
-      except socket.timeout, err:
-        raise TimeoutError("Receive timeout: %s" % str(err))
+      while True:
+        try:
+          data = self.socket.recv(4096)
+        except socket.error, err:
+          if err.args and err.args[0] == errno.EAGAIN:
+            continue
+          raise
+        except socket.timeout, err:
+          raise TimeoutError("Receive timeout: %s" % str(err))
+        break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
       new_msgs = (self._buffer + data).split(self.eom)
@@ -278,7 +286,7 @@ class Client(object):
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
-    except Exception, err:
+    except Exception:
       pass
 
   def CallMethod(self, method, args):
@@ -317,14 +325,7 @@ class Client(object):
     result = data[KEY_RESULT]
 
     if not data[KEY_SUCCESS]:
-      # TODO: decide on a standard exception
-      if (isinstance(result, (tuple, list)) and len(result) == 2 and
-          isinstance(result[1], (tuple, list))):
-        # custom ganeti errors
-        err_class = errors.GetErrorClass(result[0])
-        if err_class is not None:
-          raise err_class, tuple(result[1])
-
+      errors.MaybeRaise(result)
       raise RequestError(result)
 
     return result
@@ -332,6 +333,9 @@ class Client(object):
   def SetQueueDrainFlag(self, drain_flag):
     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
 
+  def SetWatcherPause(self, until):
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)