Add --uid-pool option to gnt-cluster init
[ganeti-local] / lib / luxi.py
index 9c27ad7..a34a237 100644 (file)
 
 """Module for the unix socket protocol
 
 
 """Module for the unix socket protocol
 
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
 this module and the opcodes module in the client program in order to
 communicate with the master.
 
 this module and the opcodes module in the client program in order to
 communicate with the master.
 
-The module is also be used by the master daemon.
+The module is also used by the master daemon.
 
 """
 
 import socket
 import collections
 
 """
 
 import socket
 import collections
-import simplejson
 import time
 import time
-
-from ganeti import opcodes
-
-
-KEY_REQUEST = 'request'
-KEY_DATA = 'data'
-REQ_SUBMIT = 'submit'
-REQ_ABORT = 'abort'
-REQ_QUERY = 'query'
+import errno
+
+from ganeti import serializer
+from ganeti import constants
+from ganeti import errors
+from ganeti import utils
+
+
+KEY_METHOD = 'method'
+KEY_ARGS = 'args'
+KEY_SUCCESS = "success"
+KEY_RESULT = "result"
+
+REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
+REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
+REQ_CANCEL_JOB = "CancelJob"
+REQ_ARCHIVE_JOB = "ArchiveJob"
+REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
+REQ_QUERY_JOBS = "QueryJobs"
+REQ_QUERY_INSTANCES = "QueryInstances"
+REQ_QUERY_NODES = "QueryNodes"
+REQ_QUERY_EXPORTS = "QueryExports"
+REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
+REQ_QUERY_TAGS = "QueryTags"
+REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
 
 DEF_CTMO = 10
 DEF_RWTO = 60
@@ -67,22 +85,26 @@ class DecodingError(ProtocolError):
   """Decoding failure on the receiving side"""
 
 
   """Decoding failure on the receiving side"""
 
 
-def SerializeJob(job):
-  """Convert a job description to a string format.
+class RequestError(ProtocolError):
+  """Error on request
+
+  This signifies an error in the request format or request handling,
+  but not (e.g.) an error in starting up an instance.
+
+  Some common conditions that can trigger this exception:
+    - job submission failed because the job data was wrong
+    - query failed because required fields were missing
 
   """
 
   """
-  return simplejson.dumps(job.__getstate__())
 
 
 
 
-def UnserializeJob(data):
-  """Load a job from a string format"""
-  try:
-    new_data = simplejson.loads(data)
-  except Exception, err:
-    raise DecodingError("Error while unserializing: %s" % str(err))
-  job = opcodes.Job()
-  job.__setstate__(new_data)
-  return job
+class NoMasterError(ProtocolError):
+  """The master cannot be reached
+
+  This means that the master daemon is not running or the socket has
+  been removed.
+
+  """
 
 
 class Transport:
 
 
 class Transport:
@@ -135,18 +157,36 @@ class Transport:
 
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-      self.socket.settimeout(self._ctimeout)
+
+      # Try to connect
       try:
       try:
-        self.socket.connect(address)
-      except socket.timeout, err:
-        raise TimeoutError("Connection timed out: %s" % str(err))
+        utils.Retry(self._Connect, 1.0, self._ctimeout,
+                    args=(self.socket, address, self._ctimeout))
+      except utils.RetryTimeout:
+        raise TimeoutError("Connect timed out")
+
       self.socket.settimeout(self._rwtimeout)
       self.socket.settimeout(self._rwtimeout)
-    except socket.error:
+    except (socket.error, NoMasterError):
       if self.socket is not None:
         self.socket.close()
       self.socket = None
       raise
 
       if self.socket is not None:
         self.socket.close()
       self.socket = None
       raise
 
+  @staticmethod
+  def _Connect(sock, address, timeout):
+    sock.settimeout(timeout)
+    try:
+      sock.connect(address)
+    except socket.timeout, err:
+      raise TimeoutError("Connect timed out: %s" % str(err))
+    except socket.error, err:
+      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+        raise NoMasterError(address)
+      if err.args[0] == errno.EAGAIN:
+        # Server's socket backlog is full at the moment
+        raise utils.RetryAgain()
+      raise
+
   def _CheckSocket(self):
     """Make sure we are connected.
 
   def _CheckSocket(self):
     """Make sure we are connected.
 
@@ -164,12 +204,13 @@ class Transport:
       raise EncodingError("Message terminator found in payload")
     self._CheckSocket()
     try:
       raise EncodingError("Message terminator found in payload")
     self._CheckSocket()
     try:
+      # TODO: sendall is not guaranteed to send everything
       self.socket.sendall(msg + self.eom)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
   def Recv(self):
       self.socket.sendall(msg + self.eom)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
   def Recv(self):
-    """Try to receive a messae from the socket.
+    """Try to receive a message from the socket.
 
     In case we already have messages queued, we just return from the
     queue. Otherwise, we try to read data with a _rwtimeout network
 
     In case we already have messages queued, we just return from the
     queue. Otherwise, we try to read data with a _rwtimeout network
