4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
#: Response key holding the success flag (read by ParseResponse, written by
#: FormatResponse)
KEY_SUCCESS = "success"
#: Optional key carrying the LUXI protocol version in requests and responses;
#: only emitted by FormatRequest/FormatResponse when a version is given
KEY_VERSION = "version"

# Job-related request method names, passed to Client.CallMethod
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"

# Query-type request method names
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
# Flag-setting request method names
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
70 #: List of all LUXI requests
73 REQ_AUTO_ARCHIVE_JOBS,
76 REQ_QUERY_CLUSTER_INFO,
77 REQ_QUERY_CONFIG_VALUES,
86 REQ_SET_WATCHER_PAUSE,
89 REQ_WAIT_FOR_JOB_CHANGE,
# WaitForJobChange timeout: upper bound, in seconds, on the wait requested by
# Client.WaitForJobChangeOnce, which caps its timeout argument to this value.
# NOTE(review): DEF_RWTO is not defined in the visible part of this file. The
# formula keeps the value below the read/write timeout, presumably so the
# master answers before the client-side receive gives up -- confirm. Under
# Python 2 (this module uses "except X, err" syntax) "/" is integer division.
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    Base class for the more specific LUXI protocol errors in this module
    (connection closed, timeout, request error, no master, permission).

    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised by the transport when the peer closes the socket while a
    response is still being read.

    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised by the transport when connecting, sending or receiving exceeds
    the configured timeouts. Note that this name shadows the Python 3
    builtin C{TimeoutError} inside this module.

    """
111 class RequestError(ProtocolError):
114 This signifies an error in the request format or request handling,
115 but not (e.g.) an error in starting up an instance.
117 Some common conditions that can trigger this exception:
118 - job submission failed because the job data was wrong
119 - query failed because required fields were missing
124 class NoMasterError(ProtocolError):
125 """The master cannot be reached.
127 This means that the master daemon is not running or the socket has
133 class PermissionError(ProtocolError):
134 """Permission denied while connecting to the master socket.
136 This means the user doesn't have the proper rights.
142 """Low-level transport class.
144 This is used on the client side.
146 This could be replaced by any other class that provides the same
147 semantics to the Client. This means:
148 - can send messages and receive messages
149 - safe for multithreading
153 def __init__(self, address, timeouts=None):
154 """Constructor for the Client class.
157 - address: a valid address for the used transport class
158 - timeouts: a list of timeouts, to be used on connect and read/write
160 There are two timeouts used since we might want to wait for a long
161 time for a response, but the connect timeout should be lower.
163 If not passed, we use a default of 10 and respectively 60 seconds.
165 Note that on reading data, since the timeout applies to an
166 individual receive, it might be that the total duration is longer
167 than timeout value passed (we make a hard limit at twice the read
171 self.address = address
173 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
175 self._ctimeout, self._rwtimeout = timeouts
179 self._msgs = collections.deque()
182 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
186 utils.Retry(self._Connect, 1.0, self._ctimeout,
187 args=(self.socket, address, self._ctimeout))
188 except utils.RetryTimeout:
189 raise TimeoutError("Connect timed out")
191 self.socket.settimeout(self._rwtimeout)
192 except (socket.error, NoMasterError):
193 if self.socket is not None:
199 def _Connect(sock, address, timeout):
200 sock.settimeout(timeout)
202 sock.connect(address)
203 except socket.timeout, err:
204 raise TimeoutError("Connect timed out: %s" % str(err))
205 except socket.error, err:
206 error_code = err.args[0]
207 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
208 raise NoMasterError(address)
209 elif error_code in (errno.EPERM, errno.EACCES):
210 raise PermissionError(address)
211 elif error_code == errno.EAGAIN:
212 # Server's socket backlog is full at the moment
213 raise utils.RetryAgain()
216 def _CheckSocket(self):
217 """Make sure we are connected.
220 if self.socket is None:
221 raise ProtocolError("Connection is closed")
226 This just sends a message and doesn't wait for the response.
229 if constants.LUXI_EOM in msg:
230 raise ProtocolError("Message terminator found in payload")
234 # TODO: sendall is not guaranteed to send everything
235 self.socket.sendall(msg + constants.LUXI_EOM)
236 except socket.timeout, err:
237 raise TimeoutError("Sending timeout: %s" % str(err))
240 """Try to receive a message from the socket.
242 In case we already have messages queued, we just return from the
243 queue. Otherwise, we try to read data with a _rwtimeout network
244 timeout, and making sure we don't go over 2x_rwtimeout as a global
249 etime = time.time() + self._rwtimeout
250 while not self._msgs:
251 if time.time() > etime:
252 raise TimeoutError("Extended receive timeout")
255 data = self.socket.recv(4096)
256 except socket.timeout, err:
257 raise TimeoutError("Receive timeout: %s" % str(err))
258 except socket.error, err:
259 if err.args and err.args[0] == errno.EAGAIN:
264 raise ConnectionClosedError("Connection closed while reading")
265 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
266 self._buffer = new_msgs.pop()
267 self._msgs.extend(new_msgs)
268 return self._msgs.popleft()
271 """Send a message and wait for the response.
273 This is just a wrapper over Send and Recv.
280 """Close the socket"""
281 if self.socket is not None:
286 def ParseRequest(msg):
287 """Parses a LUXI request message.
291 request = serializer.LoadJson(msg)
292 except ValueError, err:
293 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
295 logging.debug("LUXI request: %s", request)
297 if not isinstance(request, dict):
298 logging.error("LUXI request not a dict: %r", msg)
299 raise ProtocolError("Invalid LUXI request (not a dict)")
301 method = request.get(KEY_METHOD, None) # pylint: disable=E1103
302 args = request.get(KEY_ARGS, None) # pylint: disable=E1103
303 version = request.get(KEY_VERSION, None) # pylint: disable=E1103
305 if method is None or args is None:
306 logging.error("LUXI request missing method or arguments: %r", msg)
307 raise ProtocolError(("Invalid LUXI request (no method or arguments"
308 " in request): %r") % msg)
310 return (method, args, version)
313 def ParseResponse(msg):
314 """Parses a LUXI response message.
319 data = serializer.LoadJson(msg)
320 except KeyboardInterrupt:
322 except Exception, err:
323 raise ProtocolError("Error while deserializing response: %s" % str(err))
326 if not (isinstance(data, dict) and
327 KEY_SUCCESS in data and
329 raise ProtocolError("Invalid response from server: %r" % data)
331 return (data[KEY_SUCCESS], data[KEY_RESULT],
332 data.get(KEY_VERSION, None)) # pylint: disable=E1103
335 def FormatResponse(success, result, version=None):
336 """Formats a LUXI response message.
340 KEY_SUCCESS: success,
344 if version is not None:
345 response[KEY_VERSION] = version
347 logging.debug("LUXI response: %s", response)
349 return serializer.DumpJson(response)
352 def FormatRequest(method, args, version=None):
353 """Formats a LUXI request message.
362 if version is not None:
363 request[KEY_VERSION] = version
365 # Serialize the request
366 return serializer.DumpJson(request)
369 def CallLuxiMethod(transport_cb, method, args, version=None):
370 """Send a LUXI request via a transport and return the response.
373 assert callable(transport_cb)
375 request_msg = FormatRequest(method, args, version=version)
377 # Send request and wait for response
378 response_msg = transport_cb(request_msg)
380 (success, result, resp_version) = ParseResponse(response_msg)
382 # Verify version if there was one in the response
383 if resp_version is not None and resp_version != version:
384 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
385 (version, resp_version))
390 errors.MaybeRaise(result)
391 raise RequestError(result)
394 class Client(object):
395 """High-level client implementation.
397 This uses a backing Transport-like class on top of which it
398 implements data serialization/deserialization.
401 def __init__(self, address=None, timeouts=None, transport=Transport):
402 """Constructor for the Client class.
405 - address: a valid address for the used transport class
406 - timeouts: a list of timeouts, to be used on connect and read/write
407 - transport: a Transport-like class
410 If timeout is not passed, the default timeouts of the transport
415 address = constants.MASTER_SOCKET
416 self.address = address
417 self.timeouts = timeouts
418 self.transport_class = transport
419 self.transport = None
420 self._InitTransport()
422 def _InitTransport(self):
423 """(Re)initialize the transport if needed.
426 if self.transport is None:
427 self.transport = self.transport_class(self.address,
428 timeouts=self.timeouts)
430 def _CloseTransport(self):
431 """Close the transport, ignoring errors.
434 if self.transport is None:
437 old_transp = self.transport
438 self.transport = None
440 except Exception: # pylint: disable=W0703
443 def _SendMethodCall(self, data):
444 # Send request and wait for response
446 self._InitTransport()
447 return self.transport.Call(data)
449 self._CloseTransport()
453 """Close the underlying connection.
456 self._CloseTransport()
458 def CallMethod(self, method, args):
459 """Send a generic request and return the response.
462 if not isinstance(args, (list, tuple)):
463 raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
464 " expected list, got %s" % type(args))
465 return CallLuxiMethod(self._SendMethodCall, method, args,
466 version=constants.LUXI_VERSION)
def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request to the master.

    @param drain_flag: the value forwarded as the single request argument
    @return: whatever L{CallMethod} returns for this request

    """
    request_args = (drain_flag, )
    return self.CallMethod(REQ_SET_DRAIN_FLAG, request_args)
def SetWatcherPause(self, until):
    """Send a SetWatcherPause request to the master.

    @param until: the value forwarded as the single request argument
    @return: whatever L{CallMethod} returns for this request

    """
    method_name = REQ_SET_WATCHER_PAUSE
    return self.CallMethod(method_name, (until, ))
def SubmitJob(self, ops):
    """Submit a single job made of the given opcodes.

    Each opcode is converted with its C{__getstate__} method before the
    request is sent.

    @param ops: iterable of opcode objects forming the job
    @return: whatever L{CallMethod} returns for the SubmitJob request

    """
    # A real list (rather than map() with a lambda) matches SubmitManyJobs and
    # stays a JSON-serializable list under Python 3, where map() is lazy.
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
478 def SubmitManyJobs(self, jobs):
481 jobs_state.append([op.__getstate__() for op in ops])
482 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
def CancelJob(self, job_id):
    """Send a CancelJob request for the given job.

    @param job_id: the job identifier forwarded to the master
    @return: whatever L{CallMethod} returns for this request

    """
    request_args = (job_id, )
    return self.CallMethod(REQ_CANCEL_JOB, request_args)
def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for the given job.

    @param job_id: the job identifier forwarded to the master
    @return: whatever L{CallMethod} returns for this request

    """
    method_name = REQ_ARCHIVE_JOB
    return self.CallMethod(method_name, (job_id, ))
def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request.

    The request carries C{age} plus a timeout computed from DEF_RWTO
    (C{(DEF_RWTO - 1) / 2}), keeping it below the read/write timeout.

    @param age: the age value forwarded to the master (presumably in
        seconds -- confirm against the server-side handler)
    @return: whatever L{CallMethod} returns for this request

    """
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS,
                           (age, (DEF_RWTO - 1) / 2))
