REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
+REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
DEF_CTMO = 10
DEF_RWTO = 60
+# WaitForJobChange timeout
+WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
+
class ProtocolError(errors.GenericError):
- """Denotes an error in the LUXI protocol"""
+ """Denotes an error in the LUXI protocol."""
class ConnectionClosedError(ProtocolError):
- """Connection closed error"""
+ """Connection closed error."""
class TimeoutError(ProtocolError):
- """Operation timeout error"""
+ """Operation timeout error."""
class RequestError(ProtocolError):
- """Error on request
+ """Error on request.
This signifies an error in the request format or request handling,
but not (e.g.) an error in starting up an instance.
class NoMasterError(ProtocolError):
- """The master cannot be reached
+ """The master cannot be reached.
This means that the master daemon is not running or the socket has
been removed.
"""
+class PermissionError(ProtocolError):
+ """Permission denied while connecting to the master socket.
+
+ This means the user doesn't have the proper rights.
+
+ """
+
+
class Transport:
"""Low-level transport class.
"""
- def __init__(self, address, timeouts=None, eom=None):
+ def __init__(self, address, timeouts=None):
"""Constructor for the Client class.
Arguments:
- address: a valid address the the used transport class
- timeout: a list of timeouts, to be used on connect and read/write
- - eom: an identifier to be used as end-of-message which the
- upper-layer will guarantee that this identifier will not appear
- in any message
There are two timeouts used since we might want to wait for a long
time for a response, but the connect timeout should be lower.
self._buffer = ""
self._msgs = collections.deque()
- if eom is None:
- self.eom = '\3'
- else:
- self.eom = eom
-
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
except socket.timeout, err:
raise TimeoutError("Connect timed out: %s" % str(err))
except socket.error, err:
- if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
+ error_code = err.args[0]
+ if error_code in (errno.ENOENT, errno.ECONNREFUSED):
raise NoMasterError(address)
- if err.args[0] == errno.EAGAIN:
+ elif error_code in (errno.EPERM, errno.EACCES):
+ raise PermissionError(address)
+ elif error_code == errno.EAGAIN:
# Server's socket backlog is full at the moment
raise utils.RetryAgain()
raise
This just sends a message and doesn't wait for the response.
"""
- if self.eom in msg:
+ if constants.LUXI_EOM in msg:
raise ProtocolError("Message terminator found in payload")
self._CheckSocket()
try:
# TODO: sendall is not guaranteed to send everything
- self.socket.sendall(msg + self.eom)
+ self.socket.sendall(msg + constants.LUXI_EOM)
except socket.timeout, err:
raise TimeoutError("Sending timeout: %s" % str(err))
break
if not data:
raise ConnectionClosedError("Connection closed while reading")
- new_msgs = (self._buffer + data).split(self.eom)
+ new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
self._buffer = new_msgs.pop()
self._msgs.extend(new_msgs)
return self._msgs.popleft()
logging.error("LUXI request not a dict: %r", msg)
raise ProtocolError("Invalid LUXI request (not a dict)")
- method = request.get(KEY_METHOD, None)
- args = request.get(KEY_ARGS, None)
+ method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
+ args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
+
if method is None or args is None:
logging.error("LUXI request missing method or arguments: %r", msg)
raise ProtocolError(("Invalid LUXI request (no method or arguments"
return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChangeOnce(self, job_id, fields,
- prev_job_info, prev_log_serial):
- timeout = (DEF_RWTO - 1) / 2
+ prev_job_info, prev_log_serial,
+ timeout=WFJC_TIMEOUT):
+ """Waits for changes on a job.
+
+ @param job_id: Job ID
+ @type fields: list
+ @param fields: List of field names to be observed
+ @type prev_job_info: None or list
+ @param prev_job_info: Previously received job information
+ @type prev_log_serial: None or int/long
+ @param prev_log_serial: Highest log serial number previously received
+ @type timeout: int/float
+ @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
+ be capped to that value)
+
+ """
+ assert timeout >= 0, "Timeout can not be negative"
return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
(job_id, fields, prev_job_info,
- prev_log_serial, timeout))
+ prev_log_serial,
+ min(WFJC_TIMEOUT, timeout)))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
while True:
def QueryTags(self, kind, name):
return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
+
+ def QueryLocks(self, fields, sync):
+ return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))