Merge branch 'devel-2.7'
[ganeti-local] / lib / luxi.py
index f588e81..a84085d 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -35,50 +35,86 @@ import time
 import errno
 import logging
 
 import errno
 import logging
 
+from ganeti import compat
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
+from ganeti import utils
+from ganeti import objects
+from ganeti import pathutils
 
 
 KEY_METHOD = "method"
 KEY_ARGS = "args"
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 
 
 KEY_METHOD = "method"
 KEY_ARGS = "args"
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
+KEY_VERSION = "version"
 
 REQ_SUBMIT_JOB = "SubmitJob"
 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
 
 REQ_SUBMIT_JOB = "SubmitJob"
 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
-REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
+REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
+REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
+REQ_QUERY = "Query"
+REQ_QUERY_FIELDS = "QueryFields"
 REQ_QUERY_JOBS = "QueryJobs"
 REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_JOBS = "QueryJobs"
 REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
+REQ_QUERY_GROUPS = "QueryGroups"
+REQ_QUERY_NETWORKS = "QueryNetworks"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
-REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_DRAIN_FLAG = "SetDrainFlag"
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
+#: List of all LUXI requests
+REQ_ALL = compat.UniqueFrozenset([
+  REQ_ARCHIVE_JOB,
+  REQ_AUTO_ARCHIVE_JOBS,
+  REQ_CANCEL_JOB,
+  REQ_CHANGE_JOB_PRIORITY,
+  REQ_QUERY,
+  REQ_QUERY_CLUSTER_INFO,
+  REQ_QUERY_CONFIG_VALUES,
+  REQ_QUERY_EXPORTS,
+  REQ_QUERY_FIELDS,
+  REQ_QUERY_GROUPS,
+  REQ_QUERY_INSTANCES,
+  REQ_QUERY_JOBS,
+  REQ_QUERY_NODES,
+  REQ_QUERY_TAGS,
+  REQ_SET_DRAIN_FLAG,
+  REQ_SET_WATCHER_PAUSE,
+  REQ_SUBMIT_JOB,
+  REQ_SUBMIT_MANY_JOBS,
+  REQ_WAIT_FOR_JOB_CHANGE,
+  ])
+
 DEF_CTMO = 10
 DEF_RWTO = 60
 
 DEF_CTMO = 10
 DEF_RWTO = 60
 
+# WaitForJobChange timeout
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
+
 
 
-class ProtocolError(errors.GenericError):
-  """Denotes an error in the LUXI protocol"""
+class ProtocolError(errors.LuxiError):
+  """Denotes an error in the LUXI protocol."""
 
 
 class ConnectionClosedError(ProtocolError):
 
 
 class ConnectionClosedError(ProtocolError):
-  """Connection closed error"""
+  """Connection closed error."""
 
 
 class TimeoutError(ProtocolError):
 
 
 class TimeoutError(ProtocolError):
-  """Operation timeout error"""
+  """Operation timeout error."""
 
 
 class RequestError(ProtocolError):
 
 
 class RequestError(ProtocolError):
-  """Error on request
+  """Error on request.
 
   This signifies an error in the request format or request handling,
   but not (e.g.) an error in starting up an instance.
 
   This signifies an error in the request format or request handling,
   but not (e.g.) an error in starting up an instance.
@@ -91,7 +127,7 @@ class RequestError(ProtocolError):
 
 
 class NoMasterError(ProtocolError):
 
 
 class NoMasterError(ProtocolError):
-  """The master cannot be reached
+  """The master cannot be reached.
 
   This means that the master daemon is not running or the socket has
   been removed.
 
   This means that the master daemon is not running or the socket has
   been removed.
@@ -99,6 +135,14 @@ class NoMasterError(ProtocolError):
   """
 
 
   """
 
 
+class PermissionError(ProtocolError):
+  """Permission denied while connecting to the master socket.
+
+  This means the user doesn't have the proper rights.
+
+  """
+
+
 class Transport:
   """Low-level transport class.
 
 class Transport:
   """Low-level transport class.
 
@@ -111,15 +155,12 @@ class Transport:
 
   """
 
 
   """
 
-  def __init__(self, address, timeouts=None, eom=None):
+  def __init__(self, address, timeouts=None):
     """Constructor for the Client class.
 
     Arguments:
       - address: a valid address the the used transport class
       - timeout: a list of timeouts, to be used on connect and read/write
     """Constructor for the Client class.
 
     Arguments:
       - address: a valid address the the used transport class
       - timeout: a list of timeouts, to be used on connect and read/write
