"""Class holding high-level client operations."""
def __init__(self, server):
self.server = server
- self._cpu = None
-
- def _getcpu(self):
- if self._cpu is None:
- self._cpu = mcpu.Processor(lambda x: None)
- return self._cpu
-
- def handle_request(self, operation, args):
- print operation, args
- if operation == "submit":
- return self.put(args)
- elif operation == "query":
- return self.query(args)
- else:
- raise ValueError("Invalid operation")
- def put(self, args):
- job = luxi.UnserializeJob(args)
- rid = self.server.queue.put(job)
- return rid
-
- def query(self, args):
- path = args["object"]
- fields = args["fields"]
- names = args["names"]
- if path == "instances":
- opclass = opcodes.OpQueryInstances
- elif path == "jobs":
- # early exit because job query-ing is special (not via opcodes)
- return self.query_jobs(fields, names)
- else:
- raise ValueError("Invalid object %s" % path)
+ def handle_request(self, method, args):
+ queue = self.server.jobqueue
+
+ # TODO: Parameter validation
+
+ if method == luxi.REQ_SUBMIT_JOB:
+ ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+ return queue.SubmitJob(ops)
- op = opclass(output_fields = fields, names=names)
- cpu = self._getcpu()
- result = cpu.ExecOpCode(op)
- return result
+ elif method == luxi.REQ_CANCEL_JOB:
+ (job_id, ) = args
+ return queue.CancelJob(job_id)
- def query_jobs(self, fields, names):
- return self.server.queue.query_jobs(fields, names)
+ elif method == luxi.REQ_ARCHIVE_JOB:
+ (job_id, ) = args
+ return queue.ArchiveJob(job_id)
+
+ elif method == luxi.REQ_QUERY_JOBS:
+ (job_ids, fields) = args
+ return queue.QueryJobs(job_ids, fields)
+
+ else:
+ raise ValueError("Invalid operation")
def JobRunner(proc, job, context):
Option, OptionValueError, SUPPRESS_HELP)
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
- "SubmitOpCode", "SubmitJob", "SubmitQuery",
+ "SubmitOpCode",
"cli_option", "GenerateTable", "AskUser",
"ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
"USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
def SubmitOpCode(op, proc=None, feedback_fn=None):
- """Function to submit an opcode.
+ """Legacy function to submit an opcode.
This is just a simple wrapper over the construction of the processor
instance. It should be extended to better handle feedback and
"""
# TODO: Fix feedback_fn situation.
cl = luxi.Client()
- job = opcodes.Job(op_list=[op])
- jid = SubmitJob(job, cl)
- query = {
- "object": "jobs",
- "fields": ["status"],
- "names": [jid],
- }
+ job_id = cl.SubmitJob([op])
while True:
- jdata = SubmitQuery(query, cl)
- if not jdata:
+ jobs = cl.QueryJobs([job_id], ["status"])
+ if not jobs:
# job not found, go away!
- raise errors.JobLost("Job with id %s lost" % jid)
+ raise errors.JobLost("Job with id %s lost" % job_id)
- status = jdata[0][0]
- if status in (opcodes.Job.STATUS_SUCCESS, opcodes.Job.STATUS_FAIL):
+ # TODO: Handle canceled and archived jobs
+ status = jobs[0][0]
+ if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
break
time.sleep(1)
- query["fields"].extend(["op_list", "op_status", "op_result"])
- jdata = SubmitQuery(query, cl)
- if not jdata:
- raise errors.JobLost("Job with id %s lost" % jid)
- status, op_list, op_status, op_result = jdata[0]
- if status != opcodes.Job.STATUS_SUCCESS:
- raise errors.OpExecError(op_result[0])
- return op_result[0]
+ jobs = cl.QueryJobs([job_id], ["status", "result"])
+ if not jobs:
+ raise errors.JobLost("Job with id %s lost" % job_id)
-
-def SubmitJob(job, cl=None):
- if cl is None:
- cl = luxi.Client()
- return cl.SubmitJob(job)
-
-
-def SubmitQuery(data, cl=None):
- if cl is None:
- cl = luxi.Client()
- return cl.Query(data)
+ status, result = jobs[0]
+ if status == constants.JOB_STATUS_SUCCESS:
+ return result[0]
+ else:
+ raise errors.OpExecError(result)
def FormatError(err):
KEY_SUCCESS = "success"
KEY_RESULT = "result"
-REQ_SUBMIT = 'submit'
-REQ_ABORT = 'abort'
-REQ_QUERY = 'query'
+REQ_SUBMIT_JOB = "SubmitJob"
+REQ_CANCEL_JOB = "CancelJob"
+REQ_ARCHIVE_JOB = "ArchiveJob"
+REQ_QUERY_JOBS = "QueryJobs"
DEF_CTMO = 10
DEF_RWTO = 60
return data[KEY_RESULT]
- def SubmitJob(self, job):
- """Submit a job"""
- return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
-
- def Query(self, data):
- """Make a query"""
- result = self.CallMethod(REQ_QUERY, data)
- if data["object"] == "jobs":
- # custom job processing of query values
- for row in result:
- for idx, field in enumerate(data["fields"]):
- if field == "op_list":
- row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
- return result
+ def SubmitJob(self, ops):
+ ops_state = map(lambda op: op.__getstate__(), ops)
+ return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+
+ def CancelJob(self, job_id):
+ return self.CallMethod(REQ_CANCEL_JOB, job_id)
+
+ def ArchiveJob(self, job_id):
+ return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+
+ def QueryJobs(self, job_ids, fields):
+ return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
# TODO: class Server(object)