4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
# Keys of the JSON dictionaries exchanged as LUXI requests and responses.
# NOTE(review): KEY_METHOD, KEY_ARGS and KEY_RESULT are used by the
# request/response helpers of this module but are not defined in the lines
# visible here -- confirm they are defined next to these keys.
KEY_SUCCESS = "success"
KEY_VERSION = "version"

# Method names accepted in the "method" field of a request; each one is
# issued by the Client method of the same name.
# NOTE(review): REQ_QUERY is used by Client.Query but is not defined in the
# lines visible here -- confirm.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# WaitForJobChange timeout: a bit less than half of the default read/write
# timeout. Floor division keeps the value an integer even when "/" means
# true division (Python 3, or "from __future__ import division").
# NOTE(review): DEF_RWTO (and DEF_CTMO) are not defined in the lines visible
# here -- confirm they are defined above this point.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    The other exception classes defined in this module derive from this
    class.

    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised by the transport when the connection is closed while reading.

    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised by the transport when connecting, sending or receiving times
    out.

    """
class RequestError(ProtocolError):
    """Error on request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or the socket has
    been removed; it is raised when connecting to the master socket fails
    with ENOENT or ECONNREFUSED.

    """
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights; it is raised when
    connecting fails with EPERM or EACCES.

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    """

    def __init__(self, address, timeouts=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class (here
            it is passed to connect() on an AF_UNIX stream socket)
          - timeouts: a pair of timeouts, to be used on connect and
            read/write respectively

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, the module defaults DEF_CTMO and DEF_RWTO are used.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than the timeout value passed (we make a hard limit at twice the
        read timeout).

        @raise TimeoutError: if connecting does not succeed in time
        @raise NoMasterError: if nobody listens on the socket
        @raise PermissionError: if access to the socket is denied

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        # Data received after the last complete message
        self._buffer = ""
        # Complete messages not yet handed out by Recv
        self._msgs = collections.deque()

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Try to connect; _Connect asks for a retry while the server's
            # backlog is full
            try:
                utils.Retry(self._Connect, 1.0, self._ctimeout,
                            args=(self.socket, address, self._ctimeout))
            except utils.RetryTimeout:
                raise TimeoutError("Connect timed out")

            self.socket.settimeout(self._rwtimeout)
        except (socket.error, ProtocolError):
            # Do not leave a half-open socket behind, whatever the reason
            # for the failure was (this includes timeouts and permission
            # errors); the exception itself is passed on unchanged
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    @staticmethod
    def _Connect(sock, address, timeout):
        """Try once to connect the socket, mapping errors to exceptions.

        @raise TimeoutError: if the connect call times out
        @raise NoMasterError: on ENOENT or ECONNREFUSED
        @raise PermissionError: on EPERM or EACCES
        @raise utils.RetryAgain: on EAGAIN, so that the caller retries

        """
        sock.settimeout(timeout)
        try:
            sock.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            error_code = err.args[0]
            if error_code in (errno.ENOENT, errno.ECONNREFUSED):
                raise NoMasterError(address)
            elif error_code in (errno.EPERM, errno.EACCES):
                raise PermissionError(address)
            elif error_code == errno.EAGAIN:
                # Server's socket backlog is full at the moment
                raise utils.RetryAgain()
            # Any other socket error is passed on as-is
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        @raise ProtocolError: if the socket has been closed

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        @raise ProtocolError: if the message contains the end-of-message
            marker or the connection is closed
        @raise TimeoutError: if sending times out

        """
        if constants.LUXI_EOM in msg:
            raise ProtocolError("Message terminator found in payload")

        self._CheckSocket()
        try:
            # TODO: if sendall fails there is no way to tell how much of
            # the message has been sent
            self.socket.sendall(msg + constants.LUXI_EOM)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        @raise TimeoutError: if no complete message arrives in time
        @raise ConnectionClosedError: if the peer closes the connection

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # The last element is the (possibly empty) start of a message
            # which has not been fully received yet
            new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket"""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
def ParseRequest(msg):
    """Parses a LUXI request message.

    @param msg: serialized request, as received from the client
    @return: tuple of (method, args, version); version is None if the
        request did not carry one
    @raise ProtocolError: if the message cannot be deserialized, is not a
        dictionary or lacks the method or the arguments

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", request)

    if not isinstance(request, dict):
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    method = request.get(KEY_METHOD, None)  # pylint: disable=E1103
    args = request.get(KEY_ARGS, None)  # pylint: disable=E1103
    version = request.get(KEY_VERSION, None)  # pylint: disable=E1103

    # The version is optional, method and arguments are not
    if method is None or args is None:
        logging.error("LUXI request missing method or arguments: %r", msg)
        raise ProtocolError(("Invalid LUXI request (no method or arguments"
                             " in request): %r") % msg)

    return (method, args, version)
def ParseResponse(msg):
    """Parses a LUXI response message.

    @param msg: serialized response, as received from the server
    @return: tuple of (success, result, version); version is None if the
        response did not carry one
    @raise ProtocolError: if the message cannot be deserialized or is not
        a dictionary containing both the success and the result keys

    """
    # Parse the result; any failure of the deserializer is reported as a
    # protocol error
    try:
        data = serializer.LoadJson(msg)
    except Exception as err:
        raise ProtocolError("Error while deserializing response: %s" %
                            str(err))

    # Validate response
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        raise ProtocolError("Invalid response from server: %r" % data)

    return (data[KEY_SUCCESS], data[KEY_RESULT],
            data.get(KEY_VERSION, None))  # pylint: disable=E1103
def FormatResponse(success, result, version=None):
    """Formats a LUXI response message.

    @param success: value stored under the success key
    @param result: value stored under the result key
    @param version: protocol version; left out of the message when None
    @return: the response as serialized by serializer.DumpJson

    """
    response = {KEY_SUCCESS: success, KEY_RESULT: result}

    # The version is optional in responses
    if version is not None:
        response.update({KEY_VERSION: version})

    logging.debug("LUXI response: %s", response)

    serialized = serializer.DumpJson(response)
    return serialized
def FormatRequest(method, args, version=None):
    """Formats a LUXI request message.

    @param method: name of the method to call
    @param args: arguments for the method
    @param version: protocol version; left out of the message when None
    @return: the request as serialized by serializer.DumpJson

    """
    # Build request
    request = {KEY_METHOD: method, KEY_ARGS: args}

    # The version is optional in requests
    if version is not None:
        request.update({KEY_VERSION: version})

    # Serialize the request
    serialized = serializer.DumpJson(request, indent=False)
    return serialized
def CallLuxiMethod(transport_cb, method, args, version=None):
    """Send a LUXI request via a transport and return the response.

    @param transport_cb: callable taking the serialized request and
        returning the serialized response
    @param method: name of the method to call
    @param args: arguments for the method
    @param version: protocol version sent with the request; a version
        found in the response must be equal to it
    @return: the result part of a successful response
    @raise errors.LuxiError: on version mismatch
    @raise RequestError: if the response signals failure and
        errors.MaybeRaise did not raise a more specific exception

    """
    assert callable(transport_cb)

    # Send request and wait for response
    response_msg = transport_cb(FormatRequest(method, args, version=version))

    (success, result, resp_version) = ParseResponse(response_msg)

    # Verify version if there was one in the response
    if not (resp_version is None or resp_version == version):
        raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                               (version, resp_version))

    if not success:
        # Let the errors module raise first; if it does not, fall back to
        # the generic exception
        errors.MaybeRaise(result)
        raise RequestError(result)

    return result
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class;
            constants.MASTER_SOCKET is used if None
          - timeouts: a list of timeouts, to be used on connect and
            read/write
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        """
        if self.transport is None:
            return
        try:
            # Forget the transport before closing it, so that it is not
            # reused even if closing fails
            old_transp = self.transport
            self.transport = None
            old_transp.Close()
        except Exception:  # pylint: disable=W0703
            # Best effort only, the transport is dropped in any case
            pass

    def _SendMethodCall(self, data):
        """Send a serialized request and return the serialized response.

        On any error the transport is closed before the exception is
        passed on, so that the next call opens a new connection.

        """
        # Send request and wait for response
        try:
            self._InitTransport()
            return self.transport.Call(data)
        except Exception:
            self._CloseTransport()
            raise

    def Close(self):
        """Close the underlying connection.

        """
        self._CloseTransport()

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        @param method: name of the LUXI method
        @param args: arguments for the method
        @return: the result returned by L{CallLuxiMethod}

        """
        return CallLuxiMethod(self._SendMethodCall, method, args,
                              version=constants.LUXI_VERSION)

    def SetQueueDrainFlag(self, drain_flag):
        """Issue a L{REQ_QUEUE_SET_DRAIN_FLAG} request."""
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SetWatcherPause(self, until):
        """Issue a L{REQ_SET_WATCHER_PAUSE} request."""
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

    def SubmitJob(self, ops):
        """Submit one job made of the given opcodes.

        The opcodes are sent in the form returned by their __getstate__
        method.

        """
        # A real list (not a lazy map object) so that it can be serialized
        # on any Python version
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def SubmitManyJobs(self, jobs):
        """Submit several jobs, each given as a sequence of opcodes."""
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

    def CancelJob(self, job_id):
        """Issue a L{REQ_CANCEL_JOB} request for the given job."""
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        """Issue a L{REQ_ARCHIVE_JOB} request for the given job."""
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        """Issue a L{REQ_AUTOARCHIVE_JOBS} request.

        The timeout sent along is a bit less than half of the default
        read/write timeout.

        """
        # Floor division keeps the timeout an integer regardless of the
        # meaning of "/"
        timeout = (DEF_RWTO - 1) // 2
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChangeOnce(self, job_id, fields,
                             prev_job_info, prev_log_serial,
                             timeout=WFJC_TIMEOUT):
        """Waits for changes on a job.

        @param job_id: Job ID
        @type fields: list
        @param fields: List of field names to be observed
        @type prev_job_info: None or list
        @param prev_job_info: Previously received job information
        @type prev_log_serial: None or int/long
        @param prev_log_serial: Highest log serial number previously received
        @type timeout: int/float
        @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT}
                        will be capped to that value)

        """
        assert timeout >= 0, "Timeout can not be negative"
        return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial,
                                min(WFJC_TIMEOUT, timeout)))

    def WaitForJobChange(self, job_id, fields, prev_job_info,
                         prev_log_serial):
        """Wait until a job changes.

        Calls L{WaitForJobChangeOnce} repeatedly until it returns
        something else than L{constants.JOB_NOTCHANGED}.

        """
        while True:
            result = self.WaitForJobChangeOnce(job_id, fields,
                                               prev_job_info, prev_log_serial)
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def Query(self, what, fields, filter_):
        """Query for resources/items.

        @param what: One of L{constants.QR_OP_LUXI}
        @type fields: List of strings
        @param fields: List of requested fields
        @type filter_: None or list
        @param filter_: Query filter
        @rtype: L{objects.QueryResponse}

        """
        req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
        result = self.CallMethod(REQ_QUERY, req.ToDict())
        return objects.QueryResponse.FromDict(result)

    def QueryFields(self, what, fields):
        """Query for available fields.

        @param what: One of L{constants.QR_OP_LUXI}
        @type fields: None or list of strings
        @param fields: List of requested fields
        @rtype: L{objects.QueryFieldsResponse}

        """
        req = objects.QueryFieldsRequest(what=what, fields=fields)
        result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
        return objects.QueryFieldsResponse.FromDict(result)

    def QueryJobs(self, job_ids, fields):
        """Issue a L{REQ_QUERY_JOBS} request."""
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        """Issue a L{REQ_QUERY_INSTANCES} request."""
        return self.CallMethod(REQ_QUERY_INSTANCES,
                               (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        """Issue a L{REQ_QUERY_NODES} request."""
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryGroups(self, names, fields, use_locking):
        """Issue a L{REQ_QUERY_GROUPS} request."""
        return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        """Issue a L{REQ_QUERY_EXPORTS} request."""
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        """Issue a L{REQ_QUERY_CLUSTER_INFO} request (no arguments)."""
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        """Issue a L{REQ_QUERY_CONFIG_VALUES} request."""
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

    def QueryTags(self, kind, name):
        """Issue a L{REQ_QUERY_TAGS} request."""
        return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

    def QueryLocks(self, fields, sync):
        """Issue a L{REQ_QUERY_LOCKS} request.

        Deprecated, a warning pointing to L{Query} is emitted on each call.

        """
        warnings.warn("This LUXI call is deprecated and will be removed, use"
                      " Query(\"%s\", ...) instead" % constants.QR_LOCK)
        return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))