-      - eom: an identifier to be used as end-of-message which the
-        upper-layer will guarantee that this identifier will not appear
-        in any message
 
     There are two timeouts used since we might want to wait for a long
     time for a response, but the connect timeout should be lower.
 
     There are two timeouts used since we might want to wait for a long
     time for a response, but the connect timeout should be lower.
@@ -142,22 +183,16 @@ class Transport:
     self._buffer = ""
     self._msgs = collections.deque()
 
     self._buffer = ""
     self._msgs = collections.deque()
 
-    if eom is None:
-      self.eom = '\3'
-    else:
-      self.eom = eom
-
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-      self.socket.settimeout(self._ctimeout)
+
+      # Try to connect
       try:
       try:
-        self.socket.connect(address)
-      except socket.timeout, err:
-        raise TimeoutError("Connect timed out: %s" % str(err))
-      except socket.error, err:
-        if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
-          raise NoMasterError(address)
-        raise
+        utils.Retry(self._Connect, 1.0, self._ctimeout,
+                    args=(self.socket, address, self._ctimeout))
+      except utils.RetryTimeout:
+        raise TimeoutError("Connect timed out")
+
       self.socket.settimeout(self._rwtimeout)
     except (socket.error, NoMasterError):
       if self.socket is not None:
       self.socket.settimeout(self._rwtimeout)
     except (socket.error, NoMasterError):
       if self.socket is not None:
@@ -165,6 +200,24 @@ class Transport:
       self.socket = None
       raise
 
       self.socket = None
       raise
 
+  @staticmethod
+  def _Connect(sock, address, timeout):
+    sock.settimeout(timeout)
+    try:
+      sock.connect(address)
+    except socket.timeout, err:
+      raise TimeoutError("Connect timed out: %s" % str(err))
+    except socket.error, err:
+      error_code = err.args[0]
+      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
+        raise NoMasterError(address)
+      elif error_code in (errno.EPERM, errno.EACCES):
+        raise PermissionError(address)
+      elif error_code == errno.EAGAIN:
+        # Server's socket backlog is full at the moment
+        raise utils.RetryAgain()
+      raise
+
   def _CheckSocket(self):
     """Make sure we are connected.
 
   def _CheckSocket(self):
     """Make sure we are connected.
 
@@ -178,13 +231,13 @@ class Transport:
     This just sends a message and doesn't wait for the response.
 
     """
     This just sends a message and doesn't wait for the response.
 
     """
-    if self.eom in msg:
+    if constants.LUXI_EOM in msg:
       raise ProtocolError("Message terminator found in payload")
 
     self._CheckSocket()
     try:
       # TODO: sendall is not guaranteed to send everything
       raise ProtocolError("Message terminator found in payload")
 
     self._CheckSocket()
     try:
       # TODO: sendall is not guaranteed to send everything
-      self.socket.sendall(msg + self.eom)
+      self.socket.sendall(msg + constants.LUXI_EOM)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
@@ -205,16 +258,16 @@ class Transport:
       while True:
         try:
           data = self.socket.recv(4096)
       while True:
         try:
           data = self.socket.recv(4096)
+        except socket.timeout, err:
+          raise TimeoutError("Receive timeout: %s" % str(err))
         except socket.error, err:
           if err.args and err.args[0] == errno.EAGAIN:
             continue
           raise
         except socket.error, err:
           if err.args and err.args[0] == errno.EAGAIN:
             continue
           raise
-        except socket.timeout, err:
-          raise TimeoutError("Receive timeout: %s" % str(err))
         break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
         break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
-      new_msgs = (self._buffer + data).split(self.eom)
+      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
       self._buffer = new_msgs.pop()
       self._msgs.extend(new_msgs)
     return self._msgs.popleft()
       self._buffer = new_msgs.pop()
       self._msgs.extend(new_msgs)
     return self._msgs.popleft()
@@ -250,14 +303,16 @@ def ParseRequest(msg):
     logging.error("LUXI request not a dict: %r", msg)
     raise ProtocolError("Invalid LUXI request (not a dict)")
 
     logging.error("LUXI request not a dict: %r", msg)
     raise ProtocolError("Invalid LUXI request (not a dict)")
 
