+ self._InitTransport()
+ return self.transport.Call(data)
+ except Exception:
+ self._CloseTransport()
+ raise
+
+ def Close(self):
+ """Close the underlying connection.
+
+ """
+ self._CloseTransport()
+
+ def CallMethod(self, method, args):
+ """Send a generic request and return the response.
+
+ """
+ return CallLuxiMethod(self._SendMethodCall, method, args,
+ version=constants.LUXI_VERSION)
+
+ def SetQueueDrainFlag(self, drain_flag):
+ return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+
+ def SetWatcherPause(self, until):
+ return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
+ def SubmitJob(self, ops):
+ ops_state = map(lambda op: op.__getstate__(), ops)
+ return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+
+ def SubmitManyJobs(self, jobs):
+ jobs_state = []
+ for ops in jobs:
+ jobs_state.append([op.__getstate__() for op in ops])
+ return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
+ def CancelJob(self, job_id):
+ return self.CallMethod(REQ_CANCEL_JOB, job_id)
+
+ def ArchiveJob(self, job_id):
+ return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
+
+ def AutoArchiveJobs(self, age):
+ timeout = (DEF_RWTO - 1) / 2
+ return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
+
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial,
+ timeout=WFJC_TIMEOUT):
+ """Waits for changes on a job.
+
+ @param job_id: Job ID
+ @type fields: list
+ @param fields: List of field names to be observed
+ @type prev_job_info: None or list
+ @param prev_job_info: Previously received job information
+ @type prev_log_serial: None or int/long
+ @param prev_log_serial: Highest log serial number previously received
+ @type timeout: int/float
+ @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+ be capped to that value)
+
+ """
+ assert timeout >= 0, "Timeout can not be negative"
+ return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, prev_job_info,
+ prev_log_serial,
+ min(WFJC_TIMEOUT, timeout)))
+
+ def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+ while True:
+ result = self.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
+ if result != constants.JOB_NOTCHANGED:
+ break
+ return result
+
+ def Query(self, what, fields, filter_):
+ """Query for resources/items.
+
+ @param what: One of L{constants.QR_OP_LUXI}
+ @type fields: List of strings
+ @param fields: List of requested fields
+ @type filter_: None or list
+ @param filter_: Query filter
+ @rtype: L{objects.QueryResponse}
+
+ """
+ req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
+ result = self.CallMethod(REQ_QUERY, req.ToDict())
+ return objects.QueryResponse.FromDict(result)
+
+ def QueryFields(self, what, fields):
+ """Query for available fields.
+
+ @param what: One of L{constants.QR_OP_LUXI}
+ @type fields: None or list of strings
+ @param fields: List of requested fields
+ @rtype: L{objects.QueryFieldsResponse}
+
+ """
+ req = objects.QueryFieldsRequest(what=what, fields=fields)
+ result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
+ return objects.QueryFieldsResponse.FromDict(result)
+
+ def QueryJobs(self, job_ids, fields):
+ return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
+
+ def QueryInstances(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
+
+ def QueryNodes(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
+
+ def QueryGroups(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
+
+ def QueryExports(self, nodes, use_locking):
+ return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
+
+ def QueryClusterInfo(self):
+ return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
+
+ def QueryConfigValues(self, fields):
+ return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
+
+ def QueryTags(self, kind, name):
+ return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
+
+ def QueryLocks(self, fields, sync):
+ warnings.warn("This LUXI call is deprecated and will be removed, use"
+ " Query(\"%s\", ...) instead" % constants.QR_LOCK)
+ return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))