"""Module for the unix socket protocol
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.
from ganeti import serializer
from ganeti import constants
from ganeti import errors
+from ganeti import utils
KEY_METHOD = 'method'
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
+REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
DEF_CTMO = 10
DEF_RWTO = 60
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.socket.settimeout(self._ctimeout)
+
+ # Try to connect
try:
- self.socket.connect(address)
- except socket.timeout, err:
- raise TimeoutError("Connect timed out: %s" % str(err))
- except socket.error, err:
- if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
- raise NoMasterError((address,))
- raise
+ utils.Retry(self._Connect, 1.0, self._ctimeout,
+ args=(self.socket, address, self._ctimeout))
+ except utils.RetryTimeout:
+ raise TimeoutError("Connect timed out")
+
self.socket.settimeout(self._rwtimeout)
except (socket.error, NoMasterError):
if self.socket is not None:
self.socket = None
raise
+ @staticmethod
+ def _Connect(sock, address, timeout):
+ sock.settimeout(timeout)
+ try:
+ sock.connect(address)
+ except socket.timeout, err:
+ raise TimeoutError("Connect timed out: %s" % str(err))
+ except socket.error, err:
+ if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+ raise NoMasterError(address)
+ if err.args[0] == errno.EAGAIN:
+ # Server's socket backlog is full at the moment
+ raise utils.RetryAgain()
+ raise
+
def _CheckSocket(self):
"""Make sure we are connected.
raise EncodingError("Message terminator found in payload")
self._CheckSocket()
try:
+ # TODO: sendall is not guaranteed to send everything
self.socket.sendall(msg + self.eom)
except socket.timeout, err:
raise TimeoutError("Sending timeout: %s" % str(err))
def Recv(self):
- """Try to receive a messae from the socket.
+ """Try to receive a message from the socket.
In case we already have messages queued, we just return from the
queue. Otherwise, we try to read data with a _rwtimeout network
while not self._msgs:
if time.time() > etime:
raise TimeoutError("Extended receive timeout")
- try:
- data = self.socket.recv(4096)
- except socket.timeout, err:
- raise TimeoutError("Receive timeout: %s" % str(err))
+ while True:
+ try:
+ data = self.socket.recv(4096)
+ except socket.error, err:
+ if err.args and err.args[0] == errno.EAGAIN:
+ continue
+ raise
+ except socket.timeout, err:
+ raise TimeoutError("Receive timeout: %s" % str(err))
+ break
if not data:
raise ConnectionClosedError("Connection closed while reading")
new_msgs = (self._buffer + data).split(self.eom)
"""
if address is None:
address = constants.MASTER_SOCKET
- self.transport = transport(address, timeouts=timeouts)
+ self.address = address
+ self.timeouts = timeouts
+ self.transport_class = transport
+ self.transport = None
+ self._InitTransport()
+
+ def _InitTransport(self):
+ """(Re)initialize the transport if needed.
+
+ """
+ if self.transport is None:
+ self.transport = self.transport_class(self.address,
+ timeouts=self.timeouts)
+
+ def _CloseTransport(self):
+ """Close the transport, ignoring errors.
+
+ """
+ if self.transport is None:
+ return
+ try:
+ old_transp = self.transport
+ self.transport = None
+ old_transp.Close()
+ except Exception: # pylint: disable-msg=W0703
+ pass
def CallMethod(self, method, args):
"""Send a generic request and return the response.
KEY_ARGS: args,
}
+ # Serialize the request
+ send_data = serializer.DumpJson(request, indent=False)
+
# Send request and wait for response
- result = self.transport.Call(serializer.DumpJson(request, indent=False))
+ try:
+ self._InitTransport()
+ result = self.transport.Call(send_data)
+ except Exception:
+ self._CloseTransport()
+ raise
+
+ # Parse the result
try:
data = serializer.LoadJson(result)
except Exception, err:
result = data[KEY_RESULT]
if not data[KEY_SUCCESS]:
- # TODO: decide on a standard exception
- if (isinstance(result, (tuple, list)) and len(result) == 2 and
- isinstance(result[1], (tuple, list))):
- # custom ganeti errors
- err_class = errors.GetErrorClass(result[0])
- if err_class is not None:
- raise err_class, tuple(result[1])
-
+ errors.MaybeRaise(result)
raise RequestError(result)
return result
def SetQueueDrainFlag(self, drain_flag):
return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+ def SetWatcherPause(self, until):
+ return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
def SubmitJob(self, ops):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+ def SubmitManyJobs(self, jobs):
+ jobs_state = []
+ for ops in jobs:
+ jobs_state.append([op.__getstate__() for op in ops])
+ return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
def CancelJob(self, job_id):
return self.CallMethod(REQ_CANCEL_JOB, job_id)
timeout = (DEF_RWTO - 1) / 2
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
- def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial):
timeout = (DEF_RWTO - 1) / 2
+ return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, prev_job_info,
+ prev_log_serial, timeout))
+
+ def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
while True:
- result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
- (job_id, fields, prev_job_info,
- prev_log_serial, timeout))
+ result = self.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
if result != constants.JOB_NOTCHANGED:
break
return result
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
- def QueryInstances(self, names, fields):
- return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
+ def QueryInstances(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
+
+ def QueryNodes(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
- def QueryNodes(self, names, fields):
- return self.CallMethod(REQ_QUERY_NODES, (names, fields))
+ def QueryExports(self, nodes, use_locking):
+ return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
- def QueryExports(self, nodes):
- return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
+ def QueryClusterInfo(self):
+ return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
def QueryConfigValues(self, fields):
return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
-
-# TODO: class Server(object)
+ def QueryTags(self, kind, name):
+ return self.CallMethod(REQ_QUERY_TAGS, (kind, name))