# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import logging
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import objects
# Keys of the JSON dictionaries exchanged over the socket (requests use
# method/args/version, responses use success/result/version)
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# LUXI request (method) names
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

#: List of all LUXI requests (every REQ_* constant above, including
#: REQ_QUERY_NETWORKS which the client exposes via Client.QueryNetworks)
REQ_ALL = frozenset([
    REQ_ARCHIVE_JOB,
    REQ_AUTO_ARCHIVE_JOBS,
    REQ_CANCEL_JOB,
    REQ_QUERY,
    REQ_QUERY_CLUSTER_INFO,
    REQ_QUERY_CONFIG_VALUES,
    REQ_QUERY_EXPORTS,
    REQ_QUERY_FIELDS,
    REQ_QUERY_GROUPS,
    REQ_QUERY_INSTANCES,
    REQ_QUERY_JOBS,
    REQ_QUERY_NETWORKS,
    REQ_QUERY_NODES,
    REQ_QUERY_TAGS,
    REQ_SET_DRAIN_FLAG,
    REQ_SET_WATCHER_PAUSE,
    REQ_SUBMIT_JOB,
    REQ_SUBMIT_MANY_JOBS,
    REQ_WAIT_FOR_JOB_CHANGE,
])

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; floor division keeps it an integral number of
# seconds (29) independent of the division semantics in effect
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    Base class of all the LUXI transport/protocol errors in this module.

    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised when the peer closes the connection while a response is being
    read.

    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving does not complete within
    the configured timeouts.

    """
class RequestError(ProtocolError):
    """Error reported by the server for a request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or the socket has
    been removed: connecting failed with ENOENT (no socket file) or
    ECONNREFUSED (nobody listening on it).

    """
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights; connecting failed
    with EPERM or EACCES.

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    """

    def __init__(self, address, timeouts=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class
          - timeouts: a (connect, read/write) pair of timeouts in seconds

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, we use a default of 10 and respectively 60 seconds.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than the timeout value passed (we make a hard limit at twice the
        read timeout).

        @raise TimeoutError: if no connection could be made within the
            connect timeout
        @raise NoMasterError: if the master socket is missing or refuses
            connections
        @raise PermissionError: if access to the socket is denied

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        # Partial data received after the last message terminator
        self._buffer = ""
        # Complete messages received but not yet returned by Recv
        self._msgs = collections.deque()

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        connected = False
        try:
            try:
                utils.Retry(self._Connect, 1.0, self._ctimeout,
                            args=(sock, address, self._ctimeout))
            except utils.RetryTimeout:
                raise TimeoutError("Connect timed out")
            sock.settimeout(self._rwtimeout)
            connected = True
        finally:
            # Close on *any* failure: TimeoutError and PermissionError are not
            # socket.error, so a narrower except clause would leak the fd
            if not connected:
                sock.close()
        self.socket = sock

    @staticmethod
    def _Connect(sock, address, timeout):
        """Make a single connection attempt to C{address}.

        Used as the callback of C{utils.Retry}; raising C{utils.RetryAgain}
        requests another attempt.

        """
        sock.settimeout(timeout)
        try:
            sock.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            error_code = err.args[0] if err.args else None
            if error_code in (errno.ENOENT, errno.ECONNREFUSED):
                raise NoMasterError(address)
            elif error_code in (errno.EPERM, errno.EACCES):
                raise PermissionError(address)
            elif error_code == errno.EAGAIN:
                # Server's socket backlog is full at the moment
                raise utils.RetryAgain()
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        @raise ProtocolError: if the connection has been closed

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        @raise ProtocolError: if C{msg} contains the message terminator or
            the connection is closed
        @raise TimeoutError: if sending does not complete in time

        """
        if constants.LUXI_EOM in msg:
            raise ProtocolError("Message terminator found in payload")

        self._CheckSocket()
        try:
            # sendall() either sends the whole buffer or raises; after a
            # timeout an unknown prefix may have been sent
            self.socket.sendall(msg + constants.LUXI_EOM)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        @raise TimeoutError: on a receive timeout or when the global limit
            is exceeded
        @raise ConnectionClosedError: if the peer closed the connection

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            try:
                data = self.socket.recv(4096)
            except socket.timeout as err:
                raise TimeoutError("Receive timeout: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] == errno.EAGAIN:
                    # Retry, but through the loop head so the global
                    # deadline above still applies
                    continue
                raise
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # The last element is an incomplete message (possibly empty)
            new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket.

        Calling this more than once is harmless.

        """
        sock, self.socket = self.socket, None
        if sock is not None:
            sock.close()
def ParseRequest(msg):
    """Parses a LUXI request message.

    @type msg: string
    @param msg: serialized (JSON) request
    @rtype: tuple
    @return: C{(method, args, version)}; C{version} is C{None} if the
        request did not carry one
    @raise ProtocolError: if the message cannot be parsed, is not a
        dictionary, or lacks the method or the arguments

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", request)

    if not isinstance(request, dict):
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    method = request.get(KEY_METHOD, None) # pylint: disable=E1103
    args = request.get(KEY_ARGS, None) # pylint: disable=E1103
    version = request.get(KEY_VERSION, None) # pylint: disable=E1103

    if method is None or args is None:
        logging.error("LUXI request missing method or arguments: %r", msg)
        raise ProtocolError(("Invalid LUXI request (no method or arguments"
                             " in request): %r") % msg)

    return (method, args, version)
def ParseResponse(msg):
    """Parses a LUXI response message.

    @type msg: string
    @param msg: serialized (JSON) response
    @rtype: tuple
    @return: C{(success, result, version)}; C{version} is C{None} if the
        response did not carry one
    @raise ProtocolError: if the message cannot be deserialized or is not
        a dictionary containing both the success and the result keys

    """
    # Parse the result
    try:
        data = serializer.LoadJson(msg)
    except KeyboardInterrupt:
        raise
    except Exception as err:
        raise ProtocolError("Error while deserializing response: %s" %
                            str(err))

    # Validate response
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        # Wrap in a tuple so any decoded value is formatted via %r
        raise ProtocolError("Invalid response from server: %r" % (data, ))

    return (data[KEY_SUCCESS], data[KEY_RESULT],
            data.get(KEY_VERSION, None)) # pylint: disable=E1103
def FormatResponse(success, result, version=None):
    """Formats a LUXI response message.

    @param success: whether the request was handled successfully
    @param result: the result (or error description) to send back
    @param version: protocol version; only included when not C{None}
    @return: the serialized (JSON) response

    """
    response = {
        KEY_SUCCESS: success,
        KEY_RESULT: result,
    }

    if version is not None:
        response[KEY_VERSION] = version

    logging.debug("LUXI response: %s", response)

    return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
    """Formats a LUXI request message.

    @param method: name of the LUXI method (one of the REQ_* constants)
    @param args: arguments of the method
    @param version: protocol version; only included when not C{None}
    @return: the serialized (JSON) request

    """
    # Build request
    request = {
        KEY_METHOD: method,
        KEY_ARGS: args,
    }

    if version is not None:
        request[KEY_VERSION] = version

    # Serialize the request
    return serializer.DumpJson(request)
def CallLuxiMethod(transport_cb, method, args, version=None):
    """Send a LUXI request via a transport and return the response.

    @param transport_cb: callable taking the serialized request and
        returning the serialized response
    @param method: name of the LUXI method
    @param args: arguments of the method
    @param version: protocol version sent with the request; a response
        carrying a version must match it
    @return: the result sent by the server on success
    @raise errors.LuxiError: on a protocol version mismatch
    @raise RequestError: if the server reports a failure (note that
        C{errors.MaybeRaise} may raise a more specific error first)

    """
    assert callable(transport_cb)

    request_msg = FormatRequest(method, args, version=version)

    # Send request and wait for response
    response_msg = transport_cb(request_msg)

    (success, result, resp_version) = ParseResponse(response_msg)

    # Verify version if there was one in the response
    if resp_version is not None and resp_version != version:
        raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                               (version, resp_version))

    if success:
        return result

    errors.MaybeRaise(result)
    raise RequestError(result)
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class; defaults
            to C{constants.MASTER_SOCKET}
          - timeouts: a list of timeouts, to be used on connect and read/write
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.

        The transport object is created immediately (for L{Transport} this
        also connects).

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        """
        if self.transport is None:
            return
        old_transp = self.transport
        # Drop the reference first so the next call re-creates the transport
        # even if closing the old one fails
        self.transport = None
        try:
            old_transp.Close()
        except Exception: # pylint: disable=W0703
            # Best effort only, but keep a trace for debugging
            logging.debug("Ignoring error while closing LUXI transport",
                          exc_info=True)

    def _SendMethodCall(self, data):
        """Send C{data} over the transport and return the raw response.

        On any error the transport is closed (it is re-created on the next
        call) and the exception is propagated.

        """
        # Send request and wait for response
        try:
            self._InitTransport()
            return self.transport.Call(data)
        except Exception:
            self._CloseTransport()
            raise

    def Close(self):
        """Close the underlying connection.

        """
        self._CloseTransport()

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        @param method: name of the LUXI method
        @type args: list or tuple
        @param args: arguments of the method
        @raise errors.ProgrammerError: if C{args} is not a list or tuple

        """
        if not isinstance(args, (list, tuple)):
            raise errors.ProgrammerError(
                "Invalid parameter passed to CallMethod:"
                " expected list, got %s" % type(args))
        return CallLuxiMethod(self._SendMethodCall, method, args,
                              version=constants.LUXI_VERSION)

    def SetQueueDrainFlag(self, drain_flag):
        return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

    def SetWatcherPause(self, until):
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

    def SubmitJob(self, ops):
        # A real list (not a lazy map object) so it can be serialized
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

    def SubmitManyJobs(self, jobs):
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

    def CancelJob(self, job_id):
        return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

    def ArchiveJob(self, job_id):
        return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

    def AutoArchiveJobs(self, age):
        # NOTE(review): presumably kept well below DEF_RWTO so the server
        # answers before the client's read timeout expires
        timeout = (DEF_RWTO - 1) // 2
        return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

    def WaitForJobChangeOnce(self, job_id, fields,
                             prev_job_info, prev_log_serial,
                             timeout=WFJC_TIMEOUT):
        """Waits for changes on a job.

        @param job_id: Job ID
        @type fields: list
        @param fields: List of field names to be observed
        @type prev_job_info: None or list
        @param prev_job_info: Previously received job information
        @type prev_log_serial: None or int/long
        @param prev_log_serial: Highest log serial number previously received
        @type timeout: int/float
        @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT}
                        will be capped to that value)

        """
        assert timeout >= 0, "Timeout can not be negative"
        return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial,
                                min(WFJC_TIMEOUT, timeout)))

    def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
        """Waits for changes on a job, retrying until something changed.

        Repeats L{WaitForJobChangeOnce} for as long as it returns
        C{constants.JOB_NOTCHANGED}.

        """
        while True:
            result = self.WaitForJobChangeOnce(job_id, fields,
                                               prev_job_info, prev_log_serial)
            if result != constants.JOB_NOTCHANGED:
                return result

    def Query(self, what, fields, qfilter):
        """Query for resources/items.

        @param what: One of L{constants.QR_VIA_LUXI}
        @type fields: List of strings
        @param fields: List of requested fields
        @type qfilter: None or list
        @param qfilter: Query filter
        @rtype: L{objects.QueryResponse}

        """
        result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
        return objects.QueryResponse.FromDict(result)

    def QueryFields(self, what, fields):
        """Query for available fields.

        @param what: One of L{constants.QR_VIA_LUXI}
        @type fields: None or list of strings
        @param fields: List of requested fields
        @rtype: L{objects.QueryFieldsResponse}

        """
        result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
        return objects.QueryFieldsResponse.FromDict(result)

    def QueryJobs(self, job_ids, fields):
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_INSTANCES,
                               (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryGroups(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

    def QueryNetworks(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_NETWORKS,
                               (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

    def QueryTags(self, kind, name):
        return self.CallMethod(REQ_QUERY_TAGS, (kind, name))