"""Module for the unix socket protocol
-This module implements the local unix socket protocl. You only need
+This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.
import collections
import time
import errno
+import logging
from ganeti import serializer
from ganeti import constants
from ganeti import errors
+from ganeti import utils
-KEY_METHOD = 'method'
-KEY_ARGS = 'args'
+KEY_METHOD = "method"
+KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
REQ_SUBMIT_JOB = "SubmitJob"
+REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
+REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
+REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
+REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
DEF_CTMO = 10
DEF_RWTO = 60
+# WaitForJobChange timeout
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
-class ProtocolError(Exception):
- """Denotes an error in the server communication"""
+
+class ProtocolError(errors.GenericError):
+ """Denotes an error in the LUXI protocol"""
class ConnectionClosedError(ProtocolError):
"""Operation timeout error"""
-class EncodingError(ProtocolError):
- """Encoding failure on the sending side"""
-
-
-class DecodingError(ProtocolError):
- """Decoding failure on the receiving side"""
-
-
class RequestError(ProtocolError):
"""Error on request
"""
- def __init__(self, address, timeouts=None, eom=None):
+ def __init__(self, address, timeouts=None):
"""Constructor for the Client class.
Arguments:
- address: a valid address the the used transport class
- timeout: a list of timeouts, to be used on connect and read/write
- - eom: an identifier to be used as end-of-message which the
- upper-layer will guarantee that this identifier will not appear
- in any message
There are two timeouts used since we might want to wait for a long
time for a response, but the connect timeout should be lower.
self._buffer = ""
self._msgs = collections.deque()
- if eom is None:
- self.eom = '\3'
- else:
- self.eom = eom
-
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.socket.settimeout(self._ctimeout)
+
+ # Try to connect
try:
- self.socket.connect(address)
- except socket.timeout, err:
- raise TimeoutError("Connect timed out: %s" % str(err))
- except socket.error, err:
- if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
- raise NoMasterError((address,))
- raise
+ utils.Retry(self._Connect, 1.0, self._ctimeout,
+ args=(self.socket, address, self._ctimeout))
+ except utils.RetryTimeout:
+ raise TimeoutError("Connect timed out")
+
self.socket.settimeout(self._rwtimeout)
except (socket.error, NoMasterError):
if self.socket is not None:
self.socket = None
raise
+ @staticmethod
+ def _Connect(sock, address, timeout):
+ """Makes one connection attempt on the given socket.
+
+ Sets C{timeout} on C{sock} and then calls C{sock.connect(address)}.
+
+ @param sock: socket object to connect
+ @param address: address passed to C{sock.connect}
+ @param timeout: value passed to C{sock.settimeout}
+ @raise TimeoutError: if connecting raises C{socket.timeout}
+ @raise NoMasterError: if connecting fails with ENOENT or ECONNREFUSED
+ @raise utils.RetryAgain: if connecting fails with EAGAIN
+
+ """
+ sock.settimeout(timeout)
+ try:
+ sock.connect(address)
+ except socket.timeout, err:
+ raise TimeoutError("Connect timed out: %s" % str(err))
+ except socket.error, err:
+ if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+ raise NoMasterError(address)
+ if err.args[0] == errno.EAGAIN:
+ # Server's socket backlog is full at the moment
+ raise utils.RetryAgain()
+ raise
+
def _CheckSocket(self):
"""Make sure we are connected.
This just sends a message and doesn't wait for the response.
"""
- if self.eom in msg:
- raise EncodingError("Message terminator found in payload")
+ if constants.LUXI_EOM in msg:
+ raise ProtocolError("Message terminator found in payload")
+
self._CheckSocket()
try:
- self.socket.sendall(msg + self.eom)
+ # TODO: if sendall fails (error or timeout) part of the message may already be sent
+ self.socket.sendall(msg + constants.LUXI_EOM)
except socket.timeout, err:
raise TimeoutError("Sending timeout: %s" % str(err))
def Recv(self):
- """Try to receive a messae from the socket.
+ """Try to receive a message from the socket.
In case we already have messages queued, we just return from the
queue. Otherwise, we try to read data with a _rwtimeout network
while not self._msgs:
if time.time() > etime:
raise TimeoutError("Extended receive timeout")
- try:
- data = self.socket.recv(4096)
- except socket.timeout, err:
- raise TimeoutError("Receive timeout: %s" % str(err))
+ while True:
+   try:
+     data = self.socket.recv(4096)
+   except socket.timeout, err:
+     # socket.timeout is a subclass of socket.error, so it must be
+     # handled first; otherwise this clause is never reached and the
+     # timeout escapes as a raw socket.timeout instead of TimeoutError
+     raise TimeoutError("Receive timeout: %s" % str(err))
+   except socket.error, err:
+     if err.args and err.args[0] == errno.EAGAIN:
+       # EAGAIN: retry the read
+       continue
+     raise
+   break
if not data:
raise ConnectionClosedError("Connection closed while reading")
- new_msgs = (self._buffer + data).split(self.eom)
+ new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
self._buffer = new_msgs.pop()
self._msgs.extend(new_msgs)
return self._msgs.popleft()
self.socket = None
+def ParseRequest(msg):
+ """Parses a LUXI request message.
+
+ @param msg: serialized request, decoded with C{serializer.LoadJson}
+ @return: tuple of C{(method, args)} read from the L{KEY_METHOD} and
+ L{KEY_ARGS} entries of the decoded request
+ @raise ProtocolError: if decoding raises C{ValueError}, if the decoded
+ value is not a dict, or if method or arguments are missing or C{None}
+
+ """
+ try:
+ request = serializer.LoadJson(msg)
+ except ValueError, err:
+ raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
+
+ logging.debug("LUXI request: %s", request)
+
+ if not isinstance(request, dict):
+ logging.error("LUXI request not a dict: %r", msg)
+ raise ProtocolError("Invalid LUXI request (not a dict)")
+
+ method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
+ args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
+
+ if method is None or args is None:
+ logging.error("LUXI request missing method or arguments: %r", msg)
+ raise ProtocolError(("Invalid LUXI request (no method or arguments"
+ " in request): %r") % msg)
+
+ return (method, args)
+
+
+def ParseResponse(msg):
+ """Parses a LUXI response message.
+
+ @param msg: serialized response, decoded with C{serializer.LoadJson}
+ @return: tuple of C{(success, result)} read from the L{KEY_SUCCESS} and
+ L{KEY_RESULT} entries of the decoded response
+ @raise ProtocolError: if decoding raises any exception, or if the decoded
+ value is not a dict containing both keys
+
+ """
+ # Parse the result
+ try:
+ data = serializer.LoadJson(msg)
+ except Exception, err:
+ raise ProtocolError("Error while deserializing response: %s" % str(err))
+
+ # Validate response
+ if not (isinstance(data, dict) and
+ KEY_SUCCESS in data and
+ KEY_RESULT in data):
+ raise ProtocolError("Invalid response from server: %r" % data)
+
+ return (data[KEY_SUCCESS], data[KEY_RESULT])
+
+
+def FormatResponse(success, result):
+ """Formats a LUXI response message.
+
+ @param success: value stored under L{KEY_SUCCESS}
+ @param result: value stored under L{KEY_RESULT}
+ @return: the response dict serialized with C{serializer.DumpJson}
+
+ """
+ response = {
+ KEY_SUCCESS: success,
+ KEY_RESULT: result,
+ }
+
+ logging.debug("LUXI response: %s", response)
+
+ return serializer.DumpJson(response)
+
+
+def FormatRequest(method, args):
+ """Formats a LUXI request message.
+
+ @param method: value stored under L{KEY_METHOD}
+ @param args: value stored under L{KEY_ARGS}
+ @return: the request dict serialized with C{serializer.DumpJson}, called
+ with C{indent=False}
+
+ """
+ # Build request
+ request = {
+ KEY_METHOD: method,
+ KEY_ARGS: args,
+ }
+
+ # Serialize the request
+ return serializer.DumpJson(request, indent=False)
+
+
+def CallLuxiMethod(transport_cb, method, args):
+ """Send a LUXI request via a transport and return the response.
+
+ @param transport_cb: callable invoked with the serialized request; its
+ return value is parsed as the serialized response
+ @param method: method name passed to L{FormatRequest}
+ @param args: method arguments passed to L{FormatRequest}
+ @return: the result part of the response if its success part is true
+ @raise RequestError: with the result as argument, if the success part is
+ false and C{errors.MaybeRaise} did not raise first
+
+ """
+ assert callable(transport_cb)
+
+ request_msg = FormatRequest(method, args)
+
+ # Send request and wait for response
+ response_msg = transport_cb(request_msg)
+
+ (success, result) = ParseResponse(response_msg)
+
+ if success:
+ return result
+
+ # NOTE(review): errors.MaybeRaise is defined outside this file; presumably
+ # it re-raises errors encoded in the result -- confirm
+ errors.MaybeRaise(result)
+ raise RequestError(result)
+
+
class Client(object):
"""High-level client implementation.
"""
if address is None:
address = constants.MASTER_SOCKET
- self.transport = transport(address, timeouts=timeouts)
+ self.address = address
+ self.timeouts = timeouts
+ self.transport_class = transport
+ self.transport = None
+ self._InitTransport()
- def CallMethod(self, method, args):
- """Send a generic request and return the response.
+ def _InitTransport(self):
+ """(Re)initialize the transport if needed.
"""
- # Build request
- request = {
- KEY_METHOD: method,
- KEY_ARGS: args,
- }
+ if self.transport is None:
+ self.transport = self.transport_class(self.address,
+ timeouts=self.timeouts)
- # Send request and wait for response
- result = self.transport.Call(serializer.DumpJson(request, indent=False))
- try:
- data = serializer.LoadJson(result)
- except Exception, err:
- raise ProtocolError("Error while deserializing response: %s" % str(err))
+ def _CloseTransport(self):
+ """Close the transport, ignoring errors.
- # Validate response
- if (not isinstance(data, dict) or
- KEY_SUCCESS not in data or
- KEY_RESULT not in data):
- raise DecodingError("Invalid response from server: %s" % str(data))
-
- result = data[KEY_RESULT]
+ """
+ if self.transport is None:
+ return
+ try:
+ old_transp = self.transport
+ self.transport = None
+ old_transp.Close()
+ except Exception: # pylint: disable-msg=W0703
+ pass
- if not data[KEY_SUCCESS]:
- # TODO: decide on a standard exception
- if (isinstance(result, (tuple, list)) and len(result) == 2 and
- isinstance(result[1], (tuple, list))):
- # custom ganeti errors
- err_class = errors.GetErrorClass(result[0])
- if err_class is not None:
- raise err_class, tuple(result[1])
+ def _SendMethodCall(self, data):
+ # Send request and wait for response
+ try:
+ self._InitTransport()
+ return self.transport.Call(data)
+ except Exception:
+ self._CloseTransport()
+ raise
- raise RequestError(result)
+ def CallMethod(self, method, args):
+ """Send a generic request and return the response.
- return result
+ """
+ return CallLuxiMethod(self._SendMethodCall, method, args)
def SetQueueDrainFlag(self, drain_flag):
return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
+ def SetWatcherPause(self, until):
+ """Sends a L{REQ_SET_WATCHER_PAUSE} request with C{[until]} as arguments.
+
+ @param until: value sent as the only argument (presumably a timestamp;
+ confirm against the server side)
+ @return: the result of L{CallMethod}
+
+ """
+ return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
+
def SubmitJob(self, ops):
ops_state = map(lambda op: op.__getstate__(), ops)
return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
+ def SubmitManyJobs(self, jobs):
+ """Sends a L{REQ_SUBMIT_MANY_JOBS} request for several jobs at once.
+
+ @param jobs: iterable of jobs, each an iterable of objects providing
+ C{__getstate__}
+ @return: the result of L{CallMethod}
+
+ """
+ jobs_state = []
+ for ops in jobs:
+ # One list of serialized states per job
+ jobs_state.append([op.__getstate__() for op in ops])
+ return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
+
def CancelJob(self, job_id):
return self.CallMethod(REQ_CANCEL_JOB, job_id)
timeout = (DEF_RWTO - 1) / 2
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
+ def WaitForJobChangeOnce(self, job_id, fields,
+ prev_job_info, prev_log_serial,
+ timeout=WFJC_TIMEOUT):
+ """Waits for changes on a job.
+
+ @param job_id: Job ID
+ @type fields: list
+ @param fields: List of field names to be observed
+ @type prev_job_info: None or list
+ @param prev_job_info: Previously received job information
+ @type prev_log_serial: None or int/long
+ @param prev_log_serial: Highest log serial number previously received
+ @type timeout: int/float
+ @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+ be capped to that value)
+ @return: the result of the L{REQ_WAIT_FOR_JOB_CHANGE} call, as returned by
+ L{CallMethod}
+
+ """
+ # Negative timeouts are only caught by this assert
+ assert timeout >= 0, "Timeout can not be negative"
+ return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
+ (job_id, fields, prev_job_info,
+ prev_log_serial,
+ min(WFJC_TIMEOUT, timeout)))
+
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
- timeout = (DEF_RWTO - 1) / 2
while True:
- result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
- (job_id, fields, prev_job_info,
- prev_log_serial, timeout))
+ result = self.WaitForJobChangeOnce(job_id, fields,
+ prev_job_info, prev_log_serial)
if result != constants.JOB_NOTCHANGED:
break
return result
def QueryJobs(self, job_ids, fields):
return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
- def QueryInstances(self, names, fields):
- return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
+ def QueryInstances(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
- def QueryNodes(self, names, fields):
- return self.CallMethod(REQ_QUERY_NODES, (names, fields))
+ def QueryNodes(self, names, fields, use_locking):
+ return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
- def QueryExports(self, nodes):
- return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
+ def QueryExports(self, nodes, use_locking):
+ return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
+
+ def QueryClusterInfo(self):
+ """Sends a L{REQ_QUERY_CLUSTER_INFO} request with an empty argument tuple.
+
+ @return: the result of L{CallMethod}
+
+ """
+ return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
def QueryConfigValues(self, fields):
return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
-
-# TODO: class Server(object)
+ def QueryTags(self, kind, name):
+ """Sends a L{REQ_QUERY_TAGS} request with C{(kind, name)} as arguments.
+
+ @param kind: first element of the argument tuple
+ @param name: second element of the argument tuple
+ @return: the result of L{CallMethod}
+
+ """
+ return self.CallMethod(REQ_QUERY_TAGS, (kind, name))