494 def WaitForJobChangeOnce(self, job_id, fields,
495 prev_job_info, prev_log_serial,
496 timeout=WFJC_TIMEOUT):
497 """Waits for changes on a job.
499 @param job_id: Job ID
501 @param fields: List of field names to be observed
502 @type prev_job_info: None or list
503 @param prev_job_info: Previously received job information
504 @type prev_log_serial: None or int/long
505 @param prev_log_serial: Highest log serial number previously received
506 @type timeout: int/float
507 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
508 be capped to that value)
511 assert timeout >= 0, "Timeout can not be negative"
512 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
513 (job_id, fields, prev_job_info,
515 min(WFJC_TIMEOUT, timeout)))
517 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
519 result = self.WaitForJobChangeOnce(job_id, fields,
520 prev_job_info, prev_log_serial)
521 if result != constants.JOB_NOTCHANGED:
525 def Query(self, what, fields, qfilter):
526 """Query for resources/items.
528 @param what: One of L{constants.QR_VIA_LUXI}
529 @type fields: List of strings
530 @param fields: List of requested fields
531 @type qfilter: None or list
532 @param qfilter: Query filter
533 @rtype: L{objects.QueryResponse}
536 result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
537 return objects.QueryResponse.FromDict(result)
539 def QueryFields(self, what, fields):
540 """Query for available fields.
542 @param what: One of L{constants.QR_VIA_LUXI}
543 @type fields: None or list of strings
544 @param fields: List of requested fields
545 @rtype: L{objects.QueryFieldsResponse}
548 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
549 return objects.QueryFieldsResponse.FromDict(result)
def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request.

    @param job_ids: job identifiers forwarded unchanged
    @param fields: field names forwarded unchanged
    @return: whatever L{CallMethod} returns for this request

    """
    request_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, request_args)
