Daemons conditionally set up console logging
[ganeti-local] / lib / luxi.py
index 9117253..c967f8e 100644 (file)
 
 """Module for the unix socket protocol
 
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
 this module and the opcodes module in the client program in order to
 communicate with the master.
 
-The module is also be used by the master daemon.
+The module is also used by the master daemon.
 
 """
 
@@ -37,6 +37,7 @@ import errno
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
+from ganeti import utils
 
 
 KEY_METHOD = 'method'
@@ -45,6 +46,7 @@ KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 
 REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
@@ -54,10 +56,17 @@ REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
+REQ_QUERY_TAGS = "QueryTags"
+REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
 
+# WaitForJobChange timeout
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
+
 
 class ProtocolError(Exception):
   """Denotes an error in the server communication"""
@@ -151,15 +160,14 @@ class Transport:
 
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-      self.socket.settimeout(self._ctimeout)
+
+      # Try to connect
       try:
-        self.socket.connect(address)
-      except socket.timeout, err:
-        raise TimeoutError("Connect timed out: %s" % str(err))
-      except socket.error, err:
-        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
-          raise NoMasterError((address,))
-        raise
+        utils.Retry(self._Connect, 1.0, self._ctimeout,
+                    args=(self.socket, address, self._ctimeout))
+      except utils.RetryTimeout:
+        raise TimeoutError("Connect timed out")
+
       self.socket.settimeout(self._rwtimeout)
     except (socket.error, NoMasterError):
       if self.socket is not None:
@@ -167,6 +175,21 @@ class Transport:
       self.socket = None
       raise
 
+  @staticmethod
+  def _Connect(sock, address, timeout):
+    sock.settimeout(timeout)
+    try:
+      sock.connect(address)
+    except socket.timeout, err:
+      raise TimeoutError("Connect timed out: %s" % str(err))
+    except socket.error, err:
+      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+        raise NoMasterError(address)
+      if err.args[0] == errno.EAGAIN:
+        # Server's socket backlog is full at the moment
+        raise utils.RetryAgain()
+      raise
+
   def _CheckSocket(self):
     """Make sure we are connected.
 
@@ -184,12 +207,13 @@ class Transport:
       raise EncodingError("Message terminator found in payload")
     self._CheckSocket()
     try:
+      # TODO: on error or timeout, sendall may have sent only part of the data
       self.socket.sendall(msg + self.eom)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
   def Recv(self):
-    """Try to receive a messae from the socket.
+    """Try to receive a message from the socket.
 
     In case we already have messages queued, we just return from the
     queue. Otherwise, we try to read data with a _rwtimeout network
@@ -202,10 +226,16 @@ class Transport:
     while not self._msgs:
       if time.time() > etime:
         raise TimeoutError("Extended receive timeout")
-      try:
-        data = self.socket.recv(4096)
-      except socket.timeout, err:
-        raise TimeoutError("Receive timeout: %s" % str(err))
+      while True:
+        try:
+          data = self.socket.recv(4096)
+        except socket.error, err:
+          if err.args and err.args[0] == errno.EAGAIN:
+            continue
+          raise
+        except socket.timeout, err:
+          raise TimeoutError("Receive timeout: %s" % str(err))
+        break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
       new_msgs = (self._buffer + data).split(self.eom)
@@ -251,7 +281,32 @@ class Client(object):
     """
     if address is None:
       address = constants.MASTER_SOCKET
-    self.transport = transport(address, timeouts=timeouts)
+    self.address = address
+    self.timeouts = timeouts
+    self.transport_class = transport
+    self.transport = None
+    self._InitTransport()
+
+  def _InitTransport(self):
+    """(Re)initialize the transport if needed.
+
+    """
+    if self.transport is None:
+      self.transport = self.transport_class(self.address,
+                                            timeouts=self.timeouts)
+
+  def _CloseTransport(self):
+    """Close the transport, ignoring errors.
+
+    """
+    if self.transport is None:
+      return
+    try:
+      old_transp = self.transport
+      self.transport = None
+      old_transp.Close()
+    except Exception: # pylint: disable-msg=W0703
+      pass
 
   def CallMethod(self, method, args):
     """Send a generic request and return the response.
@@ -263,8 +318,18 @@ class Client(object):
       KEY_ARGS: args,
       }
 
+    # Serialize the request
+    send_data = serializer.DumpJson(request, indent=False)
+
     # Send request and wait for response
-    result = self.transport.Call(serializer.DumpJson(request, indent=False))
+    try:
+      self._InitTransport()
+      result = self.transport.Call(send_data)
+    except Exception:
+      self._CloseTransport()
+      raise
+
+    # Parse the result
     try:
       data = serializer.LoadJson(result)
     except Exception, err:
@@ -279,22 +344,27 @@ class Client(object):
     result = data[KEY_RESULT]
 
     if not data[KEY_SUCCESS]:
-      # TODO: decide on a standard exception
-      if (isinstance(result, (tuple, list)) and len(result) == 2 and
-          isinstance(result[1], (tuple, list))):
-        # custom ganeti errors
-        err_class = errors.GetErrorClass(result[0])
-        if err_class is not None:
-          raise err_class, tuple(result[1])
-
+      errors.MaybeRaise(result)
       raise RequestError(result)
 
     return result
 
+  def SetQueueDrainFlag(self, drain_flag):
+    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+
+  def SetWatcherPause(self, until):
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
 
+  def SubmitManyJobs(self, jobs):
+    jobs_state = []
+    for ops in jobs:
+      jobs_state.append([op.__getstate__() for op in ops])
+    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
   def CancelJob(self, job_id):
     return self.CallMethod(REQ_CANCEL_JOB, job_id)
 
@@ -302,14 +372,36 @@ class Client(object):
     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
 
   def AutoArchiveJobs(self, age):
-    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
+    timeout = (DEF_RWTO - 1) / 2
+    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial,
+                           timeout=WFJC_TIMEOUT):
+    """Waits for changes on a job.
+
+    @param job_id: Job ID
+    @type fields: list
+    @param fields: List of field names to be observed
+    @type prev_job_info: None or list
+    @param prev_job_info: Previously received job information
+    @type prev_log_serial: None or int/long
+    @param prev_log_serial: Highest log serial number previously received
+    @type timeout: int/float
+    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+                    be capped to that value)
+
+    """
+    assert timeout >= 0, "Timeout can not be negative"
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                           (job_id, fields, prev_job_info,
+                            prev_log_serial,
+                            min(WFJC_TIMEOUT, timeout)))
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
-    timeout = (DEF_RWTO - 1) / 2
     while True:
-      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                               (job_id, fields, prev_job_info,
-                                prev_log_serial, timeout))
+      result = self.WaitForJobChangeOnce(job_id, fields,
+                                         prev_job_info, prev_log_serial)
       if result != constants.JOB_NOTCHANGED:
         break
     return result
@@ -317,16 +409,20 @@ class Client(object):
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 
-  def QueryInstances(self, names, fields):
-    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
+  def QueryInstances(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
+
+  def QueryNodes(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
 
-  def QueryNodes(self, names, fields):
-    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
+  def QueryExports(self, nodes, use_locking):
+    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
 
-  def QueryExports(self, nodes):
-    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
+  def QueryClusterInfo(self):
+    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
 
   def QueryConfigValues(self, fields):
     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
 
-# TODO: class Server(object)
+  def QueryTags(self, kind, name):
+    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))