@@ -182,10 +223,16 @@ class Transport:
     while not self._msgs:
       if time.time() > etime:
         raise TimeoutError("Extended receive timeout")
     while not self._msgs:
       if time.time() > etime:
         raise TimeoutError("Extended receive timeout")
-      try:
-        data = self.socket.recv(4096)
-      except socket.timeout, err:
-        raise TimeoutError("Receive timeout: %s" % str(err))
+      while True:
+        try:
+          data = self.socket.recv(4096)
+        except socket.error, err:
+          if err.args and err.args[0] == errno.EAGAIN:
+            continue
+          raise
+        except socket.timeout, err:
+          raise TimeoutError("Receive timeout: %s" % str(err))
+        break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
       new_msgs = (self._buffer + data).split(self.eom)
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
       new_msgs = (self._buffer + data).split(self.eom)
@@ -216,7 +263,7 @@ class Client(object):
   implements data serialization/deserialization.
 
   """
   implements data serialization/deserialization.
 
   """
-  def __init__(self, address, timeouts=None, transport=Transport):
+  def __init__(self, address=None, timeouts=None, transport=Transport):
     """Constructor for the Client class.
 
     Arguments:
     """Constructor for the Client class.
 
     Arguments:
@@ -229,24 +276,134 @@ class Client(object):
     class are used.
 
     """
     class are used.
 
     """
-    self.transport = transport(address, timeouts=timeouts)
+    if address is None:
+      address = constants.MASTER_SOCKET
+    self.address = address
+    self.timeouts = timeouts
+    self.transport_class = transport
+    self.transport = None
+    self._InitTransport()
+
+  def _InitTransport(self):
+    """(Re)initialize the transport if needed.
+
+    """
+    if self.transport is None:
+      self.transport = self.transport_class(self.address,
+                                            timeouts=self.timeouts)
+
+  def _CloseTransport(self):
+    """Close the transport, ignoring errors.
+
+    """
+    if self.transport is None:
+      return
+    try:
+      old_transp = self.transport
+      self.transport = None
+      old_transp.Close()
+    except Exception: # pylint: disable-msg=W0703
+      pass
 
 
-  def SendRequest(self, request, data):
+  def CallMethod(self, method, args):
     """Send a generic request and return the response.
 
     """
     """Send a generic request and return the response.
 
     """
-    msg = {KEY_REQUEST: request, KEY_DATA: data}
-    result = self.transport.Call(simplejson.dumps(msg))
+    # Build request
+    request = {
+      KEY_METHOD: method,
+      KEY_ARGS: args,
+      }
+
+    # Serialize the request
+    send_data = serializer.DumpJson(request, indent=False)
+
+    # Send request and wait for response
+    try:
+      self._InitTransport()
+      result = self.transport.Call(send_data)
+    except Exception:
+      self._CloseTransport()
+      raise
+
+    # Parse the result
     try:
     try:
-      data = simplejson.loads(result)
+      data = serializer.LoadJson(result)
     except Exception, err:
       raise ProtocolError("Error while deserializing response: %s" % str(err))
     except Exception, err:
       raise ProtocolError("Error while deserializing response: %s" % str(err))
-    return data
 
 
-  def SubmitJob(self, job):
-    """Submit a job"""
-    return self.SendRequest(REQ_SUBMIT, SerializeJob(job))
+    # Validate response
+    if (not isinstance(data, dict) or
+        KEY_SUCCESS not in data or
+        KEY_RESULT not in data):
+      raise DecodingError("Invalid response from server: %s" % str(data))
+
+    result = data[KEY_RESULT]
+
+    if not data[KEY_SUCCESS]:
+      errors.MaybeRaise(result)
+      raise RequestError(result)
+
+    return result
+
+  def SetQueueDrainFlag(self, drain_flag):
+    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+
+  def SetWatcherPause(self, until):
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
+  def SubmitJob(self, ops):
+    ops_state = map(lambda op: op.__getstate__(), ops)
+    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+
+  def SubmitManyJobs(self, jobs):
+    jobs_state = []
+    for ops in jobs:
+      jobs_state.append([op.__getstate__() for op in ops])
+    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
+  def CancelJob(self, job_id):
+    return self.CallMethod(REQ_CANCEL_JOB, job_id)
+
+  def ArchiveJob(self, job_id):
+    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+
+  def AutoArchiveJobs(self, age):
+    timeout = (DEF_RWTO - 1) / 2
+    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    timeout = (DEF_RWTO - 1) / 2
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                           (job_id, fields, prev_job_info,
+                            prev_log_serial, timeout))
+
+  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+    while True:
+      result = self.WaitForJobChangeOnce(job_id, fields,
+                                         prev_job_info, prev_log_serial)
+      if result != constants.JOB_NOTCHANGED:
+        break
+    return result
+
+  def QueryJobs(self, job_ids, fields):
+    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
+
+  def QueryInstances(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
+
+  def QueryNodes(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
+
+  def QueryExports(self, nodes, use_locking):
+    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
+
+  def QueryClusterInfo(self):
+    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
+
+  def QueryConfigValues(self, fields):
+    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
 
 
-  def Query(self, data):
-    """Make a query"""
-    return self.SendRequest(REQ_QUERY, data)
+  def QueryTags(self, kind, name):
+    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))