locking: Allow checking if lock is owned in certain mode
[ganeti-local] / lib / luxi.py
index b4d0609..503b5ec 100644 (file)
@@ -1,7 +1,7 @@
 #
 #
 
 #
 #
 
-# Copyright (C) 2006, 2007 Google Inc.
+# Copyright (C) 2006, 2007, 2011 Google Inc.
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -33,17 +33,21 @@ import socket
 import collections
 import time
 import errno
 import collections
 import time
 import errno
+import logging
+import warnings
 
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
 from ganeti import utils
 
 from ganeti import serializer
 from ganeti import constants
 from ganeti import errors
 from ganeti import utils
+from ganeti import objects
 
 
 
 
-KEY_METHOD = 'method'
-KEY_ARGS = 'args'
+KEY_METHOD = "method"
+KEY_ARGS = "args"
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
+KEY_VERSION = "version"
 
 REQ_SUBMIT_JOB = "SubmitJob"
 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
 
 REQ_SUBMIT_JOB = "SubmitJob"
 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
@@ -51,13 +55,17 @@ REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
 REQ_CANCEL_JOB = "CancelJob"
 REQ_ARCHIVE_JOB = "ArchiveJob"
 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
+REQ_QUERY = "Query"
+REQ_QUERY_FIELDS = "QueryFields"
 REQ_QUERY_JOBS = "QueryJobs"
 REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
 REQ_QUERY_JOBS = "QueryJobs"
 REQ_QUERY_INSTANCES = "QueryInstances"
 REQ_QUERY_NODES = "QueryNodes"
+REQ_QUERY_GROUPS = "QueryGroups"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
 REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
+REQ_QUERY_LOCKS = "QueryLocks"
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
@@ -68,28 +76,20 @@ DEF_RWTO = 60
 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
 
 
 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
 
 
-class ProtocolError(Exception):
-  """Denotes an error in the server communication"""
+class ProtocolError(errors.LuxiError):
+  """Denotes an error in the LUXI protocol."""
 
 
 class ConnectionClosedError(ProtocolError):
 
 
 class ConnectionClosedError(ProtocolError):
-  """Connection closed error"""
+  """Connection closed error."""
 
 
 class TimeoutError(ProtocolError):
 
 
 class TimeoutError(ProtocolError):
-  """Operation timeout error"""
-
-
-class EncodingError(ProtocolError):
-  """Encoding failure on the sending side"""
-
-
-class DecodingError(ProtocolError):
-  """Decoding failure on the receiving side"""
+  """Operation timeout error."""
 
 
 class RequestError(ProtocolError):
 
 
 class RequestError(ProtocolError):
-  """Error on request
+  """Error on request.
 
   This signifies an error in the request format or request handling,
   but not (e.g.) an error in starting up an instance.
 
   This signifies an error in the request format or request handling,
   but not (e.g.) an error in starting up an instance.
@@ -102,7 +102,7 @@ class RequestError(ProtocolError):
 
 
 class NoMasterError(ProtocolError):
 
 
 class NoMasterError(ProtocolError):
-  """The master cannot be reached
+  """The master cannot be reached.
 
   This means that the master daemon is not running or the socket has
   been removed.
 
   This means that the master daemon is not running or the socket has
   been removed.
@@ -110,6 +110,14 @@ class NoMasterError(ProtocolError):
   """
 
 
   """
 
 
+class PermissionError(ProtocolError):
+  """Permission denied while connecting to the master socket.
+
+  This means the user doesn't have the proper rights.
+
+  """
+
+
 class Transport:
   """Low-level transport class.
 
 class Transport:
   """Low-level transport class.
 
@@ -175,9 +183,12 @@ class Transport:
     except socket.timeout, err:
       raise TimeoutError("Connect timed out: %s" % str(err))
     except socket.error, err:
     except socket.timeout, err:
       raise TimeoutError("Connect timed out: %s" % str(err))
     except socket.error, err:
-      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+      error_code = err.args[0]
+      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
         raise NoMasterError(address)
         raise NoMasterError(address)
-      if err.args[0] == errno.EAGAIN:
+      elif error_code in (errno.EPERM, errno.EACCES):
+        raise PermissionError(address)
+      elif error_code == errno.EAGAIN:
         # Server's socket backlog is full at the moment
         raise utils.RetryAgain()
       raise
         # Server's socket backlog is full at the moment
         raise utils.RetryAgain()
       raise
