"""Module for the unix socket protocol
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.
-The module is also be used by the master daemon.
+The module is also used by the master daemon.
"""
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
DEF_CTMO = 10
"""
if address is None:
address = constants.MASTER_SOCKET
- self.transport = transport(address, timeouts=timeouts)
+ self.address = address
+ self.timeouts = timeouts
+ self.transport_class = transport
+ self.transport = None
+ self._InitTransport()
+
+ def _InitTransport(self):
+ """(Re)initialize the transport if needed.
+
+ """
+ if self.transport is None:
+ self.transport = self.transport_class(self.address,
+ timeouts=self.timeouts)
+
+ def _CloseTransport(self):
+ """Close the transport, ignoring errors.
+
+ """
+ if self.transport is None:
+ return
+ try:
+ old_transp = self.transport
+ self.transport = None
+ old_transp.Close()
+ except Exception, err:
+ pass
def CallMethod(self, method, args):
"""Send a generic request and return the response.
KEY_ARGS: args,
}
+ # Serialize the request
+ send_data = serializer.DumpJson(request, indent=False)
+
# Send request and wait for response
- result = self.transport.Call(serializer.DumpJson(request, indent=False))
+ try:
+ self._InitTransport()
+ result = self.transport.Call(send_data)
+ except Exception:
+ self._CloseTransport()
+ raise
+
+ # Parse the result
try:
data = serializer.LoadJson(result)
except Exception, err:
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+ def SubmitManyJobs(self, jobs):
+ jobs_state = []
+ for ops in jobs:
+ jobs_state.append([op.__getstate__() for op in ops])
+ return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
def CancelJob(self, job_id):
return self.CallMethod(REQ_CANCEL_JOB, job_id)
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def AutoArchiveJobs(self, age):
- return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
+ timeout = (DEF_RWTO - 1) / 2
+ return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
timeout = (DEF_RWTO - 1) / 2
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
- def QueryInstances(self, names, fields):
- return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
+ def QueryInstances(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
+
+ def QueryNodes(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
- def QueryNodes(self, names, fields):
- return self.CallMethod(REQ_QUERY_NODES, (names, fields))
+ def QueryExports(self, nodes, use_locking):
+ return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
- def QueryExports(self, nodes):
- return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
+ def QueryClusterInfo(self):
+ return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
def QueryConfigValues(self, fields):
return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)