"""Module for the unix socket protocol
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.
-The module is also be used by the master daemon.
+The module is also used by the master daemon.
"""
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
DEF_CTMO = 10
DEF_RWTO = 60
"""
if address is None:
address = constants.MASTER_SOCKET
- self.transport = transport(address, timeouts=timeouts)
+ self.address = address
+ self.timeouts = timeouts
+ self.transport_class = transport
+ self.transport = None
+ self._InitTransport()
+
+ def _InitTransport(self):
+ """(Re)initialize the transport if needed.
+
+ """
+ if self.transport is None:
+ self.transport = self.transport_class(self.address,
+ timeouts=self.timeouts)
+
+ def _CloseTransport(self):
+ """Close the transport, ignoring errors.
+
+ """
+ if self.transport is None:
+ return
+ try:
+ old_transp = self.transport
+ self.transport = None
+ old_transp.Close()
+ except Exception, err:
+ pass
def CallMethod(self, method, args):
"""Send a generic request and return the response.
KEY_ARGS: args,
}
+ # Serialize the request
+ send_data = serializer.DumpJson(request, indent=False)
+
# Send request and wait for response
- result = self.transport.Call(serializer.DumpJson(request, indent=False))
+ try:
+ self._InitTransport()
+ result = self.transport.Call(send_data)
+ except Exception:
+ self._CloseTransport()
+ raise
+
+ # Parse the result
try:
data = serializer.LoadJson(result)
except Exception, err:
return result
+ def SetQueueDrainFlag(self, drain_flag):
+ return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+
def SubmitJob(self, ops):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def AutoArchiveJobs(self, age):
- return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
+ # Same expression WaitForJobChange uses; presumably keeps the wait on
+ # the other side below the DEF_RWTO read/write timeout - TODO confirm
+ timeout = (DEF_RWTO - 1) / 2
+ return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
timeout = (DEF_RWTO - 1) / 2
def QueryConfigValues(self, fields):
return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
+
# TODO: class Server(object)