X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/63d96e4c4557490d21930ea6c44e1468a850da75..31155d60eed6d14d25979c41c031f746e51d3c7d:/lib/luxi.py diff --git a/lib/luxi.py b/lib/luxi.py index 753b005..9859e9b 100644 --- a/lib/luxi.py +++ b/lib/luxi.py @@ -33,14 +33,16 @@ import socket import collections import time import errno +import logging from ganeti import serializer from ganeti import constants from ganeti import errors +from ganeti import utils -KEY_METHOD = 'method' -KEY_ARGS = 'args' +KEY_METHOD = "method" +KEY_ARGS = "args" KEY_SUCCESS = "success" KEY_RESULT = "result" @@ -56,15 +58,19 @@ REQ_QUERY_NODES = "QueryNodes" REQ_QUERY_EXPORTS = "QueryExports" REQ_QUERY_CONFIG_VALUES = "QueryConfigValues" REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo" +REQ_QUERY_TAGS = "QueryTags" REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag" REQ_SET_WATCHER_PAUSE = "SetWatcherPause" DEF_CTMO = 10 DEF_RWTO = 60 +# WaitForJobChange timeout +WFJC_TIMEOUT = (DEF_RWTO - 1) / 2 -class ProtocolError(Exception): - """Denotes an error in the server communication""" + +class ProtocolError(errors.GenericError): + """Denotes an error in the LUXI protocol""" class ConnectionClosedError(ProtocolError): @@ -75,14 +81,6 @@ class TimeoutError(ProtocolError): """Operation timeout error""" -class EncodingError(ProtocolError): - """Encoding failure on the sending side""" - - -class DecodingError(ProtocolError): - """Decoding failure on the receiving side""" - - class RequestError(ProtocolError): """Error on request @@ -117,15 +115,12 @@ class Transport: """ - def __init__(self, address, timeouts=None, eom=None): + def __init__(self, address, timeouts=None): """Constructor for the Client class. Arguments: - address: a valid address the the used transport class - timeout: a list of timeouts, to be used on connect and read/write - - eom: an identifier to be used as end-of-message which the - upper-layer will guarantee that this identifier will not appear - in any message There are two timeouts used since we might want to wait for a long time for a response, but the connect timeout should be lower. @@ -148,22 +143,16 @@ class Transport: self._buffer = "" self._msgs = collections.deque() - if eom is None: - self.eom = '\3' - else: - self.eom = eom - try: self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - self.socket.settimeout(self._ctimeout) + + # Try to connect try: - self.socket.connect(address) - except socket.timeout, err: - raise TimeoutError("Connect timed out: %s" % str(err)) - except socket.error, err: - if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED): - raise NoMasterError(address) - raise + utils.Retry(self._Connect, 1.0, self._ctimeout, + args=(self.socket, address, self._ctimeout)) + except utils.RetryTimeout: + raise TimeoutError("Connect timed out") + self.socket.settimeout(self._rwtimeout) except (socket.error, NoMasterError): if self.socket is not None: @@ -171,6 +160,21 @@ class Transport: self.socket = None raise + @staticmethod + def _Connect(sock, address, timeout): + sock.settimeout(timeout) + try: + sock.connect(address) + except socket.timeout, err: + raise TimeoutError("Connect timed out: %s" % str(err)) + except socket.error, err: + if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED): + raise NoMasterError(address) + if err.args[0] == errno.EAGAIN: + # Server's socket backlog is full at the moment + raise utils.RetryAgain() + raise + def _CheckSocket(self): """Make sure we are connected. @@ -184,12 +188,13 @@ class Transport: This just sends a message and doesn't wait for the response. """ - if self.eom in msg: - raise EncodingError("Message terminator found in payload") + if constants.LUXI_EOM in msg: + raise ProtocolError("Message terminator found in payload") + self._CheckSocket() try: # TODO: sendall is not guaranteed to send everything - self.socket.sendall(msg + self.eom) + self.socket.sendall(msg + constants.LUXI_EOM) except socket.timeout, err: raise TimeoutError("Sending timeout: %s" % str(err)) @@ -219,7 +224,7 @@ class Transport: break if not data: raise ConnectionClosedError("Connection closed while reading") - new_msgs = (self._buffer + data).split(self.eom) + new_msgs = (self._buffer + data).split(constants.LUXI_EOM) self._buffer = new_msgs.pop() self._msgs.extend(new_msgs) return self._msgs.popleft() @@ -240,6 +245,99 @@ class Transport: self.socket = None +def ParseRequest(msg): + """Parses a LUXI request message. + + """ + try: + request = serializer.LoadJson(msg) + except ValueError, err: + raise ProtocolError("Invalid LUXI request (parsing error): %s" % err) + + logging.debug("LUXI request: %s", request) + + if not isinstance(request, dict): + logging.error("LUXI request not a dict: %r", msg) + raise ProtocolError("Invalid LUXI request (not a dict)") + + method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103 + args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103 + + if method is None or args is None: + logging.error("LUXI request missing method or arguments: %r", msg) + raise ProtocolError(("Invalid LUXI request (no method or arguments" + " in request): %r") % msg) + + return (method, args) + + +def ParseResponse(msg): + """Parses a LUXI response message. + + """ + # Parse the result + try: + data = serializer.LoadJson(msg) + except Exception, err: + raise ProtocolError("Error while deserializing response: %s" % str(err)) + + # Validate response + if not (isinstance(data, dict) and + KEY_SUCCESS in data and + KEY_RESULT in data): + raise ProtocolError("Invalid response from server: %r" % data) + + return (data[KEY_SUCCESS], data[KEY_RESULT]) + + +def FormatResponse(success, result): + """Formats a LUXI response message. + + """ + response = { + KEY_SUCCESS: success, + KEY_RESULT: result, + } + + logging.debug("LUXI response: %s", response) + + return serializer.DumpJson(response) + + +def FormatRequest(method, args): + """Formats a LUXI request message. + + """ + # Build request + request = { + KEY_METHOD: method, + KEY_ARGS: args, + } + + # Serialize the request + return serializer.DumpJson(request, indent=False) + + +def CallLuxiMethod(transport_cb, method, args): + """Send a LUXI request via a transport and return the response. + + """ + assert callable(transport_cb) + + request_msg = FormatRequest(method, args) + + # Send request and wait for response + response_msg = transport_cb(request_msg) + + (success, result) = ParseResponse(response_msg) + + if success: + return result + + errors.MaybeRaise(result) + raise RequestError(result) + + class Client(object): """High-level client implementation. @@ -286,49 +384,23 @@ class Client(object): old_transp = self.transport self.transport = None old_transp.Close() - except Exception: + except Exception: # pylint: disable-msg=W0703 pass - def CallMethod(self, method, args): - """Send a generic request and return the response. - - """ - # Build request - request = { - KEY_METHOD: method, - KEY_ARGS: args, - } - - # Serialize the request - send_data = serializer.DumpJson(request, indent=False) - + def _SendMethodCall(self, data): # Send request and wait for response try: self._InitTransport() - result = self.transport.Call(send_data) + return self.transport.Call(data) except Exception: self._CloseTransport() raise - # Parse the result - try: - data = serializer.LoadJson(result) - except Exception, err: - raise ProtocolError("Error while deserializing response: %s" % str(err)) - - # Validate response - if (not isinstance(data, dict) or - KEY_SUCCESS not in data or - KEY_RESULT not in data): - raise DecodingError("Invalid response from server: %s" % str(data)) - - result = data[KEY_RESULT] - - if not data[KEY_SUCCESS]: - errors.MaybeRaise(result) - raise RequestError(result) + def CallMethod(self, method, args): + """Send a generic request and return the response. - return result + """ + return CallLuxiMethod(self._SendMethodCall, method, args) def SetQueueDrainFlag(self, drain_flag): return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag) @@ -356,12 +428,33 @@ class Client(object): timeout = (DEF_RWTO - 1) / 2 return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout)) + def WaitForJobChangeOnce(self, job_id, fields, + prev_job_info, prev_log_serial, + timeout=WFJC_TIMEOUT): + """Waits for changes on a job. + + @param job_id: Job ID + @type fields: list + @param fields: List of field names to be observed + @type prev_job_info: None or list + @param prev_job_info: Previously received job information + @type prev_log_serial: None or int/long + @param prev_log_serial: Highest log serial number previously received + @type timeout: int/float + @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will + be capped to that value) + + """ + assert timeout >= 0, "Timeout can not be negative" + return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, + (job_id, fields, prev_job_info, + prev_log_serial, + min(WFJC_TIMEOUT, timeout))) + def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial): - timeout = (DEF_RWTO - 1) / 2 while True: - result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, - (job_id, fields, prev_job_info, - prev_log_serial, timeout)) + result = self.WaitForJobChangeOnce(job_id, fields, + prev_job_info, prev_log_serial) if result != constants.JOB_NOTCHANGED: break return result @@ -384,5 +477,5 @@ class Client(object): def QueryConfigValues(self, fields): return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields) - -# TODO: class Server(object) + def QueryTags(self, kind, name): + return self.CallMethod(REQ_QUERY_TAGS, (kind, name))