4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import compat
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import pathutils
#: Response key carrying the success flag of a LUXI call (required in every
#: response; see ParseResponse and FormatResponse)
KEY_SUCCESS = "success"
#: Optional key carrying the LUXI protocol version; used in both requests and
#: responses and only emitted when a version is given
KEY_VERSION = "version"
# Job-related LUXI request (method) names; each one is sent by the
# correspondingly named Client method (SubmitJob, CancelJob, ...)
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
# Query-type LUXI request names (sent by the Client.Query* methods)
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
# Requests changing settings: the job queue drain flag
# (Client.SetQueueDrainFlag) and the watcher pause (Client.SetWatcherPause)
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
74 #: List of all LUXI requests
75 REQ_ALL = compat.UniqueFrozenset([
77 REQ_AUTO_ARCHIVE_JOBS,
79 REQ_CHANGE_JOB_PRIORITY,
81 REQ_QUERY_CLUSTER_INFO,
82 REQ_QUERY_CONFIG_VALUES,
92 REQ_SET_WATCHER_PAUSE,
95 REQ_WAIT_FOR_JOB_CHANGE,
#: Default, and also maximum, timeout in seconds for a single
#: WaitForJobChange request (Client.WaitForJobChangeOnce uses it as the
#: default and caps larger caller values to it).
#: NOTE(review): presumably derived from DEF_RWTO so that the server-side wait
#: ends well before the client's read/write timeout expires -- confirm.
#: With an integer DEF_RWTO this is integer division under Python 2.
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.LuxiError):
  """Denotes an error in the LUXI protocol.

  Common base class for the more specific LUXI errors of this module
  (closed connection, timeout, bad request, unreachable master, denied
  permission); it is also raised directly for malformed messages.

  """
class ConnectionClosedError(ProtocolError):
  """Connection closed error.

  Raised by the transport when the connection is closed while a reply is
  still being read.

  """
class TimeoutError(ProtocolError):
  """Operation timeout error.

  Raised by the transport when connecting, sending or receiving does not
  complete within the configured timeouts.

  """
117 class RequestError(ProtocolError):
120 This signifies an error in the request format or request handling,
121 but not (e.g.) an error in starting up an instance.
123 Some common conditions that can trigger this exception:
124 - job submission failed because the job data was wrong
125 - query failed because required fields were missing
130 class NoMasterError(ProtocolError):
131 """The master cannot be reached.
133 This means that the master daemon is not running or the socket has
139 class PermissionError(ProtocolError):
140 """Permission denied while connecting to the master socket.
142 This means the user doesn't have the proper rights.
148 """Low-level transport class.
150 This is used on the client side.
152 This could be replace by any other class that provides the same
153 semantics to the Client. This means:
154 - can send messages and receive messages
155 - safe for multithreading
159 def __init__(self, address, timeouts=None):
160 """Constructor for the Client class.
163 - address: a valid address the the used transport class
164 - timeout: a list of timeouts, to be used on connect and read/write
166 There are two timeouts used since we might want to wait for a long
167 time for a response, but the connect timeout should be lower.
169 If not passed, we use a default of 10 and respectively 60 seconds.
171 Note that on reading data, since the timeout applies to an
172 invidual receive, it might be that the total duration is longer
173 than timeout value passed (we make a hard limit at twice the read
177 self.address = address
179 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
181 self._ctimeout, self._rwtimeout = timeouts
185 self._msgs = collections.deque()
188 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
192 utils.Retry(self._Connect, 1.0, self._ctimeout,
193 args=(self.socket, address, self._ctimeout))
194 except utils.RetryTimeout:
195 raise TimeoutError("Connect timed out")
197 self.socket.settimeout(self._rwtimeout)
198 except (socket.error, NoMasterError):
199 if self.socket is not None:
205 def _Connect(sock, address, timeout):
206 sock.settimeout(timeout)
208 sock.connect(address)
209 except socket.timeout, err:
210 raise TimeoutError("Connect timed out: %s" % str(err))
211 except socket.error, err:
212 error_code = err.args[0]
213 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
214 raise NoMasterError(address)
215 elif error_code in (errno.EPERM, errno.EACCES):
216 raise PermissionError(address)
217 elif error_code == errno.EAGAIN:
218 # Server's socket backlog is full at the moment
219 raise utils.RetryAgain()
222 def _CheckSocket(self):
223 """Make sure we are connected.
226 if self.socket is None:
227 raise ProtocolError("Connection is closed")
232 This just sends a message and doesn't wait for the response.
235 if constants.LUXI_EOM in msg:
236 raise ProtocolError("Message terminator found in payload")
240 # TODO: sendall is not guaranteed to send everything
241 self.socket.sendall(msg + constants.LUXI_EOM)
242 except socket.timeout, err:
243 raise TimeoutError("Sending timeout: %s" % str(err))
246 """Try to receive a message from the socket.
248 In case we already have messages queued, we just return from the
249 queue. Otherwise, we try to read data with a _rwtimeout network
250 timeout, and making sure we don't go over 2x_rwtimeout as a global
255 etime = time.time() + self._rwtimeout
256 while not self._msgs:
257 if time.time() > etime:
258 raise TimeoutError("Extended receive timeout")
261 data = self.socket.recv(4096)
262 except socket.timeout, err:
263 raise TimeoutError("Receive timeout: %s" % str(err))
264 except socket.error, err:
265 if err.args and err.args[0] == errno.EAGAIN:
270 raise ConnectionClosedError("Connection closed while reading")
271 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
272 self._buffer = new_msgs.pop()
273 self._msgs.extend(new_msgs)
274 return self._msgs.popleft()
277 """Send a message and wait for the response.
279 This is just a wrapper over Send and Recv.
286 """Close the socket"""
287 if self.socket is not None:
292 def ParseRequest(msg):
293 """Parses a LUXI request message.
297 request = serializer.LoadJson(msg)
298 except ValueError, err:
299 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
301 logging.debug("LUXI request: %s", request)
303 if not isinstance(request, dict):
304 logging.error("LUXI request not a dict: %r", msg)
305 raise ProtocolError("Invalid LUXI request (not a dict)")
307 method = request.get(KEY_METHOD, None) # pylint: disable=E1103
308 args = request.get(KEY_ARGS, None) # pylint: disable=E1103
309 version = request.get(KEY_VERSION, None) # pylint: disable=E1103
311 if method is None or args is None:
312 logging.error("LUXI request missing method or arguments: %r", msg)
313 raise ProtocolError(("Invalid LUXI request (no method or arguments"
314 " in request): %r") % msg)
316 return (method, args, version)
319 def ParseResponse(msg):
320 """Parses a LUXI response message.
325 data = serializer.LoadJson(msg)
326 except KeyboardInterrupt:
328 except Exception, err:
329 raise ProtocolError("Error while deserializing response: %s" % str(err))
332 if not (isinstance(data, dict) and
333 KEY_SUCCESS in data and
335 raise ProtocolError("Invalid response from server: %r" % data)
337 return (data[KEY_SUCCESS], data[KEY_RESULT],
338 data.get(KEY_VERSION, None)) # pylint: disable=E1103
341 def FormatResponse(success, result, version=None):
342 """Formats a LUXI response message.
346 KEY_SUCCESS: success,
350 if version is not None:
351 response[KEY_VERSION] = version
353 logging.debug("LUXI response: %s", response)
355 return serializer.DumpJson(response)
358 def FormatRequest(method, args, version=None):
359 """Formats a LUXI request message.
368 if version is not None:
369 request[KEY_VERSION] = version
371 # Serialize the request
372 return serializer.DumpJson(request)
375 def CallLuxiMethod(transport_cb, method, args, version=None):
376 """Send a LUXI request via a transport and return the response.
379 assert callable(transport_cb)
381 request_msg = FormatRequest(method, args, version=version)
383 # Send request and wait for response
384 response_msg = transport_cb(request_msg)
386 (success, result, resp_version) = ParseResponse(response_msg)
388 # Verify version if there was one in the response
389 if resp_version is not None and resp_version != version:
390 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
391 (version, resp_version))
396 errors.MaybeRaise(result)
397 raise RequestError(result)
400 class Client(object):
401 """High-level client implementation.
403 This uses a backing Transport-like class on top of which it
404 implements data serialization/deserialization.
407 def __init__(self, address=None, timeouts=None, transport=Transport):
408 """Constructor for the Client class.
411 - address: a valid address the the used transport class
412 - timeout: a list of timeouts, to be used on connect and read/write
413 - transport: a Transport-like class
416 If timeout is not passed, the default timeouts of the transport
421 address = pathutils.MASTER_SOCKET
422 self.address = address
423 self.timeouts = timeouts
424 self.transport_class = transport
425 self.transport = None
426 self._InitTransport()
428 def _InitTransport(self):
429 """(Re)initialize the transport if needed.
432 if self.transport is None:
433 self.transport = self.transport_class(self.address,
434 timeouts=self.timeouts)
436 def _CloseTransport(self):
437 """Close the transport, ignoring errors.
440 if self.transport is None:
443 old_transp = self.transport
444 self.transport = None
446 except Exception: # pylint: disable=W0703
449 def _SendMethodCall(self, data):
450 # Send request and wait for response
452 self._InitTransport()
453 return self.transport.Call(data)
455 self._CloseTransport()
459 """Close the underlying connection.
462 self._CloseTransport()
464 def CallMethod(self, method, args):
465 """Send a generic request and return the response.
468 if not isinstance(args, (list, tuple)):
469 raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
470 " expected list, got %s" % type(args))
471 return CallLuxiMethod(self._SendMethodCall, method, args,
472 version=constants.LUXI_VERSION)
474 def SetQueueDrainFlag(self, drain_flag):
475 return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
477 def SetWatcherPause(self, until):
478 return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
480 def SubmitJob(self, ops):
481 ops_state = map(lambda op: op.__getstate__(), ops)
482 return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
484 def SubmitManyJobs(self, jobs):
487 jobs_state.append([op.__getstate__() for op in ops])
488 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
490 def CancelJob(self, job_id):
491 return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
493 def ArchiveJob(self, job_id):
494 return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
496 def ChangeJobPriority(self, job_id, priority):
497 return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
499 def AutoArchiveJobs(self, age):
500 timeout = (DEF_RWTO - 1) / 2
501 return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
503 def WaitForJobChangeOnce(self, job_id, fields,
504 prev_job_info, prev_log_serial,
505 timeout=WFJC_TIMEOUT):
506 """Waits for changes on a job.
508 @param job_id: Job ID
510 @param fields: List of field names to be observed
511 @type prev_job_info: None or list
512 @param prev_job_info: Previously received job information
513 @type prev_log_serial: None or int/long
514 @param prev_log_serial: Highest log serial number previously received
515 @type timeout: int/float
516 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
517 be capped to that value)
520 assert timeout >= 0, "Timeout can not be negative"
521 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
522 (job_id, fields, prev_job_info,
524 min(WFJC_TIMEOUT, timeout)))
526 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
528 result = self.WaitForJobChangeOnce(job_id, fields,
529 prev_job_info, prev_log_serial)
530 if result != constants.JOB_NOTCHANGED:
534 def Query(self, what, fields, qfilter):
535 """Query for resources/items.
537 @param what: One of L{constants.QR_VIA_LUXI}
538 @type fields: List of strings
539 @param fields: List of requested fields
540 @type qfilter: None or list
541 @param qfilter: Query filter
542 @rtype: L{objects.QueryResponse}
545 result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
546 return objects.QueryResponse.FromDict(result)
548 def QueryFields(self, what, fields):
549 """Query for available fields.
551 @param what: One of L{constants.QR_VIA_LUXI}
552 @type fields: None or list of strings
553 @param fields: List of requested fields
554 @rtype: L{objects.QueryFieldsResponse}
557 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
558 return objects.QueryFieldsResponse.FromDict(result)
560 def QueryJobs(self, job_ids, fields):
561 return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
563 def QueryInstances(self, names, fields, use_locking):
564 return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
566 def QueryNodes(self, names, fields, use_locking):
567 return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
569 def QueryGroups(self, names, fields, use_locking):
570 return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
572 def QueryNetworks(self, names, fields, use_locking):
573 return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
575 def QueryExports(self, nodes, use_locking):
576 return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
578 def QueryClusterInfo(self):
579 return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
581 def QueryConfigValues(self, fields):
582 return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
584 def QueryTags(self, kind, name):
585 return self.CallMethod(REQ_QUERY_TAGS, (kind, name))