- data = serializer.LoadJson(result)
- except Exception, err:
- raise ProtocolError("Error while deserializing response: %s" % str(err))
-
- # Validate response
- if (not isinstance(data, dict) or
- KEY_SUCCESS not in data or
- KEY_RESULT not in data):
- raise DecodingError("Invalid response from server: %s" % str(data))
-
- if not data[KEY_SUCCESS]:
- # TODO: decide on a standard exception
- raise RequestError(data[KEY_RESULT])
-
- return data[KEY_RESULT]
-
- def SubmitJob(self, job):
- """Submit a job"""
- return self.CallMethod(REQ_SUBMIT, SerializeJob(job))
-
- def Query(self, data):
- """Make a query"""
- result = self.CallMethod(REQ_QUERY, data)
- if data["object"] == "jobs":
- # custom job processing of query values
- for row in result:
- for idx, field in enumerate(data["fields"]):
- if field == "op_list":
- row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
+ self._InitTransport()
+ return self.transport.Call(data)
+ except Exception:
+ self._CloseTransport()
+ raise
+
+  def CallMethod(self, method, args):
+    """Perform an arbitrary request through L{CallLuxiMethod}.
+
+    @param method: request identifier, passed through unchanged
+    @param args: request arguments, passed through unchanged
+    @return: whatever L{CallLuxiMethod} returns
+
+    """
+    # This client's own _SendMethodCall is what CallLuxiMethod receives as
+    # its first argument.
+    send_fn = self._SendMethodCall
+    return CallLuxiMethod(send_fn, method, args)
+
+  def SetQueueDrainFlag(self, drain_flag):
+    """Issue a L{REQ_QUEUE_SET_DRAIN_FLAG} request.
+
+    @param drain_flag: value sent unchanged as the request argument
+    @return: the result of L{CallMethod}
+
+    """
+    response = self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+    return response
+
+  def SetWatcherPause(self, until):
+    """Issue a L{REQ_SET_WATCHER_PAUSE} request.
+
+    @param until: value sent as the single element of the argument list
+        (presumably a point in time; confirm against the server side)
+    @return: the result of L{CallMethod}
+
+    """
+    request_args = [until]
+    return self.CallMethod(REQ_SET_WATCHER_PAUSE, request_args)
+
+  def SubmitJob(self, ops):
+    """Submit one job made up of the given opcodes.
+
+    @param ops: iterable of objects providing C{__getstate__}; each one is
+        replaced by its C{__getstate__()} result before being sent
+    @return: the result of the L{REQ_SUBMIT_JOB} call
+
+    """
+    # Build a real list, the same way SubmitManyJobs does for each job,
+    # rather than relying on map() returning a list (it is a lazy iterator
+    # on Python 3).
+    ops_state = [op.__getstate__() for op in ops]
+    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+
+  def SubmitManyJobs(self, jobs):
+    """Submit several jobs with a single request.
+
+    @param jobs: iterable of jobs, each an iterable of objects providing
+        C{__getstate__}
+    @return: the result of the L{REQ_SUBMIT_MANY_JOBS} call
+
+    """
+    # One inner list of serialized opcodes per job, in the order given.
+    serialized = [[opcode.__getstate__() for opcode in job_ops]
+                  for job_ops in jobs]
+    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, serialized)
+
+  def CancelJob(self, job_id):
+    """Issue a L{REQ_CANCEL_JOB} request for the given job.
+
+    @param job_id: Job ID, sent unchanged as the request argument
+    @return: the result of L{CallMethod}
+
+    """
+    response = self.CallMethod(REQ_CANCEL_JOB, job_id)
+    return response
+
+  def ArchiveJob(self, job_id):
+    """Issue a L{REQ_ARCHIVE_JOB} request for the given job.
+
+    @param job_id: Job ID, sent unchanged as the request argument
+    @return: the result of L{CallMethod}
+
+    """
+    response = self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+    return response
+
+  def AutoArchiveJobs(self, age):
+    """Issue a L{REQ_AUTOARCHIVE_JOBS} request.
+
+    @param age: value sent as the first element of the argument tuple
+    @return: the result of L{CallMethod}
+
+    """
+    # The second element is a timeout derived from DEF_RWTO.
+    # NOTE(review): presumably kept well below the client-side timeout so
+    # the reply arrives before the connection gives up; confirm.
+    request_args = (age, (DEF_RWTO - 1) / 2)
+    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial,
+                           timeout=WFJC_TIMEOUT):
+    """Send a single L{REQ_WAIT_FOR_JOB_CHANGE} request.
+
+    @param job_id: Job ID
+    @type fields: list
+    @param fields: Names of the fields to watch
+    @type prev_job_info: None or list
+    @param prev_job_info: Job information received earlier
+    @type prev_log_serial: None or int/long
+    @param prev_log_serial: Highest log serial number received so far
+    @type timeout: int/float
+    @param timeout: Timeout in seconds; must not be negative, and values
+        above L{WFJC_TIMEOUT} are reduced to L{WFJC_TIMEOUT}
+    @return: the result of L{CallMethod}
+
+    """
+    assert timeout >= 0, "Timeout can not be negative"
+    # Never send more than WFJC_TIMEOUT, whatever the caller asked for.
+    capped_timeout = min(WFJC_TIMEOUT, timeout)
+    request_args = (job_id, fields, prev_job_info, prev_log_serial,
+                    capped_timeout)
+    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, request_args)
+
+ def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+ while True:
+ result = self.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
+ if result != constants.JOB_NOTCHANGED:
+ break