-  method = request.get(KEY_METHOD, None)
-  args = request.get(KEY_ARGS, None)
+  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
+  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
+  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
+
   if method is None or args is None:
     logging.error("LUXI request missing method or arguments: %r", msg)
     raise ProtocolError(("Invalid LUXI request (no method or arguments"
                          " in request): %r") % msg)
 
   if method is None or args is None:
     logging.error("LUXI request missing method or arguments: %r", msg)
     raise ProtocolError(("Invalid LUXI request (no method or arguments"
                          " in request): %r") % msg)
 
-  return (method, args)
+  return (method, args, version)
 
 
 def ParseResponse(msg):
 
 
 def ParseResponse(msg):
@@ -267,6 +322,8 @@ def ParseResponse(msg):
   # Parse the result
   try:
     data = serializer.LoadJson(msg)
   # Parse the result
   try:
     data = serializer.LoadJson(msg)
+  except KeyboardInterrupt:
+    raise
   except Exception, err:
     raise ProtocolError("Error while deserializing response: %s" % str(err))
 
   except Exception, err:
     raise ProtocolError("Error while deserializing response: %s" % str(err))
 
@@ -276,10 +333,11 @@ def ParseResponse(msg):
           KEY_RESULT in data):
     raise ProtocolError("Invalid response from server: %r" % data)
 
           KEY_RESULT in data):
     raise ProtocolError("Invalid response from server: %r" % data)
 
-  return (data[KEY_SUCCESS], data[KEY_RESULT])
+  return (data[KEY_SUCCESS], data[KEY_RESULT],
+          data.get(KEY_VERSION, None)) # pylint: disable=E1103
 
 
 
 
-def FormatResponse(success, result):
+def FormatResponse(success, result, version=None):
   """Formats a LUXI response message.
 
   """
   """Formats a LUXI response message.
 
   """
@@ -288,12 +346,15 @@ def FormatResponse(success, result):
     KEY_RESULT: result,
     }
 
     KEY_RESULT: result,
     }
 
+  if version is not None:
+    response[KEY_VERSION] = version
+
   logging.debug("LUXI response: %s", response)
 
   return serializer.DumpJson(response)
 
 
   logging.debug("LUXI response: %s", response)
 
   return serializer.DumpJson(response)
 
 
-def FormatRequest(method, args):
+def FormatRequest(method, args, version=None):
   """Formats a LUXI request message.
 
   """
   """Formats a LUXI request message.
 
   """
@@ -303,22 +364,30 @@ def FormatRequest(method, args):
     KEY_ARGS: args,
     }
 
     KEY_ARGS: args,
     }
 
+  if version is not None:
+    request[KEY_VERSION] = version
+
   # Serialize the request
   # Serialize the request
-  return serializer.DumpJson(request, indent=False)
+  return serializer.DumpJson(request)
 
 
 
 
-def CallLuxiMethod(transport_cb, method, args):
+def CallLuxiMethod(transport_cb, method, args, version=None):
   """Send a LUXI request via a transport and return the response.
 
   """
   assert callable(transport_cb)
 
   """Send a LUXI request via a transport and return the response.
 
   """
   assert callable(transport_cb)
 
-  request_msg = FormatRequest(method, args)
+  request_msg = FormatRequest(method, args, version=version)
 
   # Send request and wait for response
   response_msg = transport_cb(request_msg)
 
 
   # Send request and wait for response
   response_msg = transport_cb(request_msg)
 
-  (success, result) = ParseResponse(response_msg)
+  (success, result, resp_version) = ParseResponse(response_msg)
+
+  # Verify version if there was one in the response
+  if resp_version is not None and resp_version != version:
+    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
+                           (version, resp_version))
 
   if success:
     return result
 
   if success:
     return result
@@ -348,7 +417,7 @@ class Client(object):
 
     """
     if address is None:
 
     """
     if address is None:
-      address = constants.MASTER_SOCKET
+      address = pathutils.MASTER_SOCKET
     self.address = address
     self.timeouts = timeouts
     self.transport_class = transport
     self.address = address
     self.timeouts = timeouts
     self.transport_class = transport
@@ -373,7 +442,7 @@ class Client(object):
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
-    except Exception: # pylint: disable-msg=W0703
+    except Exception: # pylint: disable=W0703
       pass
 
   def _SendMethodCall(self, data):
       pass
 
   def _SendMethodCall(self, data):
