Add a new NodeGroup config object
[ganeti-local] / lib / luxi.py
index addf917..669c3dd 100644 (file)
@@ -59,27 +59,31 @@ REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
+REQ_QUERY_LOCKS = "QueryLocks"
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
 
+# WaitForJobChange timeout
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
+
 
 class ProtocolError(errors.GenericError):
-  """Denotes an error in the LUXI protocol"""
+  """Denotes an error in the LUXI protocol."""
 
 
 class ConnectionClosedError(ProtocolError):
-  """Connection closed error"""
+  """Connection closed error."""
 
 
 class TimeoutError(ProtocolError):
-  """Operation timeout error"""
+  """Operation timeout error."""
 
 
 class RequestError(ProtocolError):
-  """Error on request
+  """Error on request.
 
   This signifies an error in the request format or request handling,
   but not (e.g.) an error in starting up an instance.
@@ -92,7 +96,7 @@ class RequestError(ProtocolError):
 
 
 class NoMasterError(ProtocolError):
-  """The master cannot be reached
+  """The master cannot be reached.
 
   This means that the master daemon is not running or the socket has
   been removed.
@@ -100,6 +104,14 @@ class NoMasterError(ProtocolError):
   """
 
 
+class PermissionError(ProtocolError):
+  """Permission denied while connecting to the master socket.
+
+  This means the user doesn't have the proper rights.
+
+  """
+
+
 class Transport:
   """Low-level transport class.
 
@@ -112,15 +124,12 @@ class Transport:
 
   """
 
-  def __init__(self, address, timeouts=None, eom=None):
+  def __init__(self, address, timeouts=None):
     """Constructor for the Client class.
 
     Arguments:
       - address: a valid address the the used transport class
       - timeout: a list of timeouts, to be used on connect and read/write
-      - eom: an identifier to be used as end-of-message which the
-        upper-layer will guarantee that this identifier will not appear
-        in any message
 
     There are two timeouts used since we might want to wait for a long
     time for a response, but the connect timeout should be lower.
@@ -143,11 +152,6 @@ class Transport:
     self._buffer = ""
     self._msgs = collections.deque()
 
-    if eom is None:
-      self.eom = '\3'
-    else:
-      self.eom = eom
-
     try:
       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 
@@ -173,9 +177,12 @@ class Transport:
     except socket.timeout, err:
       raise TimeoutError("Connect timed out: %s" % str(err))
     except socket.error, err:
-      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+      error_code = err.args[0]
+      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
         raise NoMasterError(address)
-      if err.args[0] == errno.EAGAIN:
+      elif error_code in (errno.EPERM, errno.EACCES):
+        raise PermissionError(address)
+      elif error_code == errno.EAGAIN:
         # Server's socket backlog is full at the moment
         raise utils.RetryAgain()
       raise
@@ -193,13 +200,13 @@ class Transport:
     This just sends a message and doesn't wait for the response.
 
     """
-    if self.eom in msg:
+    if constants.LUXI_EOM in msg:
       raise ProtocolError("Message terminator found in payload")
 
     self._CheckSocket()
     try:
       # TODO: sendall is not guaranteed to send everything
-      self.socket.sendall(msg + self.eom)
+      self.socket.sendall(msg + constants.LUXI_EOM)
     except socket.timeout, err:
       raise TimeoutError("Sending timeout: %s" % str(err))
 
@@ -229,7 +236,7 @@ class Transport:
         break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
-      new_msgs = (self._buffer + data).split(self.eom)
+      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
       self._buffer = new_msgs.pop()
       self._msgs.extend(new_msgs)
     return self._msgs.popleft()
@@ -265,8 +272,9 @@ def ParseRequest(msg):
     logging.error("LUXI request not a dict: %r", msg)
     raise ProtocolError("Invalid LUXI request (not a dict)")
 
-  method = request.get(KEY_METHOD, None)
-  args = request.get(KEY_ARGS, None)
+  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
+  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
+
   if method is None or args is None:
     logging.error("LUXI request missing method or arguments: %r", msg)
     raise ProtocolError(("Invalid LUXI request (no method or arguments"
@@ -433,11 +441,27 @@ class Client(object):
     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
 
   def WaitForJobChangeOnce(self, job_id, fields,
-                           prev_job_info, prev_log_serial):
-    timeout = (DEF_RWTO - 1) / 2
+                           prev_job_info, prev_log_serial,
+                           timeout=WFJC_TIMEOUT):
+    """Waits for changes on a job.
+
+    @param job_id: Job ID
+    @type fields: list
+    @param fields: List of field names to be observed
+    @type prev_job_info: None or list
+    @param prev_job_info: Previously received job information
+    @type prev_log_serial: None or int/long
+    @param prev_log_serial: Highest log serial number previously received
+    @type timeout: int/float
+    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+                    be capped to that value)
+
+    """
+    assert timeout >= 0, "Timeout can not be negative"
     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                            (job_id, fields, prev_job_info,
-                            prev_log_serial, timeout))
+                            prev_log_serial,
+                            min(WFJC_TIMEOUT, timeout)))
 
   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
     while True:
@@ -467,3 +491,6 @@ class Client(object):
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
+
+  def QueryLocks(self, fields, sync):
+    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))