4 # Copyright (C) 2006, 2007, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
# NOTE(review): this region is damaged. Every line carries a stray numeric
# prefix (not valid Python), and several names used later in this file are
# not defined anywhere in the visible lines: KEY_METHOD and KEY_ARGS (read by
# ParseRequest), KEY_RESULT (read by ParseResponse), REQ_QUERY (sent by
# Client.Query), and DEF_CTMO / DEF_RWTO (used by WFJC_TIMEOUT below and as
# the transport's default timeouts). The gaps in the embedded numbering
# suggest they were lost from this region. Restore the file from version
# control rather than patching it.
#
# Dictionary keys used in serialized LUXI request/response messages.
47 KEY_SUCCESS = "success"
49 KEY_VERSION = "version"
# Method names sent by the client as the first argument of CallMethod.
51 REQ_SUBMIT_JOB = "SubmitJob"
52 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54 REQ_CANCEL_JOB = "CancelJob"
55 REQ_ARCHIVE_JOB = "ArchiveJob"
56 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
58 REQ_QUERY_FIELDS = "QueryFields"
59 REQ_QUERY_JOBS = "QueryJobs"
60 REQ_QUERY_INSTANCES = "QueryInstances"
61 REQ_QUERY_NODES = "QueryNodes"
62 REQ_QUERY_GROUPS = "QueryGroups"
63 REQ_QUERY_EXPORTS = "QueryExports"
64 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
65 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
66 REQ_QUERY_TAGS = "QueryTags"
67 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
68 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
73 # WaitForJobChange timeout
# Upper bound for a single WaitForJobChange call; Client.WaitForJobChangeOnce
# caps larger requested timeouts to this value. With the 60 s read/write
# default mentioned in the transport docstring this is well below DEF_RWTO.
# NOTE(review): the file uses Python 2 syntax, where int / int floors; under
# Python 3 this expression would produce a float.
74 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    Common base class of the connection, timeout, request, master and
    permission errors raised by the LUXI transport and client code.
    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised by the transport when the peer closes the connection while a
    message is still being read.
    """
# NOTE: this name shadows the TimeoutError builtin on Python 3; it is kept
# because callers catch it by this name.
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting to, sending to or receiving from the LUXI socket
    exceeds the configured timeouts.
    """
class RequestError(ProtocolError):
    """Error in a LUXI request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    CallLuxiMethod raises it with the server-supplied result when
    errors.MaybeRaise(result) has not already raised a more specific error.
    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or its socket cannot
    be connected to: the transport raises it, with the socket address as
    argument, when connect() fails with ENOENT or ECONNREFUSED.
    """
