4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import compat
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import pathutils
49 KEY_SUCCESS = "success"
51 KEY_VERSION = "version"
53 REQ_SUBMIT_JOB = "SubmitJob"
54 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
55 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
56 REQ_CANCEL_JOB = "CancelJob"
57 REQ_ARCHIVE_JOB = "ArchiveJob"
58 REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
59 REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
61 REQ_QUERY_FIELDS = "QueryFields"
62 REQ_QUERY_JOBS = "QueryJobs"
63 REQ_QUERY_INSTANCES = "QueryInstances"
64 REQ_QUERY_NODES = "QueryNodes"
65 REQ_QUERY_GROUPS = "QueryGroups"
66 REQ_QUERY_NETWORKS = "QueryNetworks"
67 REQ_QUERY_EXPORTS = "QueryExports"
68 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
69 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
70 REQ_QUERY_TAGS = "QueryTags"
71 REQ_SET_DRAIN_FLAG = "SetDrainFlag"
72 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
74 #: List of all LUXI requests
75 REQ_ALL = compat.UniqueFrozenset([
77 REQ_AUTO_ARCHIVE_JOBS,
79 REQ_CHANGE_JOB_PRIORITY,
81 REQ_QUERY_CLUSTER_INFO,
82 REQ_QUERY_CONFIG_VALUES,
91 REQ_SET_WATCHER_PAUSE,
94 REQ_WAIT_FOR_JOB_CHANGE,
100 # WaitForJobChange timeout
101 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    The other exception classes defined in this module derive from it.

    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised by the transport when the connection is closed while reading.

    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised by the transport when connecting, sending or receiving takes
    longer than the configured timeout.

    Note: on Python 3 this name shadows the built-in C{TimeoutError} inside
    this module; the name is kept because callers catch it.

    """
class RequestError(ProtocolError):
    """Error in a LUXI request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or its socket does
    not exist: the transport raises this error when connecting fails with
    C{ENOENT} or C{ECONNREFUSED}.

    """
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights: the transport
    raises this error when connecting fails with C{EPERM} or C{EACCES}.

    Note: on Python 3 this name shadows the built-in C{PermissionError}
    inside this module; the name is kept because callers catch it.

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    Messages are exchanged over a UNIX stream socket and are delimited by
    C{constants.LUXI_EOM}.

    """
    # NOTE(review): the class header, the headers of Send/Recv/Call/Close and
    # several short statements ("try:", "raise", "continue", ...) were absent
    # from the reviewed text. They were restored from the surrounding code and
    # are marked below -- confirm against the authoritative copy of the file.
    # NOTE(review): base class assumed to be "object", like Client -- confirm.

    def __init__(self, address, timeouts=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class
          - timeouts: a pair of timeouts, to be used on connect and
            read/write

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, C{DEF_CTMO} and C{DEF_RWTO} are used (documented as
        10 and respectively 60 seconds).

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than timeout value passed (we make a hard limit at twice the read
        timeout).

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        # NOTE(review): the next two initialisations were restored; the
        # error handler below and Recv() read these attributes -- confirm
        self.socket = None
        self._buffer = ""
        self._msgs = collections.deque()

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Try to connect, retrying until the connect timeout expires
            try:
                utils.Retry(self._Connect, 1.0, self._ctimeout,
                            args=(self.socket, address, self._ctimeout))
            except utils.RetryTimeout:
                raise TimeoutError("Connect timed out")

            self.socket.settimeout(self._rwtimeout)
        except (socket.error, NoMasterError):
            # NOTE(review): handler body restored (release the socket, then
            # propagate the error) -- confirm
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    # NOTE(review): decorator restored; the function takes no "self" yet is
    # referenced as self._Connect -- confirm
    @staticmethod
    def _Connect(sock, address, timeout):
        """Connects C{sock} to C{address}, translating socket errors.

        @raise TimeoutError: if the connect attempt times out
        @raise NoMasterError: on C{ENOENT} or C{ECONNREFUSED}
        @raise PermissionError: on C{EPERM} or C{EACCES}
        @raise utils.RetryAgain: on C{EAGAIN}, so that the caller retries

        """
        sock.settimeout(timeout)
        try:
            sock.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            error_code = err.args[0]
            if error_code in (errno.ENOENT, errno.ECONNREFUSED):
                raise NoMasterError(address)
            elif error_code in (errno.EPERM, errno.EACCES):
                raise PermissionError(address)
            elif error_code == errno.EAGAIN:
                # Server's socket backlog is full at the moment
                raise utils.RetryAgain()
            # NOTE(review): re-raise of any other error restored -- confirm
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        @raise ProtocolError: if the socket has been closed

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        @raise ProtocolError: if the payload contains the message terminator
        @raise TimeoutError: if sending times out

        """
        if constants.LUXI_EOM in msg:
            raise ProtocolError("Message terminator found in payload")

        # NOTE(review): connection check restored -- confirm
        self._CheckSocket()
        try:
            # TODO: sendall is not guaranteed to send everything
            self.socket.sendall(msg + constants.LUXI_EOM)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        @raise TimeoutError: if no complete message arrives in time
        @raise ConnectionClosedError: if the connection is closed while
            reading

        """
        # NOTE(review): connection check restored -- confirm
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            # NOTE(review): retry loop ("while True"/"continue"/"raise"/
            # "break") and the empty-read test were restored -- confirm
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # Everything before the last terminator is complete messages; the
            # remainder is kept as the start of the next message
            new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    # NOTE(review): method name restored from the naming style of the other
    # public methods -- confirm
    def Close(self):
        """Close the socket"""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
def ParseRequest(msg):
    """Parses a LUXI request message.

    @param msg: Serialized request, as accepted by C{serializer.LoadJson}
    @return: Tuple of C{(method, args, version)}; C{version} is C{None} if
        the request does not contain one
    @raise ProtocolError: If the message can not be parsed, is not a
        dictionary, or lacks the method or the arguments

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", request)

    if not isinstance(request, dict):
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    method = request.get(KEY_METHOD, None)  # pylint: disable=E1103
    args = request.get(KEY_ARGS, None)  # pylint: disable=E1103
    version = request.get(KEY_VERSION, None)  # pylint: disable=E1103

    # The version is optional, method and arguments are not
    if method is None or args is None:
        logging.error("LUXI request missing method or arguments: %r", msg)
        raise ProtocolError(("Invalid LUXI request (no method or arguments"
                             " in request): %r") % msg)

    return (method, args, version)
def ParseResponse(msg):
    """Parses a LUXI response message.

    @param msg: Serialized response, as accepted by C{serializer.LoadJson}
    @return: Tuple of C{(success, result, version)}; C{version} is C{None}
        if the response does not contain one
    @raise ProtocolError: If the message can not be deserialized or is not
        a dictionary containing both the success and the result keys

    """
    # Parse the message
    try:
        data = serializer.LoadJson(msg)
    except KeyboardInterrupt:
        # Never convert a user interrupt into a protocol error
        raise
    except Exception as err:
        raise ProtocolError("Error while deserializing response: %s" %
                            str(err))

    # Validate response
    # NOTE(review): the "KEY_RESULT in data" condition was absent from the
    # reviewed text; restored because data[KEY_RESULT] is read below
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        raise ProtocolError("Invalid response from server: %r" % data)

    return (data[KEY_SUCCESS], data[KEY_RESULT],
            data.get(KEY_VERSION, None))  # pylint: disable=E1103
def FormatResponse(success, result, version=None):
    """Formats a LUXI response message.

    @param success: Value stored under C{KEY_SUCCESS}
    @param result: Value stored under C{KEY_RESULT}
    @param version: Optional protocol version; only included in the message
        when not C{None}
    @return: The response as serialized by C{serializer.DumpJson}

    """
    # NOTE(review): the KEY_RESULT entry was absent from the reviewed text;
    # restored from the "result" parameter -- confirm
    response = {
        KEY_SUCCESS: success,
        KEY_RESULT: result,
        }

    if version is not None:
        response[KEY_VERSION] = version

    logging.debug("LUXI response: %s", response)

    return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
    """Formats a LUXI request message.

    @param method: Value stored under C{KEY_METHOD}
    @param args: Value stored under C{KEY_ARGS}
    @param version: Optional protocol version; only included in the message
        when not C{None}
    @return: The request as serialized by C{serializer.DumpJson}

    """
    # Build request
    # NOTE(review): the dictionary literal was absent from the reviewed text;
    # restored from the parameters and the keys read by ParseRequest --
    # confirm
    request = {
        KEY_METHOD: method,
        KEY_ARGS: args,
        }

    if version is not None:
        request[KEY_VERSION] = version

    # Serialize the request
    return serializer.DumpJson(request)
def CallLuxiMethod(transport_cb, method, args, version=None):
    """Send a LUXI request via a transport and return the response.

    @param transport_cb: Callable which is passed the serialized request and
        returns the serialized response
    @param method: Name of the LUXI method to call
    @param args: Arguments of the call
    @param version: Optional protocol version sent with the request and
        compared with the version in the response, if any
    @return: The result part of a successful response
    @raise errors.LuxiError: If the response carries a different version
    @raise RequestError: If the call failed and C{errors.MaybeRaise} did
        not raise an exception of its own

    """
    assert callable(transport_cb)

    request_msg = FormatRequest(method, args, version=version)

    # Send request and wait for response
    response_msg = transport_cb(request_msg)

    (success, result, resp_version) = ParseResponse(response_msg)

    # Verify version if there was one in the response
    if resp_version is not None and resp_version != version:
        raise errors.LuxiError("LUXI version mismatch, client %s, response %s"
                               % (version, resp_version))

    # NOTE(review): the success branch was absent from the reviewed text;
    # restored -- confirm
    if success:
        return result

    # Failure: errors.MaybeRaise gets the first chance to raise based on the
    # result; if it returns, fall back to a generic request error
    errors.MaybeRaise(result)
    raise RequestError(result)
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """
    # NOTE(review): a number of short statements ("try:", "return", "raise",
    # "while True:", ...) and the header of Close() were absent from the
    # reviewed text. They were restored from the surrounding code and are
    # marked below -- confirm against the authoritative copy of the file.

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class; if not
            passed, L{pathutils.MASTER_SOCKET} is used
          - timeouts: a list of timeouts, to be used on connect and
            read/write
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.

        """
        if address is None:
            address = pathutils.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        """
        if self.transport is None:
            return
        try:
            # Forget the transport before closing it, so that it is dropped
            # even if closing fails
            old_transp = self.transport
            self.transport = None
            # NOTE(review): the close call was restored -- confirm
            old_transp.Close()
        except Exception:  # pylint: disable=W0703
            pass

    def _SendMethodCall(self, data):
        """Sends serialized data via the transport and returns the reply.

        On any error the transport is closed, so that the next call
        re-initializes it, and the error is propagated.

        """
        # Send request and wait for response
        try:
            self._InitTransport()
            return self.transport.Call(data)
        except Exception:
            self._CloseTransport()
            raise

    def Close(self):
        """Close the underlying connection.

        """
        self._CloseTransport()

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        @param method: Name of the LUXI method
        @type args: list or tuple
        @param args: Arguments of the call
        @raise errors.ProgrammerError: If C{args} is not a list or tuple

        """
        if not isinstance(args, (list, tuple)):
            raise errors.ProgrammerError("Invalid parameter passed to"
                                         " CallMethod: expected list, got %s"
                                         % type(args))
        return CallLuxiMethod(self._SendMethodCall, method, args,
                              version=constants.LUXI_VERSION)

    def SetQueueDrainFlag(self, drain_flag):
        """Sends a C{REQ_SET_DRAIN_FLAG} request."""
        return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

    def SetWatcherPause(self, until):
        """Sends a C{REQ_SET_WATCHER_PAUSE} request."""
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

    def SubmitJob(self, ops):
        """Sends a C{REQ_SUBMIT_JOB} request with the state of each opcode.

        """
        # A real list (not a lazy "map" object) is needed for serialization
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

    def SubmitManyJobs(self, jobs):
        """Sends a C{REQ_SUBMIT_MANY_JOBS} request.

        @param jobs: Iterable of jobs, each an iterable of opcodes

        """
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

    def CancelJob(self, job_id):
        """Sends a C{REQ_CANCEL_JOB} request."""
        return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

    def ArchiveJob(self, job_id):
        """Sends a C{REQ_ARCHIVE_JOB} request."""
        return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

    def ChangeJobPriority(self, job_id, priority):
        """Sends a C{REQ_CHANGE_JOB_PRIORITY} request."""
        return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

    def AutoArchiveJobs(self, age):
        """Sends a C{REQ_AUTO_ARCHIVE_JOBS} request.

        The timeout passed along is derived from the default read/write
        timeout, using the same expression as C{WFJC_TIMEOUT}.

        """
        timeout = (DEF_RWTO - 1) / 2
        return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

    def WaitForJobChangeOnce(self, job_id, fields,
                             prev_job_info, prev_log_serial,
                             timeout=WFJC_TIMEOUT):
        """Waits for changes on a job.

        @param job_id: Job ID
        @type fields: list
        @param fields: List of field names to be observed
        @type prev_job_info: None or list
        @param prev_job_info: Previously received job information
        @type prev_log_serial: None or int/long
        @param prev_log_serial: Highest log serial number previously received
        @type timeout: int/float
        @param timeout: Timeout in seconds (values larger than
            L{WFJC_TIMEOUT} will be capped to that value)

        """
        assert timeout >= 0, "Timeout can not be negative"
        # NOTE(review): the "prev_log_serial" element was absent from the
        # reviewed text; restored from the parameter list -- confirm
        return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial,
                                min(WFJC_TIMEOUT, timeout)))

    def WaitForJobChange(self, job_id, fields, prev_job_info,
                         prev_log_serial):
        """Waits until a job changes.

        Calls L{WaitForJobChangeOnce} with its default timeout until the
        result differs from C{constants.JOB_NOTCHANGED}.

        """
        # NOTE(review): loop and return statements restored -- confirm
        while True:
            result = self.WaitForJobChangeOnce(job_id, fields,
                                               prev_job_info,
                                               prev_log_serial)
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def Query(self, what, fields, qfilter):
        """Query for resources/items.

        @param what: One of L{constants.QR_VIA_LUXI}
        @type fields: List of strings
        @param fields: List of requested fields
        @type qfilter: None or list
        @param qfilter: Query filter
        @rtype: L{objects.QueryResponse}

        """
        result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
        return objects.QueryResponse.FromDict(result)

    def QueryFields(self, what, fields):
        """Query for available fields.

        @param what: One of L{constants.QR_VIA_LUXI}
        @type fields: None or list of strings
        @param fields: List of requested fields
        @rtype: L{objects.QueryFieldsResponse}

        """
        result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
        return objects.QueryFieldsResponse.FromDict(result)

    def QueryJobs(self, job_ids, fields):
        """Sends a C{REQ_QUERY_JOBS} request."""
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        """Sends a C{REQ_QUERY_INSTANCES} request."""
        return self.CallMethod(REQ_QUERY_INSTANCES,
                               (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        """Sends a C{REQ_QUERY_NODES} request."""
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryGroups(self, names, fields, use_locking):
        """Sends a C{REQ_QUERY_GROUPS} request."""
        return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

    def QueryNetworks(self, names, fields, use_locking):
        """Sends a C{REQ_QUERY_NETWORKS} request."""
        return self.CallMethod(REQ_QUERY_NETWORKS,
                               (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        """Sends a C{REQ_QUERY_EXPORTS} request."""
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        """Sends a C{REQ_QUERY_CLUSTER_INFO} request (no arguments)."""
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        """Sends a C{REQ_QUERY_CONFIG_VALUES} request."""
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

    def QueryTags(self, kind, name):
        """Sends a C{REQ_QUERY_TAGS} request."""
        return self.CallMethod(REQ_QUERY_TAGS, (kind, name))