Change masterd/client RPC protocol
authorMichael Hanselmann <hansmi@google.com>
Wed, 9 Jul 2008 10:34:42 +0000 (10:34 +0000)
committerMichael Hanselmann <hansmi@google.com>
Wed, 9 Jul 2008 10:34:42 +0000 (10:34 +0000)
- Introduce abstraction class on client side
- Use constants for method names
- Adopt legacy function SubmitOpCode to use it

Reviewed-by: iustinp

daemons/ganeti-masterd
lib/cli.py
lib/luxi.py

index 7b028a7..db81b6d 100755 (executable)
@@ -225,46 +225,30 @@ class ClientOps:
   """Class holding high-level client operations."""
   def __init__(self, server):
     self.server = server
-    self._cpu = None
-
-  def _getcpu(self):
-    if self._cpu is None:
-      self._cpu = mcpu.Processor(lambda x: None)
-    return self._cpu
-
-  def handle_request(self, operation, args):
-    print operation, args
-    if operation == "submit":
-      return self.put(args)
-    elif operation == "query":
-      return self.query(args)
-    else:
-      raise ValueError("Invalid operation")
 
-  def put(self, args):
-    job = luxi.UnserializeJob(args)
-    rid = self.server.queue.put(job)
-    return rid
-
-  def query(self, args):
-    path = args["object"]
-    fields = args["fields"]
-    names = args["names"]
-    if path == "instances":
-      opclass = opcodes.OpQueryInstances
-    elif path == "jobs":
-      # early exit because job query-ing is special (not via opcodes)
-      return self.query_jobs(fields, names)
-    else:
-      raise ValueError("Invalid object %s" % path)
+  def handle_request(self, method, args):
+    queue = self.server.jobqueue
+
+    # TODO: Parameter validation
+
+    if method == luxi.REQ_SUBMIT_JOB:
+      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+      return queue.SubmitJob(ops)
 
-    op = opclass(output_fields = fields, names=names)
-    cpu = self._getcpu()
-    result = cpu.ExecOpCode(op)
-    return result
+    elif method == luxi.REQ_CANCEL_JOB:
+      (job_id, ) = args
+      return queue.CancelJob(job_id)
 
-  def query_jobs(self, fields, names):
-    return self.server.queue.query_jobs(fields, names)
+    elif method == luxi.REQ_ARCHIVE_JOB:
+      (job_id, ) = args
+      return queue.ArchiveJob(job_id)
+
+    elif method == luxi.REQ_QUERY_JOBS:
+      (job_ids, fields) = args
+      return queue.QueryJobs(job_ids, fields)
+
+    else:
+      raise ValueError("Invalid operation")
 
 
 def JobRunner(proc, job, context):
index 7b8581f..befcb2c 100644 (file)
@@ -41,7 +41,7 @@ from optparse import (OptionParser, make_option, TitledHelpFormatter,
                       Option, OptionValueError, SUPPRESS_HELP)
 
 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
-           "SubmitOpCode", "SubmitJob", "SubmitQuery",
+           "SubmitOpCode",
            "cli_option", "GenerateTable", "AskUser",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
@@ -370,7 +370,7 @@ def AskUser(text, choices=None):
 
 
 def SubmitOpCode(op, proc=None, feedback_fn=None):
-  """Function to submit an opcode.
+  """Legacy function to submit an opcode.
 
   This is just a simple wrapper over the construction of the processor
   instance. It should be extended to better handle feedback and
@@ -379,46 +379,30 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
   """
   # TODO: Fix feedback_fn situation.
   cl = luxi.Client()
-  job = opcodes.Job(op_list=[op])
-  jid = SubmitJob(job, cl)
 
-  query = {
-    "object": "jobs",
-    "fields": ["status"],
-    "names": [jid],
-    }
+  job_id = cl.SubmitJob([op])
 
   while True:
-    jdata = SubmitQuery(query, cl)
-    if not jdata:
+    jobs = cl.QueryJobs([job_id], ["status"])
+    if not jobs:
       # job not found, go away!
-      raise errors.JobLost("Job with id %s lost" % jid)
+      raise errors.JobLost("Job with id %s lost" % job_id)
 
-    status = jdata[0][0]
-    if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL):
+    # TODO: Handle canceled and archived jobs
+    status = jobs[0][0]
+    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
       break
     time.sleep(1)
 
-  query["fields"].extend(["op_list", "op_status", "op_result"])
-  jdata = SubmitQuery(query, cl)
-  if not jdata:
-    raise errors.JobLost("Job with id %s lost" % jid)
-  status, op_list, op_status, op_result = jdata[0]
-  if status != opcodes.Job.STATUS_SUCCESS:
-    raise errors.OpExecError(op_result[0])
-  return op_result[0]
+  jobs = cl.QueryJobs([job_id], ["status", "result"])
+  if not jobs:
+    raise errors.JobLost("Job with id %s lost" % job_id)
 
-
-def SubmitJob(job, cl=None):
-  if cl is None:
-    cl = luxi.Client()
-  return cl.SubmitJob(job)
-
-
-def SubmitQuery(data, cl=None):
-  if cl is None:
-    cl = luxi.Client()
-  return cl.Query(data)
+  status, result = jobs[0]
+  if status == constants.JOB_STATUS_SUCCESS:
+    return result[0]
+  else:
+    raise errors.OpExecError(result)
 
 
 def FormatError(err):
index 2a00431..db40fd2 100644 (file)
@@ -45,9 +45,10 @@ KEY_ARGS = 'args'
 KEY_SUCCESS = "success"
 KEY_RESULT = "result"
 
-REQ_SUBMIT = 'submit'
-REQ_ABORT = 'abort'
-REQ_QUERY = 'query'
+REQ_SUBMIT_JOB = "SubmitJob"
+REQ_CANCEL_JOB = "CancelJob"
+REQ_ARCHIVE_JOB = "ArchiveJob"
+REQ_QUERY_JOBS = "QueryJobs"
 
 DEF_CTMO = 10
 DEF_RWTO = 60
@@ -294,19 +295,17 @@ class Client(object):
 
     return data[KEY_RESULT]
 
-  def SubmitJob(self, job):
-    """Submit a job"""
-    return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
-
-  def Query(self, data):
-    """Make a query"""
-    result = self.CallMethod(REQ_QUERY, data)
-    if data["object"] == "jobs":
-      # custom job processing of query values
-      for row in result:
-        for idx, field in enumerate(data["fields"]):
-          if field == "op_list":
-            row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
-    return result
+  def SubmitJob(self, ops):
+    ops_state = map(lambda op: op.__getstate__(), ops)
+    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+
+  def CancelJob(self, job_id):
+    return self.CallMethod(REQ_CANCEL_JOB, job_id)
+
+  def ArchiveJob(self, job_id):
+    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+
+  def QueryJobs(self, job_ids, fields):
+    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
 
 # TODO: class Server(object)