@@ -196,7 +207,8 @@ class Transport:
 
     """
     if constants.LUXI_EOM in msg:
 
     """
     if constants.LUXI_EOM in msg:
-      raise EncodingError("Message terminator found in payload")
+      raise ProtocolError("Message terminator found in payload")
+
     self._CheckSocket()
     try:
       # TODO: sendall is not guaranteed to send everything
     self._CheckSocket()
     try:
       # TODO: sendall is not guaranteed to send everything
@@ -221,12 +233,12 @@ class Transport:
       while True:
         try:
           data = self.socket.recv(4096)
       while True:
         try:
           data = self.socket.recv(4096)
+        except socket.timeout, err:
+          raise TimeoutError("Receive timeout: %s" % str(err))
         except socket.error, err:
           if err.args and err.args[0] == errno.EAGAIN:
             continue
           raise
         except socket.error, err:
           if err.args and err.args[0] == errno.EAGAIN:
             continue
           raise
-        except socket.timeout, err:
-          raise TimeoutError("Receive timeout: %s" % str(err))
         break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
         break
       if not data:
         raise ConnectionClosedError("Connection closed while reading")
@@ -251,6 +263,114 @@ class Transport:
       self.socket = None
 
 
       self.socket = None
 
 
+def ParseRequest(msg):
+  """Parses a LUXI request message.
+
+  """
+  try:
+    request = serializer.LoadJson(msg)
+  except ValueError, err:
+    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
+
+  logging.debug("LUXI request: %s", request)
+
+  if not isinstance(request, dict):
+    logging.error("LUXI request not a dict: %r", msg)
+    raise ProtocolError("Invalid LUXI request (not a dict)")
+
+  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
+  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
+  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
+
+  if method is None or args is None:
+    logging.error("LUXI request missing method or arguments: %r", msg)
+    raise ProtocolError(("Invalid LUXI request (no method or arguments"
+                         " in request): %r") % msg)
+
+  return (method, args, version)
+
+
+def ParseResponse(msg):
+  """Parses a LUXI response message.
+
+  """
+  # Parse the result
+  try:
+    data = serializer.LoadJson(msg)
+  except KeyboardInterrupt:
+    raise
+  except Exception, err:
+    raise ProtocolError("Error while deserializing response: %s" % str(err))
+
+  # Validate response
+  if not (isinstance(data, dict) and
+          KEY_SUCCESS in data and
+          KEY_RESULT in data):
+    raise ProtocolError("Invalid response from server: %r" % data)
+
+  return (data[KEY_SUCCESS], data[KEY_RESULT],
+          data.get(KEY_VERSION, None)) # pylint: disable=E1103
+
+
+def FormatResponse(success, result, version=None):
+  """Formats a LUXI response message.
+
+  """
+  response = {
+    KEY_SUCCESS: success,
+    KEY_RESULT: result,
+    }
+
+  if version is not None:
+    response[KEY_VERSION] = version
+
+  logging.debug("LUXI response: %s", response)
+
+  return serializer.DumpJson(response)
+
+
+def FormatRequest(method, args, version=None):
+  """Formats a LUXI request message.
+
+  """
+  # Build request
+  request = {
+    KEY_METHOD: method,
+    KEY_ARGS: args,
+    }
+
+  if version is not None:
+    request[KEY_VERSION] = version
+
+  # Serialize the request
+  return serializer.DumpJson(request, indent=False)
+
+
+def CallLuxiMethod(transport_cb, method, args, version=None):
+  """Send a LUXI request via a transport and return the response.
+
+  """
+  assert callable(transport_cb)
+
+  request_msg = FormatRequest(method, args, version=version)
+
+  # Send request and wait for response
+  response_msg = transport_cb(request_msg)
+
+  (success, result, resp_version) = ParseResponse(response_msg)
+
+  # Verify version if there was one in the response
+  if resp_version is not None and resp_version != version:
+    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
+                           (version, resp_version))
+
+  if success:
+    return result
+
+  errors.MaybeRaise(result)
+  raise RequestError(result)
+
+
 class Client(object):
   """High-level client implementation.
 
 class Client(object):
   """High-level client implementation.
 
@@ -297,55 +417,39 @@ class Client(object):
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
       old_transp = self.transport
       self.transport = None
       old_transp.Close()
-    except Exception: # pylint: disable-msg=W0703
+    except Exception: # pylint: disable=W0703
       pass
 
       pass
 
