Add ConfigWriter.GetNodeGroup
[ganeti-local] / lib / luxi.py
index cdd5518..4b301e8 100644 (file)
@@ -45,6 +45,7 @@ KEY_METHOD = "method"
 KEY_ARGS = "args"
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
+KEY_VERSION = "version"
 
 REQ_SUBMIT_JOB = "SubmitJob"
 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
@@ -59,6 +60,7 @@ REQ_QUERY_EXPORTS = "QueryExports"
 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
 REQ_QUERY_TAGS = "QueryTags"
+REQ_QUERY_LOCKS = "QueryLocks"
 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
 
@@ -69,7 +71,7 @@ DEF_RWTO = 60
 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
 
 
-class ProtocolError(errors.GenericError):
+class ProtocolError(errors.LuxiError):
   """Denotes an error in the LUXI protocol."""
 
 
@@ -273,13 +275,14 @@ def ParseRequest(msg):
 
   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
+  version = request.get(KEY_VERSION, None)
 
   if method is None or args is None:
     logging.error("LUXI request missing method or arguments: %r", msg)
     raise ProtocolError(("Invalid LUXI request (no method or arguments"
                          " in request): %r") % msg)
 
-  return (method, args)
+  return (method, args, version)
 
 
 def ParseResponse(msg):
@@ -298,10 +301,10 @@ def ParseResponse(msg):
           KEY_RESULT in data):
     raise ProtocolError("Invalid response from server: %r" % data)
 
-  return (data[KEY_SUCCESS], data[KEY_RESULT])
+  return (data[KEY_SUCCESS], data[KEY_RESULT], data.get(KEY_VERSION, None))
 
 
-def FormatResponse(success, result):
+def FormatResponse(success, result, version=None):
   """Formats a LUXI response message.
 
   """
@@ -310,12 +313,15 @@ def FormatResponse(success, result):
     KEY_RESULT: result,
     }
 
+  if version is not None:
+    response[KEY_VERSION] = version
+
   logging.debug("LUXI response: %s", response)
 
   return serializer.DumpJson(response)
 
 
-def FormatRequest(method, args):
+def FormatRequest(method, args, version=None):
   """Formats a LUXI request message.
 
   """
@@ -325,22 +331,30 @@ def FormatRequest(method, args):
     KEY_ARGS: args,
     }
 
+  if version is not None:
+    request[KEY_VERSION] = version
+
   # Serialize the request
   return serializer.DumpJson(request, indent=False)
 
 
-def CallLuxiMethod(transport_cb, method, args):
+def CallLuxiMethod(transport_cb, method, args, version=None):
   """Send a LUXI request via a transport and return the response.
 
   """
   assert callable(transport_cb)
 
-  request_msg = FormatRequest(method, args)
+  request_msg = FormatRequest(method, args, version=version)
 
   # Send request and wait for response
   response_msg = transport_cb(request_msg)
 
-  (success, result) = ParseResponse(response_msg)
+  (success, result, resp_version) = ParseResponse(response_msg)
+
+  # Verify version if there was one in the response
+  if resp_version is not None and resp_version != version:
+    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
+                           (version, resp_version))
 
   if success:
     return result
@@ -411,7 +425,8 @@ class Client(object):
     """Send a generic request and return the response.
 
     """
-    return CallLuxiMethod(self._SendMethodCall, method, args)
+    return CallLuxiMethod(self._SendMethodCall, method, args,
+                          version=constants.LUXI_VERSION)
 
   def SetQueueDrainFlag(self, drain_flag):
     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
@@ -490,3 +505,6 @@ class Client(object):
 
   def QueryTags(self, kind, name):
     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
+
+  def QueryLocks(self, fields, sync):
+    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))