4 # Copyright (C) 2006, 2007, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
import collections
import errno
import logging
import socket
import time
import warnings

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import objects
# Keys of the JSON objects exchanged over the LUXI socket.
# NOTE(review): KEY_METHOD, KEY_ARGS and KEY_RESULT are used by the
# request/response helpers of this module but had no definition here; the
# values below must match what the peer expects on the wire -- confirm.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Names of the LUXI requests (the "method" of a request message)
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
# NOTE(review): REQ_QUERY is used by Client.Query but had no definition
# here; the value follows the naming pattern of its siblings -- confirm.
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts in seconds (the transport
# constructor documents them as 10 and 60 seconds respectively)
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; floor division keeps it an integer regardless
# of the interpreter's division semantics
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.LuxiError):
  """Error in the LUXI protocol.

  Serves as the base class of the other LUXI errors defined in this
  module, so catching it catches all of them.

  """
class ConnectionClosedError(ProtocolError):
  """The connection was closed.

  Raised when reading from the socket yields no data.

  """
class TimeoutError(ProtocolError):
  """An operation timed out.

  Raised for timeouts while connecting, sending or receiving. Inside this
  module the name refers to this class, not to any builtin of the same
  name.

  """
class RequestError(ProtocolError):
  """Error while handling a request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  Raised when connecting to the master socket fails with ENOENT or
  ECONNREFUSED, i.e. the socket path does not exist or nothing is
  accepting connections on it (typically the master daemon is not
  running).

  """
class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights (the connect call
  failed with EPERM or EACCES). Inside this module the name refers to
  this class, not to any builtin of the same name.

  """
class Transport(object):
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class (here: the
        path of a unix stream socket)
      - timeouts: a pair of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, the module defaults DEF_CTMO and DEF_RWTO are used.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    @raise TimeoutError: if the connection could not be established
      within the connect timeout

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Received data not yet terminated by LUXI_EOM
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect, retrying while _Connect asks for it
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # The constructor is failing, so nobody will be able to call Close();
      # release the socket for every kind of failure (including timeout
      # and permission errors) before propagating the original exception
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Make a single attempt at connecting the socket.

    @param sock: the socket to connect
    @param address: the address to connect to
    @param timeout: timeout set on the socket before connecting
    @raise TimeoutError: if the connect call timed out
    @raise NoMasterError: on ENOENT or ECONNREFUSED
    @raise PermissionError: on EPERM or EACCES

    On EAGAIN, utils.RetryAgain is raised so that the caller's retry loop
    tries again; any other socket error is re-raised unchanged.

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      if err.args:
        error_code = err.args[0]
      else:
        error_code = None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      # Anything else is unexpected and must not be mistaken for success
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @param msg: the serialized message; must not contain the message
      terminator
    @raise ProtocolError: if the terminator is found in the message or
      the connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @return: the oldest complete message, without its terminator
    @raise TimeoutError: if a receive timed out or the global limit was
      exceeded
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            # Nothing available right now, read again
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # Everything before the last terminator is a complete message; the
      # remainder (possibly empty) is kept until more data arrives
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket; does nothing if it is already closed."""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized (JSON) request
  @rtype: tuple
  @return: (method, args, version); version is None if the request
    did not carry one
  @raise ProtocolError: if the message cannot be parsed, is not a
    dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized (JSON) response
  @rtype: tuple
  @return: (success, result, version); version is None if the response
    did not carry one
  @raise ProtocolError: if the message cannot be deserialized or is not
    a dictionary containing both the success and the result keys

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response; both keys are read unconditionally below
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: value stored under the success key
  @param result: value stored under the result key
  @param version: protocol version; only included if not None
  @return: the serialized response, as produced by serializer.DumpJson

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: the request name, stored under the method key
  @param args: the request arguments, stored under the args key
  @param version: protocol version; only included if not None
  @return: the serialized request, as produced by serializer.DumpJson

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @param transport_cb: callable taking the serialized request and
    returning the serialized response
  @param method: the request name
  @param args: the request arguments
  @param version: protocol version sent with the request and compared
    against the one in the response, if any
  @return: the result part of a successful response
  @raise errors.LuxiError: if the response carries a version different
    from the one requested
  @raise RequestError: if the response is unsuccessful and
    errors.MaybeRaise did not raise something more specific

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  # Failure: give errors.MaybeRaise the chance to raise from the result
  # first, otherwise report a generic request error
  errors.MaybeRaise(result)
  raise RequestError(result)
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used. If address is not passed, constants.MASTER_SOCKET
    is used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport first so that it is dropped even if closing
      # it fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      # Best effort only, there is nothing useful to do about it here
      pass

  def _SendMethodCall(self, data):
    """Send a serialized request and return the serialized response.

    On any error the transport is closed before the exception is
    propagated, so that the next call creates a fresh one.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    # A real list (not a lazy map object) is needed for serialization
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    # One list of opcode states per job
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    # Floor division keeps the timeout an integer on any interpreter
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes on a job, for as long as it takes.

    Calls L{WaitForJobChangeOnce} repeatedly until it returns something
    other than constants.JOB_NOTCHANGED, and returns that result.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        return result

  def Query(self, what, fields, filter_):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type filter_: None or list
    @param filter_: Query filter
    @rtype: L{objects.QueryResponse}

    """
    req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
    result = self.CallMethod(REQ_QUERY, req.ToDict())
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    req = objects.QueryFieldsRequest(what=what, fields=fields)
    result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    warnings.warn("This LUXI call is deprecated and will be removed, use"
                  " Query(\"%s\", ...) instead" % constants.QR_LOCK)
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))