-  def CallMethod(self, method, args):
-    """Send a generic request and return the response.
-
-    """
-    # Build request
-    request = {
-      KEY_METHOD: method,
-      KEY_ARGS: args,
-      }
-
-    # Serialize the request
-    send_data = serializer.DumpJson(request, indent=False)
-
+  def _SendMethodCall(self, data):
     # Send request and wait for response
     try:
       self._InitTransport()
     # Send request and wait for response
     try:
       self._InitTransport()
-      result = self.transport.Call(send_data)
+      return self.transport.Call(data)
     except Exception:
       self._CloseTransport()
       raise
 
     except Exception:
       self._CloseTransport()
       raise
 
-    # Parse the result
-    try:
-      data = serializer.LoadJson(result)
-    except Exception, err:
-      raise ProtocolError("Error while deserializing response: %s" % str(err))
-
-    # Validate response
-    if (not isinstance(data, dict) or
-        KEY_SUCCESS not in data or
-        KEY_RESULT not in data):
-      raise DecodingError("Invalid response from server: %s" % str(data))
+  def Close(self):
+    """Close the underlying connection.
 
 
-    result = data[KEY_RESULT]
+    """
+    self._CloseTransport()
 
 
-    if not data[KEY_SUCCESS]:
-      errors.MaybeRaise(result)
-      raise RequestError(result)
+  def CallMethod(self, method, args):
+    """Send a generic request and return the response.
 
 
-    return result
+    """
+    if not isinstance(args, (list, tuple)):
+      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
+                                   " expected list, got %s" % type(args))
+    return CallLuxiMethod(self._SendMethodCall, method, args,
+                          version=constants.LUXI_VERSION)
 
   def SetQueueDrainFlag(self, drain_flag):
 
   def SetQueueDrainFlag(self, drain_flag):
-    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, (drain_flag, ))
 
   def SetWatcherPause(self, until):
 
   def SetWatcherPause(self, until):
-    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
 
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
 
   def SubmitJob(self, ops):
     ops_state = map(lambda op: op.__getstate__(), ops)
@@ -358,10 +462,10 @@ class Client(object):
     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
 
   def CancelJob(self, job_id):
     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
 
   def CancelJob(self, job_id):
-    return self.CallMethod(REQ_CANCEL_JOB, job_id)
+    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
 
   def ArchiveJob(self, job_id):
 
   def ArchiveJob(self, job_id):
-    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
 
   def AutoArchiveJobs(self, age):
     timeout = (DEF_RWTO - 1) / 2
 
   def AutoArchiveJobs(self, age):
     timeout = (DEF_RWTO - 1) / 2
@@ -398,6 +502,32 @@ class Client(object):
         break
     return result
 
         break
     return result
 
+  def Query(self, what, fields, qfilter):
+    """Query for resources/items.
+
+    @param what: One of L{constants.QR_VIA_LUXI}
+    @type fields: List of strings
+    @param fields: List of requested fields
+    @type qfilter: None or list
+    @param qfilter: Query filter
+    @rtype: L{objects.QueryResponse}
+
+    """
+    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
+    return objects.QueryResponse.FromDict(result)
+
+  def QueryFields(self, what, fields):
+    """Query for available fields.
+
+    @param what: One of L{constants.QR_VIA_LUXI}
+    @type fields: None or list of strings
+    @param fields: List of requested fields
+    @rtype: L{objects.QueryFieldsResponse}
+
+    """
+    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
+    return objects.QueryFieldsResponse.FromDict(result)
+
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 
   def QueryJobs(self, job_ids, fields):
     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 
@@ -407,6 +537,9 @@ class Client(object):
   def QueryNodes(self, names, fields, use_locking):
     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
 
   def QueryNodes(self, names, fields, use_locking):
     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
 
+  def QueryGroups(self, names, fields, use_locking):
+    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
+
   def QueryExports(self, nodes, use_locking):
     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
 
   def QueryExports(self, nodes, use_locking):
     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
 
@@ -414,7 +547,12 @@ class Client(object):
     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
 
   def QueryConfigValues(self, fields):
     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
 
   def QueryConfigValues(self, fields):
-    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
+    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
+
+  def QueryLocks(self, fields, sync):
+    warnings.warn("This LUXI call is deprecated and will be removed, use"
+                  " Query(\"%s\", ...) instead" % constants.QR_LOCK)
+    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))