def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request.

    @param names: names forwarded unchanged
    @param fields: field names forwarded unchanged
    @param use_locking: locking flag forwarded unchanged
    @return: whatever L{CallMethod} returns for this request

    """
    method_name = REQ_QUERY_INSTANCES
    request_args = (names, fields, use_locking)
    return self.CallMethod(method_name, request_args)
def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request.

    @param names: names forwarded unchanged
    @param fields: field names forwarded unchanged
    @param use_locking: locking flag forwarded unchanged
    @return: whatever L{CallMethod} returns for this request

    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, request_args)
def QueryGroups(self, names, fields, use_locking):
    """Send a QueryGroups request.

    @param names: names forwarded unchanged
    @param fields: field names forwarded unchanged
    @param use_locking: locking flag forwarded unchanged
    @return: whatever L{CallMethod} returns for this request

    """
    method_name = REQ_QUERY_GROUPS
    return self.CallMethod(method_name, (names, fields, use_locking))
def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request.

    @param nodes: node names forwarded unchanged
    @param use_locking: locking flag forwarded unchanged
    @return: whatever L{CallMethod} returns for this request

    """
    request_args = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, request_args)
def QueryClusterInfo(self):
    """Send a QueryClusterInfo request (it carries no arguments).

    @return: whatever L{CallMethod} returns for this request

    """
    no_arguments = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_arguments)
def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request.

    @param fields: field names forwarded as the single request argument
    @return: whatever L{CallMethod} returns for this request

    """
    method_name = REQ_QUERY_CONFIG_VALUES
    request_args = (fields, )
    return self.CallMethod(method_name, request_args)
def QueryTags(self, kind, name):
    """Send a QueryTags request.

    @param kind: tag kind forwarded unchanged
    @param name: object name forwarded unchanged
    @return: whatever L{CallMethod} returns for this request

    """
    request_args = (kind, name)
    return self.CallMethod(REQ_QUERY_TAGS, request_args)