Move the master socket in the ganeti run dir
[ganeti-local] / lib / luxi.py
index f99a435..e109138 100644 (file)
@@ -40,11 +40,15 @@ from ganeti import serializer
 from ganeti import constants
 
 
-KEY_REQUEST = 'request'
-KEY_DATA = 'data'
-REQ_SUBMIT = 'submit'
-REQ_ABORT = 'abort'
-REQ_QUERY = 'query'
+KEY_METHOD = 'method'
+KEY_ARGS = 'args'
+KEY_SUCCESS = "success"
+KEY_RESULT = "result"
+
+REQ_SUBMIT_JOB = "SubmitJob"
+REQ_CANCEL_JOB = "CancelJob"
+REQ_ARCHIVE_JOB = "ArchiveJob"
+REQ_QUERY_JOBS = "QueryJobs"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
@@ -82,6 +86,7 @@ class RequestError(ProtocolError):
 
   """
 
+
 class NoMasterError(ProtocolError):
   """The master cannot be reached
 
@@ -91,24 +96,6 @@ class NoMasterError(ProtocolError):
   """
 
 
-def SerializeJob(job):
-  """Convert a job description to a string format.
-
-  """
-  return simplejson.dumps(job.__getstate__())
-
-
-def UnserializeJob(data):
-  """Load a job from a string format"""
-  try:
-    new_data = simplejson.loads(data)
-  except Exception, err:
-    raise DecodingError("Error while unserializing: %s" % str(err))
-  job = opcodes.Job()
-  job.__setstate__(new_data)
-  return job
-
-
 class Transport:
   """Low-level transport class.
 
@@ -261,39 +248,46 @@ class Client(object):
       address = constants.MASTER_SOCKET
     self.transport = transport(address, timeouts=timeouts)
 
-  def SendRequest(self, request, data):
+  def CallMethod(self, method, args):
     """Send a generic request and return the response.
 
     """
-    msg = {KEY_REQUEST: request, KEY_DATA: data}
-    result = self.transport.Call(serializer.DumpJson(msg, indent=False))
+    # Build request
+    request = {
+      KEY_METHOD: method,
+      KEY_ARGS: args,
+      }
+
+    # Send request and wait for response
+    result = self.transport.Call(serializer.DumpJson(request, indent=False))
     try:
       data = serializer.LoadJson(result)
     except Exception, err:
       raise ProtocolError("Error while deserializing response: %s" % str(err))
+
+    # Validate response
     if (not isinstance(data, dict) or
-        'success' not in data or
-        'result' not in data):
+        KEY_SUCCESS not in data or
+        KEY_RESULT not in data):
       raise DecodingError("Invalid response from server: %s" % str(data))
-    return data
-
-  def SubmitJob(self, job):
-    """Submit a job"""
-    result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
-    if not result['success']:
-      raise RequestError(result['result'])
-    return result['result']
-
-  def Query(self, data):
-    """Make a query"""
-    result = self.SendRequest(REQ_QUERY, data)
-    if not result['success']:
-      raise RequestError(result[result])
-    result = result['result']
-    if data["object"] == "jobs":
-      # custom job processing of query values
-      for row in result:
-        for idx, field in enumerate(data["fields"]):
-          if field == "op_list":
-            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
-    return result
+
+    if not data[KEY_SUCCESS]:
+      # TODO: decide on a standard exception
+      raise RequestError(data[KEY_RESULT])
+
+    return data[KEY_RESULT]
+
+  def SubmitJob(self, ops):
+    ops_state = map(lambda op: op.__getstate__(), ops)
+    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+
+  def CancelJob(self, job_id):
+    return self.CallMethod(REQ_CANCEL_JOB, job_id)
+
+  def ArchiveJob(self, job_id):
+    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+
+  def QueryJobs(self, job_ids, fields):
+    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
+
+# TODO: class Server(object)