# NOTE: this name shadows the PermissionError builtin on Python 3; it is kept
# because callers catch it by this name.
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights: the transport raises
    it, with the socket address as argument, when connect() fails with EPERM
    or EACCES.
    """
120 """Low-level transport class.
122 This is used on the client side.
124 This could be replaced by any other class that provides the same
125 semantics to the Client. This means:
126 - can send messages and receive messages
127 - safe for multithreading
# NOTE(review): damaged block -- every line carries a stray numeric prefix,
# indentation is gone, and lines are missing (see the gaps in the embedded
# numbering). The missing lines include:
#   - the end of the docstring;
#   - the conditional that chooses between the default and the
#     caller-supplied timeouts;
#   - presumably the initialisation of self.socket and self._buffer
#     (self._buffer is read by the receive path but never assigned in the
#     visible lines);
#   - the `try:` lines that the `except` clauses below belong to;
#   - the body of the final `if`.
# Restore this file from version control rather than patching it.
#
# Creates an AF_UNIX stream socket and connects it to `address` through
# utils.Retry(self._Connect, ...). The arguments presumably mean a 1 s retry
# delay and the connect timeout as the total budget. utils.RetryTimeout is
# converted into TimeoutError. On success the socket gets the read/write
# timeout that bounds each individual send/receive.
#
# NOTE(review): possible socket leak. The cleanup handler at the end only
# catches socket.error and NoMasterError, but the connect path also raises
# TimeoutError (here and in _Connect) and PermissionError (in _Connect);
# both are ProtocolError subclasses. Assuming utils.Retry propagates them
# and errors.LuxiError does not derive from socket.error, they bypass the
# handler, and the socket created below is never closed. Consider cleaning
# up for any exception (e.g. catch Exception, close, re-raise).
131 def __init__(self, address, timeouts=None):
132 """Constructor for the low-level transport class.
135 - address: a valid address for the used transport class
136 - timeouts: a pair of timeouts, to be used on connect and read/write
138 There are two timeouts used since we might want to wait for a long
139 time for a response, but the connect timeout should be lower.
141 If not passed, we use a default of 10 and respectively 60 seconds.
143 Note that on reading data, since the timeout applies to an
144 individual receive, it might be that the total duration is longer
145 than timeout value passed (we make a hard limit at twice the read
149 self.address = address
151 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
153 self._ctimeout, self._rwtimeout = timeouts
157 self._msgs = collections.deque()
160 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
164 utils.Retry(self._Connect, 1.0, self._ctimeout,
165 args=(self.socket, address, self._ctimeout))
166 except utils.RetryTimeout:
167 raise TimeoutError("Connect timed out")
169 self.socket.settimeout(self._rwtimeout)
170 except (socket.error, NoMasterError):
171 if self.socket is not None:
# NOTE(review): damaged block -- stray numeric prefixes, no indentation, and
# missing lines. These include the `try:` owning the `except` clauses below,
# and the decorator that must precede this `def`. It takes no `self`, yet
# the constructor passes it to utils.Retry as self._Connect with
# args=(sock, address, timeout), so it is presumably a @staticmethod.
# Restore from version control.
#
# Makes one connection attempt of `sock` to `address`, with `timeout` as the
# socket timeout, and translates failures into LUXI exceptions:
#   socket.timeout          -> TimeoutError
#   ENOENT / ECONNREFUSED   -> NoMasterError(address)
#   EPERM / EACCES          -> PermissionError(address)
#   EAGAIN (backlog full)   -> utils.RetryAgain (presumably asks utils.Retry
#                              for another attempt)
# NOTE(review): no handling is visible for any other socket.error. If the
# bare `raise` that should end the elif chain is really missing, such errors
# are swallowed and the connection attempt appears to succeed.
# NOTE(review): err.args[0] raises IndexError for a socket.error created
# without arguments; the receive path guards the same lookup with
# `err.args and err.args[0]`.
177 def _Connect(sock, address, timeout):
178 sock.settimeout(timeout)
180 sock.connect(address)
181 except socket.timeout, err:
182 raise TimeoutError("Connect timed out: %s" % str(err))
183 except socket.error, err:
184 error_code = err.args[0]
185 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
186 raise NoMasterError(address)
187 elif error_code in (errno.EPERM, errno.EACCES):
188 raise PermissionError(address)
189 elif error_code == errno.EAGAIN:
190 # Server's socket backlog is full at the moment
191 raise utils.RetryAgain()
def _CheckSocket(self):
    """Make sure we are connected.

    Called before socket I/O so that using a closed transport fails with a
    clear LUXI error instead of an AttributeError on None.

    @raise ProtocolError: if the socket has been closed (is None)
    """
    if self.socket is None:
        raise ProtocolError("Connection is closed")
204 This just sends a message and doesn't wait for the response.
207 if constants.LUXI_EOM in msg:
208 raise ProtocolError("Message terminator found in payload")
# NOTE(review): damaged fragment of the transport's Send method. Missing:
#   - its `def` line and the start and end of the docstring;
#   - the `try:` owning the `except` clause below.
# Every line also has a stray numeric prefix. Restore from version control.
# Messages are framed by appending constants.LUXI_EOM, which is why a payload
# that already contains the terminator is rejected above.
212 # sendall() keeps sending until all data is written or an error is raised;
# on error an unknown amount of data may already have been sent.
213 self.socket.sendall(msg + constants.LUXI_EOM)
214 except socket.timeout, err:
215 raise TimeoutError("Sending timeout: %s" % str(err))
218 """Try to receive a message from the socket.
220 In case we already have messages queued, we just return from the
221 queue. Otherwise, we try to read data with a _rwtimeout network
222 timeout, and making sure we don't go over 2x_rwtimeout as a global
# NOTE(review): damaged fragment of the transport's Recv method. Missing:
#   - its `def` line and the end of its docstring;
#   - the `try:` for the `except` clauses below;
#   - the handling after the EAGAIN test;
#   - the condition guarding ConnectionClosedError (presumably an empty
#     read, i.e. the peer closed the socket).
# Restore from version control.
#
# Framing: received bytes are appended to self._buffer and split on
# constants.LUXI_EOM. The trailing (possibly partial) piece stays buffered,
# and complete messages are queued in self._msgs; one queued message is
# returned per call.
# Timing: each recv() is bounded by the socket's read/write timeout. The
# overall deadline (self._rwtimeout from the start of the call) is checked
# at the top of the `while not self._msgs` loop, so a call can take roughly
# twice self._rwtimeout in the worst case.
227 etime = time.time() + self._rwtimeout
228 while not self._msgs:
229 if time.time() > etime:
230 raise TimeoutError("Extended receive timeout")
233 data = self.socket.recv(4096)
234 except socket.timeout, err:
235 raise TimeoutError("Receive timeout: %s" % str(err))
236 except socket.error, err:
237 if err.args and err.args[0] == errno.EAGAIN:
242 raise ConnectionClosedError("Connection closed while reading")
243 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
244 self._buffer = new_msgs.pop()
245 self._msgs.extend(new_msgs)
246 return self._msgs.popleft()
249 """Send a message and wait for the response.
251 This is just a wrapper over Send and Recv.
# NOTE(review): damaged fragment of the transport's Call method. Its `def`
# line, the end of this docstring and the entire body are missing, and the
# remaining lines carry stray numeric prefixes. Client._SendMethodCall
# relies on it (self.transport.Call(data)). Per the docstring, the body
# should send the message and return the received reply. Restore it from
# version control.
258 """Close the socket"""
# NOTE(review): damaged fragment of the transport's Close method. The `def`
# line before the docstring and the body of the `if` below are missing, and
# both lines carry stray numeric prefixes. Presumably the body closes
# self.socket and resets it to None, which is the state _CheckSocket and the
# constructor's cleanup test for. Restore from version control.
259 if self.socket is not None:
# NOTE(review): damaged block -- stray numeric prefixes, no indentation, and
# missing lines (the end of the docstring and the `try:` that the
# `except ValueError` below belongs to). Restore from version control.
#
# Decodes a serialized LUXI request with serializer.LoadJson. Returns a
# (method, args, version) tuple read from the KEY_METHOD, KEY_ARGS and
# KEY_VERSION keys; version is None when that key is absent.
# Raises ProtocolError if:
#   - the JSON cannot be parsed (ValueError);
#   - the decoded value is not a dict;
#   - the method or arguments are missing. A key present with a null value
#     counts as missing.
264 def ParseRequest(msg):
265 """Parses a LUXI request message.
269 request = serializer.LoadJson(msg)
270 except ValueError, err:
271 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
273 logging.debug("LUXI request: %s", request)
275 if not isinstance(request, dict):
276 logging.error("LUXI request not a dict: %r", msg)
277 raise ProtocolError("Invalid LUXI request (not a dict)")
279 method = request.get(KEY_METHOD, None) # pylint: disable=E1103
280 args = request.get(KEY_ARGS, None) # pylint: disable=E1103
281 version = request.get(KEY_VERSION, None) # pylint: disable=E1103
283 if method is None or args is None:
284 logging.error("LUXI request missing method or arguments: %r", msg)
285 raise ProtocolError(("Invalid LUXI request (no method or arguments"
286 " in request): %r") % msg)
288 return (method, args, version)
# NOTE(review): damaged block -- stray numeric prefixes, no indentation, and
# missing lines. These are:
#   - the end of the docstring and the `try:`;
#   - the statement handling KeyboardInterrupt (presumably a bare `raise`,
#     so Ctrl-C is not turned into ProtocolError);
#   - the last operand of the validity check.
# Because KEY_RESULT is read unconditionally at the return, that operand
# presumably tests `KEY_RESULT in data`. Otherwise a response lacking it
# would raise KeyError instead of ProtocolError. Restore from version
# control.
#
# Decodes a serialized LUXI response and returns a (success, result,
# version) tuple; version is None when the response has no KEY_VERSION.
# Any exception from deserialization other than KeyboardInterrupt is
# wrapped in ProtocolError. A decoded value that is not a dict with the
# expected keys also raises ProtocolError.
291 def ParseResponse(msg):
292 """Parses a LUXI response message.
297 data = serializer.LoadJson(msg)
298 except KeyboardInterrupt:
300 except Exception, err:
301 raise ProtocolError("Error while deserializing response: %s" % str(err))
304 if not (isinstance(data, dict) and
305 KEY_SUCCESS in data and
307 raise ProtocolError("Invalid response from server: %r" % data)
309 return (data[KEY_SUCCESS], data[KEY_RESULT],
310 data.get(KEY_VERSION, None)) # pylint: disable=E1103
# NOTE(review): damaged block -- stray numeric prefixes, no indentation, and
# missing lines: the end of the docstring and most of the `response` dict
# literal. Only its KEY_SUCCESS entry survives; presumably it also stored
# `result` under KEY_RESULT, which ParseResponse reads. Restore from version
# control.
#
# Builds the response dict (success flag and result, plus KEY_VERSION only
# when version is not None). Logs it at debug level and returns it
# serialized with serializer.DumpJson; this is the counterpart of
# ParseResponse.
313 def FormatResponse(success, result, version=None):
314 """Formats a LUXI response message.
318 KEY_SUCCESS: success,
322 if version is not None:
323 response[KEY_VERSION] = version
325 logging.debug("LUXI response: %s", response)
327 return serializer.DumpJson(response)
# NOTE(review): damaged block -- stray numeric prefixes, no indentation, and
# missing lines (see the gap in the embedded numbering). They are the end of
# the docstring and the construction of the `request` dict used below,
# presumably holding `method` and `args` under KEY_METHOD / KEY_ARGS, the
# keys ParseRequest reads. Restore from version control.
#
# Builds the request dict, adds KEY_VERSION only when version is not None,
# and returns it serialized with serializer.DumpJson. This is the
# counterpart of ParseRequest.
330 def FormatRequest(method, args, version=None):
331 """Formats a LUXI request message.
340 if version is not None:
341 request[KEY_VERSION] = version
343 # Serialize the request
344 return serializer.DumpJson(request)
# NOTE(review): damaged block -- stray numeric prefixes, no indentation, and
# missing lines. Notably, the lines between the version check and the error
# path are gone (presumably `if success: return result`); as visible, the
# function can never return a result. Restore from version control.
#
# Formats the request and hands the serialized message to transport_cb, a
# callable taking the request string and returning the response string.
# Then parses the reply and raises errors.LuxiError if the server reports a
# LUXI version different from the one sent. On the (presumably failure)
# path, errors.MaybeRaise may turn the result into a specific exception;
# otherwise RequestError(result) is raised.
# NOTE(review): `assert callable(transport_cb)` disappears under `python -O`.
# If callers can pass arbitrary objects, an explicit
# errors.ProgrammerError (as Client.CallMethod raises for bad arguments)
# would be sturdier.
347 def CallLuxiMethod(transport_cb, method, args, version=None):
348 """Send a LUXI request via a transport and return the response.
351 assert callable(transport_cb)
353 request_msg = FormatRequest(method, args, version=version)
355 # Send request and wait for response
356 response_msg = transport_cb(request_msg)
358 (success, result, resp_version) = ParseResponse(response_msg)
360 # Verify version if there was one in the response
361 if resp_version is not None and resp_version != version:
362 raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
363 (version, resp_version))
368 errors.MaybeRaise(result)
369 raise RequestError(result)
# NOTE(review): damaged block -- every line carries a stray numeric prefix,
# indentation is gone, and many lines are missing (see the gaps in the
# embedded numbering). Examples:
#   - the condition guarding `address = constants.MASTER_SOCKET`;
#   - the `try:`/handler lines of _CloseTransport and _SendMethodCall;
#   - the `def` line of the public close method;
#   - the setup of `jobs_state` and the loop over `jobs` in SubmitManyJobs;
#   - the loop/return lines of WaitForJobChange.
# Restore this file from version control rather than patching it.
#
# High-level LUXI client. Every request method funnels into CallMethod,
# which runs CallLuxiMethod with constants.LUXI_VERSION. The serialized
# request is sent through a transport that _InitTransport creates lazily.
372 class Client(object):
373 """High-level client implementation.
375 This uses a backing Transport-like class on top of which it
376 implements data serialization/deserialization.
379 def __init__(self, address=None, timeouts=None, transport=Transport):
380 """Constructor for the Client class.
383 - address: a valid address for the used transport class
384 - timeouts: a list of timeouts, to be used on connect and read/write
385 - transport: a Transport-like class
388 If timeouts is not passed, the default timeouts of the transport
393 address = constants.MASTER_SOCKET
394 self.address = address
395 self.timeouts = timeouts
396 self.transport_class = transport
397 self.transport = None
398 self._InitTransport()
400 def _InitTransport(self):
401 """(Re)initialize the transport if needed.
404 if self.transport is None:
405 self.transport = self.transport_class(self.address,
406 timeouts=self.timeouts)
# Best-effort close. The transport reference is cleared before the old
# transport is closed, so later calls build a new one even if closing the
# old one fails. Exceptions are deliberately ignored; the close call itself
# is among the missing lines.
408 def _CloseTransport(self):
409 """Close the transport, ignoring errors.
412 if self.transport is None:
415 old_transp = self.transport
416 self.transport = None
418 except Exception: # pylint: disable=W0703
# Sends one serialized request through the (re)initialized transport and
# returns the reply. _CloseTransport runs on the error path (its handler
# lines are missing here), so the next call reconnects.
421 def _SendMethodCall(self, data):
422 # Send request and wait for response
424 self._InitTransport()
425 return self.transport.Call(data)
427 self._CloseTransport()
# NOTE(review): the `def` line of the public close method and the end of its
# docstring are missing around this docstring.
431 """Close the underlying connection.
434 self._CloseTransport()
436 def CallMethod(self, method, args):
437 """Send a generic request and return the response.
440 if not isinstance(args, (list, tuple)):
441 raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
442 " expected list, got %s" % type(args))
443 return CallLuxiMethod(self._SendMethodCall, method, args,
444 version=constants.LUXI_VERSION)
446 def SetQueueDrainFlag(self, drain_flag):
447 return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, (drain_flag, ))
449 def SetWatcherPause(self, until):
450 return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
# NOTE(review): map() with a lambda here vs. a list comprehension in
# SubmitManyJobs. `[op.__getstate__() for op in ops]` would be consistent
# and keep ops_state a list. CallMethod rejects anything that is not a list
# or tuple, and map() returns an iterator on Python 3.
452 def SubmitJob(self, ops):
453 ops_state = map(lambda op: op.__getstate__(), ops)
454 return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
456 def SubmitManyJobs(self, jobs):
459 jobs_state.append([op.__getstate__() for op in ops])
460 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
462 def CancelJob(self, job_id):
463 return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
465 def ArchiveJob(self, job_id):
466 return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
# NOTE(review): `(DEF_RWTO - 1) / 2` repeats the expression behind the
# module-level WFJC_TIMEOUT; reusing that constant would keep them in sync.
468 def AutoArchiveJobs(self, age):
469 timeout = (DEF_RWTO - 1) / 2
470 return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
# NOTE(review): the negative-timeout check below is an `assert`, which
# `python -O` strips. min() only caps from above, so a negative value would
# then reach the server. Raising errors.ProgrammerError (as CallMethod does
# for bad arguments) would be sturdier. The argument-tuple line between
# prev_job_info and the capped timeout (presumably prev_log_serial) is
# missing here.
472 def WaitForJobChangeOnce(self, job_id, fields,
473 prev_job_info, prev_log_serial,
474 timeout=WFJC_TIMEOUT):
475 """Waits for changes on a job.
477 @param job_id: Job ID
479 @param fields: List of field names to be observed
480 @type prev_job_info: None or list
481 @param prev_job_info: Previously received job information
482 @type prev_log_serial: None or int/long
483 @param prev_log_serial: Highest log serial number previously received
484 @type timeout: int/float
485 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
486 be capped to that value)
489 assert timeout >= 0, "Timeout can not be negative"
490 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
491 (job_id, fields, prev_job_info,
493 min(WFJC_TIMEOUT, timeout)))
# Presumably repeats WaitForJobChangeOnce until the result differs from
# constants.JOB_NOTCHANGED; the loop header and return lines are missing.
495 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
497 result = self.WaitForJobChangeOnce(job_id, fields,
498 prev_job_info, prev_log_serial)
499 if result != constants.JOB_NOTCHANGED:
# NOTE(review): REQ_QUERY is not among the REQ_* constants visible in this
# file.
503 def Query(self, what, fields, qfilter):
504 """Query for resources/items.
506 @param what: One of L{constants.QR_VIA_LUXI}
507 @type fields: List of strings
508 @param fields: List of requested fields
509 @type qfilter: None or list
510 @param qfilter: Query filter
511 @rtype: L{objects.QueryResponse}
514 result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
515 return objects.QueryResponse.FromDict(result)
517 def QueryFields(self, what, fields):
518 """Query for available fields.
520 @param what: One of L{constants.QR_VIA_LUXI}
521 @type fields: None or list of strings
522 @param fields: List of requested fields
523 @rtype: L{objects.QueryFieldsResponse}
526 result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
527 return objects.QueryFieldsResponse.FromDict(result)
529 def QueryJobs(self, job_ids, fields):
530 return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
532 def QueryInstances(self, names, fields, use_locking):
533 return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
535 def QueryNodes(self, names, fields, use_locking):
536 return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
538 def QueryGroups(self, names, fields, use_locking):
539 return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
541 def QueryExports(self, nodes, use_locking):
542 return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
544 def QueryClusterInfo(self):
545 return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
547 def QueryConfigValues(self, fields):
548 return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
550 def QueryTags(self, kind, name):
551 return self.CallMethod(REQ_QUERY_TAGS, (kind, name))