@@ -385,48 +454,108 @@ class Client(object):
       self._CloseTransport()
       raise
 
       self._CloseTransport()
       raise
 
+  def Close(self):
+    """Close the underlying connection.
+
+    """
+    self._CloseTransport()
+
   def CallMethod(self, method, args):
     """Send a generic request and return the response.
 
     """
   def CallMethod(self, method, args):
     """Send a generic request and return the response.
 
     """
-    return CallLuxiMethod(self._SendMethodCall, method, args)
+    if not isinstance(args, (list, tuple)):
+      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
+                                   " expected list, got %s" % type(args))
+    return CallLuxiMethod(self._SendMethodCall, method, args,
+                          version=constants.LUXI_VERSION)
 
   def SetQueueDrainFlag(self, drain_flag):
 
   def SetQueueDrainFlag(self, drain_flag):
-    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
 
   def SetWatcherPause(self, until):
 
   def SetWatcherPause(self, until):
-    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
 
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
 
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
-    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
 
   def SubmitManyJobs(self, jobs):
     jobs_state = []
     for ops in jobs:
       jobs_state.append([op.__getstate__() for op in ops])
 
   def SubmitManyJobs(self, jobs):
     jobs_state = []
     for ops in jobs:
       jobs_state.append([op.__getstate__() for op in ops])
-    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
 
   def CancelJob(self, job_id):
 
   def CancelJob(self, job_id):
-    return self.CallMethod(REQ_CANCEL_JOB, job_id)
+    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
 
   def ArchiveJob(self, job_id):
 
   def ArchiveJob(self, job_id):
-    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
+
+  def ChangeJobPriority(self, job_id, priority):
+    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
 
   def AutoArchiveJobs(self, age):
     timeout = (DEF_RWTO - 1) / 2
 
   def AutoArchiveJobs(self, age):
     timeout = (DEF_RWTO - 1) / 2
-    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
+    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial,
+                           timeout=WFJC_TIMEOUT):
+    """Waits for changes on a job.
+
+    @param job_id: Job ID
+    @type fields: list
+    @param fields: List of field names to be observed
+    @type prev_job_info: None or list
+    @param prev_job_info: Previously received job information
+    @type prev_log_serial: None or int/long
+    @param prev_log_serial: Highest log serial number previously received
+    @type timeout: int/float
+    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+                    be capped to that value)
+
+    """
+    assert timeout >= 0, "Timeout can not be negative"
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+                           (job_id, fields, prev_job_info,
+                            prev_log_serial,
+                            min(WFJC_TIMEOUT, timeout)))
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
-    timeout = (DEF_RWTO - 1) / 2
     while True:
     while True:
-      result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
-                               (job_id, fields, prev_job_info,
-                                prev_log_serial, timeout))
+      result = self.WaitForJobChangeOnce(job_id, fields,
+                                         prev_job_info, prev_log_serial)
       if result != constants.JOB_NOTCHANGED:
         break
     return result
 
       if result != constants.JOB_NOTCHANGED:
         break
     return result
 
+  def Query(self, what, fields, qfilter):
+    """Query for resources/items.
+
+    @param what: One of L{constants.QR_VIA_LUXI}
+    @type fields: List of strings
+    @param fields: List of requested fields
+    @type qfilter: None or list
+    @param qfilter: Query filter
+    @rtype: L{objects.QueryResponse}
+
+    """
+    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
+    return objects.QueryResponse.FromDict(result)
+
+  def QueryFields(self, what, fields):
+    """Query for available fields.
+
+    @param what: One of L{constants.QR_VIA_LUXI}
+    @type fields: None or list of strings
+    @param fields: List of requested fields
+    @rtype: L{objects.QueryFieldsResponse}
+
+    """
+    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
+    return objects.QueryFieldsResponse.FromDict(result)
+
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 
@@ -436,6 +565,12 @@ class Client(object):
   def QueryNodes(self, names, fields, use_locking):
     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
 
   def QueryNodes(self, names, fields, use_locking):
     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
 
+  def QueryGroups(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
+
+  def QueryNetworks(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
+
   def QueryExports(self, nodes, use_locking):
     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
 
   def QueryExports(self, nodes, use_locking):
     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
 
@@ -443,10 +578,7 @@ class Client(object):
     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
 
   def QueryConfigValues(self, fields):
     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
 
   def QueryConfigValues(self, fields):
-    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
+    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
-
-
-# TODO: class Server(object)