import collections
import time
import errno
+import logging
from ganeti import serializer
from ganeti import constants
from ganeti import utils
-KEY_METHOD = 'method'
-KEY_ARGS = 'args'
+# Keys of the dictionaries exchanged over LUXI: a request carries a method
+# and its arguments, a response carries a success flag and a result
+KEY_METHOD = "method"
+KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
-class ProtocolError(Exception):
- """Denotes an error in the server communication"""
+class ProtocolError(errors.GenericError):
+ """Denotes an error in the LUXI protocol
+
+ Raised when a message to be sent contains the end-of-message marker and
+ when a request or response cannot be decoded or lacks the expected keys.
+ It is also the base class of more specific errors in this module, such as
+ ConnectionClosedError and RequestError.
+
+ """
class ConnectionClosedError(ProtocolError):
"""Operation timeout error"""
-class EncodingError(ProtocolError):
- """Encoding failure on the sending side"""
-
-
-class DecodingError(ProtocolError):
- """Decoding failure on the receiving side"""
-
-
class RequestError(ProtocolError):
"""Error on request
"""
- def __init__(self, address, timeouts=None, eom=None):
+ def __init__(self, address, timeouts=None):
"""Constructor for the Client class.
Arguments:
- address: a valid address the the used transport class
- timeout: a list of timeouts, to be used on connect and read/write
- - eom: an identifier to be used as end-of-message which the
- upper-layer will guarantee that this identifier will not appear
- in any message
There are two timeouts used since we might want to wait for a long
time for a response, but the connect timeout should be lower.
self._buffer = ""
self._msgs = collections.deque()
- if eom is None:
- self.eom = '\3'
- else:
- self.eom = eom
-
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
This just sends a message and doesn't wait for the response.
"""
- if self.eom in msg:
- raise EncodingError("Message terminator found in payload")
+ if constants.LUXI_EOM in msg:
+ raise ProtocolError("Message terminator found in payload")
+
self._CheckSocket()
try:
# TODO: sendall is not guaranteed to send everything
- self.socket.sendall(msg + self.eom)
+ self.socket.sendall(msg + constants.LUXI_EOM)
except socket.timeout, err:
raise TimeoutError("Sending timeout: %s" % str(err))
break
if not data:
raise ConnectionClosedError("Connection closed while reading")
- new_msgs = (self._buffer + data).split(self.eom)
+ new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
self._buffer = new_msgs.pop()
self._msgs.extend(new_msgs)
return self._msgs.popleft()
self.socket = None
+def ParseRequest(msg):
+  """Decodes a serialized LUXI request into its method and arguments.
+
+  Arguments:
+    - msg: the serialized request; it is decoded with serializer.LoadJson
+
+  Returns a (method, args) tuple.
+
+  Raises ProtocolError if decoding fails with a ValueError, if the decoded
+  value is not a dict, or if the method or the arguments are absent or None.
+
+  """
+  try:
+    parsed = serializer.LoadJson(msg)
+  except ValueError, err:
+    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
+
+  logging.debug("LUXI request: %s", parsed)
+
+  if not isinstance(parsed, dict):
+    logging.error("LUXI request not a dict: %r", msg)
+    raise ProtocolError("Invalid LUXI request (not a dict)")
+
+  # An absent key and a key explicitly set to None look the same from here on
+  req_method = parsed.get(KEY_METHOD, None) # pylint: disable-msg=E1103
+  req_args = parsed.get(KEY_ARGS, None) # pylint: disable-msg=E1103
+
+  if req_method is not None and req_args is not None:
+    return (req_method, req_args)
+
+  logging.error("LUXI request missing method or arguments: %r", msg)
+  err_fmt = ("Invalid LUXI request (no method or arguments"
+             " in request): %r")
+  raise ProtocolError(err_fmt % msg)
+
+
+def ParseResponse(msg):
+  """Decodes a serialized LUXI response into its success flag and result.
+
+  Arguments:
+    - msg: the serialized response; it is decoded with serializer.LoadJson
+
+  Returns a (success, result) tuple taken from the decoded dictionary.
+
+  Raises ProtocolError if decoding raises any exception, or if the decoded
+  value is not a dict containing both the success and the result key.
+
+  """
+  # Any decoding failure is reported as a protocol error
+  try:
+    decoded = serializer.LoadJson(msg)
+  except Exception, err:
+    raise ProtocolError("Error while deserializing response: %s" % str(err))
+
+  # Reject anything that is not a dictionary carrying both expected keys; the
+  # type check comes first so the key lookups only ever run on a dict
+  if (not isinstance(decoded, dict) or
+      KEY_SUCCESS not in decoded or
+      KEY_RESULT not in decoded):
+    raise ProtocolError("Invalid response from server: %r" % decoded)
+
+  return (decoded[KEY_SUCCESS], decoded[KEY_RESULT])
+
+
+def FormatResponse(success, result):
+  """Builds and serializes a LUXI response message.
+
+  Arguments:
+    - success: value stored under the success key
+    - result: value stored under the result key
+
+  The response dictionary is logged at debug level and then returned in the
+  form produced by serializer.DumpJson.
+
+  """
+  payload = {KEY_SUCCESS: success, KEY_RESULT: result}
+
+  logging.debug("LUXI response: %s", payload)
+
+  return serializer.DumpJson(payload)
+
+
+def FormatRequest(method, args):
+  """Builds and serializes a LUXI request message.
+
+  Arguments:
+    - method: value stored under the method key
+    - args: value stored under the arguments key
+
+  Returns the request dictionary as produced by serializer.DumpJson with
+  indentation disabled.
+
+  """
+  payload = {KEY_METHOD: method, KEY_ARGS: args}
+  return serializer.DumpJson(payload, indent=False)
+
+
+def CallLuxiMethod(transport_cb, method, args):
+  """Performs one LUXI call through a transport callback.
+
+  Arguments:
+    - transport_cb: callable which is given the serialized request and
+      returns the serialized response
+    - method: the method to call
+    - args: the arguments of the method
+
+  Returns the result of the response if the response reports success.
+  Otherwise the result is handed to errors.MaybeRaise and, should that
+  return, RequestError is raised with the result.
+
+  """
+  assert callable(transport_cb)
+
+  # Serialize, hand over to the transport, then decode whatever came back
+  reply = transport_cb(FormatRequest(method, args))
+  (success, result) = ParseResponse(reply)
+
+  if not success:
+    errors.MaybeRaise(result)
+    raise RequestError(result)
+
+  return result
+
+
class Client(object):
"""High-level client implementation.
except Exception: # pylint: disable-msg=W0703
pass
- def CallMethod(self, method, args):
- """Send a generic request and return the response.
-
- """
- # Build request
- request = {
- KEY_METHOD: method,
- KEY_ARGS: args,
- }
-
- # Serialize the request
- send_data = serializer.DumpJson(request, indent=False)
-
+ def _SendMethodCall(self, data):
+ """Passes a request to the transport and returns what the transport returns.
+
+ Used by CallMethod as the transport callback of CallLuxiMethod, so data is
+ the already serialized request. _InitTransport is called before the
+ request is handed to self.transport.Call. If anything in that sequence
+ raises, _CloseTransport is called and the exception is re-raised unchanged.
+
+ """
# Send request and wait for response
try:
self._InitTransport()
- result = self.transport.Call(send_data)
+ return self.transport.Call(data)
except Exception:
self._CloseTransport()
raise
- # Parse the result
- try:
- data = serializer.LoadJson(result)
- except Exception, err:
- raise ProtocolError("Error while deserializing response: %s" % str(err))
-
- # Validate response
- if (not isinstance(data, dict) or
- KEY_SUCCESS not in data or
- KEY_RESULT not in data):
- raise DecodingError("Invalid response from server: %s" % str(data))
-
- result = data[KEY_RESULT]
-
- if not data[KEY_SUCCESS]:
- errors.MaybeRaise(result)
- raise RequestError(result)
+ def CallMethod(self, method, args):
+ """Send a generic request and return the response.
+
+ The work is delegated to CallLuxiMethod with _SendMethodCall as the
+ transport callback. The result of a successful response is returned; for
+ a failed one the result is passed to errors.MaybeRaise and, if that
+ returns, RequestError is raised.
+
- return result
+ """
+ return CallLuxiMethod(self._SendMethodCall, method, args)
def SetQueueDrainFlag(self, drain_flag):
return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)