- self._cpu = None
-
- def _getcpu(self):
- if self._cpu is None:
- self._cpu = mcpu.Processor(lambda x: None)
- return self._cpu
-
- def handle_request(self, operation, args):
- print operation, args
- if operation == "submit":
- return self.put(args)
- elif operation == "query":
- return self.query(args)
- else:
- raise ValueError("Invalid operation")
-
- def put(self, args):
- job = luxi.UnserializeJob(args)
- rid = self.server.queue.put(job)
- return rid
-
- def query(self, args):
- path = args["object"]
- fields = args["fields"]
- names = args["names"]
- if path == "instances":
- opclass = opcodes.OpQueryInstances
- elif path == "jobs":
- # early exit because job query-ing is special (not via opcodes)
- return self.query_jobs(fields, names)
+
+ def handle_request(self, method, args):
+ queue = self.server.context.jobqueue
+
+ # TODO: Parameter validation
+
+ if method == luxi.REQ_SUBMIT_JOB:
+ logging.info("Received new job")
+ ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
+ return queue.SubmitJob(ops)
+
+ elif method == luxi.REQ_SUBMIT_MANY_JOBS:
+ logging.info("Received multiple jobs")
+ jobs = []
+ for ops in args:
+ jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
+ return queue.SubmitManyJobs(jobs)
+
+ elif method == luxi.REQ_CANCEL_JOB:
+ job_id = args
+ logging.info("Received job cancel request for %s", job_id)
+ return queue.CancelJob(job_id)
+
+ elif method == luxi.REQ_ARCHIVE_JOB:
+ job_id = args
+ logging.info("Received job archive request for %s", job_id)
+ return queue.ArchiveJob(job_id)
+
+ elif method == luxi.REQ_AUTOARCHIVE_JOBS:
+ (age, timeout) = args
+ logging.info("Received job autoarchive request for age %s, timeout %s",
+ age, timeout)
+ return queue.AutoArchiveJobs(age, timeout)
+
+ elif method == luxi.REQ_WAIT_FOR_JOB_CHANGE:
+ (job_id, fields, prev_job_info, prev_log_serial, timeout) = args
+ logging.info("Received job poll request for %s", job_id)
+ return queue.WaitForJobChanges(job_id, fields, prev_job_info,
+ prev_log_serial, timeout)
+
+ elif method == luxi.REQ_QUERY_JOBS:
+ (job_ids, fields) = args
+ if isinstance(job_ids, (tuple, list)) and job_ids:
+ msg = ", ".join(job_ids)
+ else:
+ msg = str(job_ids)
+ logging.info("Received job query request for %s", msg)
+ return queue.QueryJobs(job_ids, fields)
+
+ elif method == luxi.REQ_QUERY_INSTANCES:
+ (names, fields, use_locking) = args
+ logging.info("Received instance query request for %s", names)
+ if use_locking:
+ raise errors.OpPrereqError("Sync queries are not allowed")
+ op = opcodes.OpQueryInstances(names=names, output_fields=fields,
+ use_locking=use_locking)
+ return self._Query(op)
+
+ elif method == luxi.REQ_QUERY_NODES:
+ (names, fields, use_locking) = args
+ logging.info("Received node query request for %s", names)
+ if use_locking:
+ raise errors.OpPrereqError("Sync queries are not allowed")
+ op = opcodes.OpQueryNodes(names=names, output_fields=fields,
+ use_locking=use_locking)
+ return self._Query(op)
+
+ elif method == luxi.REQ_QUERY_EXPORTS:
+ nodes, use_locking = args
+ if use_locking:
+ raise errors.OpPrereqError("Sync queries are not allowed")
+ logging.info("Received exports query request")
+ op = opcodes.OpQueryExports(nodes=nodes, use_locking=use_locking)
+ return self._Query(op)
+
+ elif method == luxi.REQ_QUERY_CONFIG_VALUES:
+ fields = args
+ logging.info("Received config values query request for %s", fields)
+ op = opcodes.OpQueryConfigValues(output_fields=fields)
+ return self._Query(op)
+
+ elif method == luxi.REQ_QUERY_CLUSTER_INFO:
+ logging.info("Received cluster info query request")
+ op = opcodes.OpQueryClusterInfo()
+ return self._Query(op)
+
+ elif method == luxi.REQ_QUEUE_SET_DRAIN_FLAG:
+ drain_flag = args
+ logging.info("Received queue drain flag change request to %s",
+ drain_flag)
+ return queue.SetDrainFlag(drain_flag)
+