4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43 from ganeti import pathutils
# Keys of the LUXI request/response messages.
# NOTE(review): KEY_METHOD, KEY_ARGS, KEY_RESULT and REQ_QUERY are used
# further down in this file but their definitions are not visible here.
KEY_SUCCESS = "success"
KEY_VERSION = "version"

# Job handling requests.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"

# Query requests.
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"

# Cluster-wide flag requests.
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
73 #: List of all LUXI requests
# NOTE(review): only some members of this collection are visible in this
# excerpt; the statement that opens it and the line that closes it are not
# shown here, so the full set of requests cannot be checked from this view.
76 REQ_AUTO_ARCHIVE_JOBS,
78 REQ_CHANGE_JOB_PRIORITY,
80 REQ_QUERY_CLUSTER_INFO,
81 REQ_QUERY_CONFIG_VALUES,
90 REQ_SET_WATCHER_PAUSE,
93 REQ_WAIT_FOR_JOB_CHANGE,
99 # WaitForJobChange timeout
# One second less than the read/write timeout DEF_RWTO (defined outside this
# view), halved. Client.WaitForJobChangeOnce caps its timeout at this value.
# NOTE(review): "/" truncates only for integer operands under Python 2 (this
# module uses Python 2 syntax); under Python 3, or with float operands, the
# result is fractional -- confirm what the server side expects.
100 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.LuxiError):
  """Base class for errors in the LUXI protocol."""


class ConnectionClosedError(ProtocolError):
  """The peer closed the connection."""


# NOTE: under Python 3 this name shadows the built-in TimeoutError; it is
# kept as-is because callers catch it by this name.
class TimeoutError(ProtocolError): # pylint: disable=W0622
  """A LUXI operation timed out."""


class RequestError(ProtocolError):
  """Error while handling a LUXI request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """


class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or its socket has gone
  away: the connection attempt failed with ENOENT or ECONNREFUSED.

  """


# NOTE: under Python 3 this name shadows the built-in PermissionError; it is
# kept as-is because callers catch it by this name.
class PermissionError(ProtocolError): # pylint: disable=W0622
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights: the connection
  attempt failed with EPERM or EACCES.

  """
# NOTE(review): no locking is visible in the methods of this class in this
# excerpt; confirm that the "safe for multithreading" requirement below holds.
146 """Low-level transport class.
148 This is used on the client side.
150 This could be replaced by any other class that provides the same
151 semantics to the Client. This means:
152 - can send messages and receive messages
153 - safe for multithreading
157 def __init__(self, address, timeouts=None):
158 """Constructor for the Transport class.
161 - address: a valid address for the used transport class
162 - timeouts: a pair of timeouts, to be used on connect and read/write
164 There are two timeouts used since we might want to wait for a long
165 time for a response, but the connect timeout should be lower.
167 If not passed, we use DEF_CTMO and DEF_RWTO (a default of 10 and respectively 60 seconds).
169 Note that on reading data, since the timeout applies to an
170 individual receive, it might be that the total duration is longer
171 than timeout value passed (we make a hard limit at twice the read timeout).
175 self.address = address
# NOTE(review): the conditional choosing between the next two assignments
# (module defaults vs. the caller-supplied pair) is not visible in this excerpt.
177 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
179 self._ctimeout, self._rwtimeout = timeouts
# Complete messages received from the socket but not yet returned by Recv.
183 self._msgs = collections.deque()
# UNIX-domain stream socket to the server.
186 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Connect through utils.Retry; _Connect raises utils.RetryAgain when the
# server's backlog is full. NOTE(review): presumably 1.0 is the delay between
# attempts and _ctimeout the overall limit -- confirm against utils.Retry.
190 utils.Retry(self._Connect, 1.0, self._ctimeout,
191 args=(self.socket, address, self._ctimeout))
192 except utils.RetryTimeout:
193 raise TimeoutError("Connect timed out")
# Once connected, every socket operation uses the read/write timeout.
195 self.socket.settimeout(self._rwtimeout)
196 except (socket.error, NoMasterError):
197 if self.socket is not None:
# NOTE(review): the cleanup guarded by the check above is not visible in this
# excerpt; presumably the socket is closed and the error re-raised -- confirm.
@staticmethod
def _Connect(sock, address, timeout):
  """Connect C{sock} to C{address}, translating socket errors.

  This is run through C{utils.Retry} by the constructor; raising
  C{utils.RetryAgain} presumably asks it for another attempt.

  @param sock: the socket to connect
  @param address: the address to connect to
  @param timeout: timeout in seconds applied to the connect attempt
  @raise TimeoutError: if the connect attempt timed out
  @raise NoMasterError: if the connection failed with ENOENT or ECONNREFUSED
  @raise PermissionError: if the connection failed with EPERM or EACCES
  @raise utils.RetryAgain: if the connection failed with EAGAIN
  @raise socket.error: for any other socket error (re-raised unchanged)

  """
  sock.settimeout(timeout)
  try:
    sock.connect(address)
  except socket.timeout as err:
    raise TimeoutError("Connect timed out: %s" % str(err))
  except socket.error as err:
    # Errors raised without arguments carry no errno.
    error_code = err.args[0] if err.args else None
    if error_code in (errno.ENOENT, errno.ECONNREFUSED):
      raise NoMasterError(address)
    elif error_code in (errno.EPERM, errno.EACCES):
      raise PermissionError(address)
    elif error_code == errno.EAGAIN:
      # Server's socket backlog is full at the moment
      raise utils.RetryAgain()
    # Any other error is unexpected: propagate it instead of returning
    # normally, which would report an unconnected socket as connected.
    raise
def _CheckSocket(self):
  """Ensure the transport still holds a socket.

  @raise ProtocolError: if there is no socket, i.e. the connection is closed

  """
  if self.socket is not None:
    return
  raise ProtocolError("Connection is closed")
# NOTE(review): this method's "def" line and docstring delimiters are not
# visible in this excerpt.
230 This just sends a message and doesn't wait for the response.
# The terminator separates messages on the wire (it is appended below and
# Recv splits on it), so it must not occur inside a payload.
233 if constants.LUXI_EOM in msg:
234 raise ProtocolError("Message terminator found in payload")
238 # NOTE: socket.sendall() keeps sending until all data is written or an
# error is raised; on error there is no way to tell how much was already sent.
239 self.socket.sendall(msg + constants.LUXI_EOM)
# As far as visible, only socket timeouts are translated here; other socket
# errors propagate unchanged.
240 except socket.timeout, err:
241 raise TimeoutError("Sending timeout: %s" % str(err))
244 """Try to receive a message from the socket.
246 In case we already have messages queued, we just return from the
247 queue. Otherwise, we try to read data with a _rwtimeout network
248 timeout, making sure we don't go over 2x_rwtimeout as a global limit.
# Deadline for the whole loop. Each recv() is additionally bounded by the
# socket timeout (_rwtimeout, set in the constructor), so the total wait can
# approach twice _rwtimeout.
253 etime = time.time() + self._rwtimeout
254 while not self._msgs:
255 if time.time() > etime:
256 raise TimeoutError("Extended receive timeout")
259 data = self.socket.recv(4096)
260 except socket.timeout, err:
261 raise TimeoutError("Receive timeout: %s" % str(err))
262 except socket.error, err:
263 if err.args and err.args[0] == errno.EAGAIN:
# NOTE(review): the handling of EAGAIN, and the check that leads to the
# ConnectionClosedError below (presumably an empty read), are not visible in
# this excerpt.
268 raise ConnectionClosedError("Connection closed while reading")
# Prepend the partial message left over from earlier reads and split on the
# terminator; the last piece is incomplete (possibly empty) and becomes the
# new buffer.
269 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
270 self._buffer = new_msgs.pop()
271 self._msgs.extend(new_msgs)
# Return the oldest complete message; any others stay queued for later calls.
272 return self._msgs.popleft()
# NOTE(review): this method's "def" line and its body are not visible in this
# excerpt.
275 """Send a message and wait for the response.
277 This is just a wrapper over Send and Recv.
# NOTE(review): this method's "def" line is not visible in this excerpt.
284 """Close the socket"""
285 if self.socket is not None:
# NOTE(review): the statements guarded by the check above are not visible in
# this excerpt.
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: the serialized (JSON) request
  @return: a C{(method, args, version)} tuple; C{version} is C{None} when
      the request does not carry one
  @raise ProtocolError: if the message cannot be deserialized, is not a
      dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: the serialized (JSON) response
  @return: a C{(success, result, version)} tuple; C{version} is C{None}
      when the response does not carry one
  @raise ProtocolError: if the message cannot be deserialized, is not a
      dictionary, or lacks the success flag or the result

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Never turn a user interrupt into a protocol error
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate the response; both keys indexed below must be present so that a
  # malformed response is reported as ProtocolError rather than KeyError.
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: whether the request succeeded
  @param result: the result of the request (on failure, the error data
      the client turns into an exception)
  @param version: optional LUXI version to include in the response
  @return: the serialized (JSON) response

  """
  # Both keys are required by ParseResponse on the receiving side
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: the LUXI method name
  @param args: the method arguments
  @param version: optional LUXI version to include in the request
  @return: the serialized (JSON) request

  """
  # Build request; these are the keys ParseRequest reads back
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request)
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @param transport_cb: callable taking the serialized request and returning
      the serialized response
  @param method: the LUXI method name
  @param args: the method arguments
  @param version: the client's LUXI version, or C{None}
  @return: the result of a successful call
  @raise errors.LuxiError: if the response carries a different version
  @raise RequestError: if the call failed and C{errors.MaybeRaise} did not
      raise anything itself

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  # The call failed; errors.MaybeRaise presumably raises a more specific
  # error encoded in the result, otherwise fall back to RequestError.
  errors.MaybeRaise(result)
  raise RequestError(result)
# Client speaks the LUXI protocol through CallLuxiMethod and delegates the
# actual message exchange to a transport object (Transport by default).
398 class Client(object):
399 """High-level client implementation.
401 This uses a backing Transport-like class on top of which it
402 implements data serialization/deserialization.
def __init__(self, address=None, timeouts=None, transport=Transport):
  """Constructor for the Client class.

  Arguments:
    - address: a valid address for the used transport class; defaults to
      C{pathutils.MASTER_SOCKET}
    - timeouts: a list of timeouts, to be used on connect and read/write
    - transport: a Transport-like class

  If timeouts is not passed, C{None} is handed to the transport class, so
  its default timeouts apply.

  """
  # Only fall back to the master socket when no address was given
  if address is None:
    address = pathutils.MASTER_SOCKET
  self.address = address
  self.timeouts = timeouts
  self.transport_class = transport
  self.transport = None
  self._InitTransport()
def _InitTransport(self):
  """Create the transport unless one is already in place.

  A new transport is built on first use and after L{_CloseTransport} has
  dropped the previous one.

  """
  if self.transport is not None:
    return
  self.transport = self.transport_class(self.address,
                                        timeouts=self.timeouts)
def _CloseTransport(self):
  """Close the transport, ignoring errors.

  The reference is dropped before closing, so the next request creates a
  fresh transport via L{_InitTransport} even if closing fails.

  """
  if self.transport is None:
    return
  old_transp = self.transport
  self.transport = None
  try:
    old_transp.Close()
  except Exception: # pylint: disable=W0703
    # Best-effort cleanup: errors while closing are deliberately ignored
    pass
def _SendMethodCall(self, data):
  """Send the serialized request C{data} and return the raw response.

  On error the transport is closed, so the next call reconnects, and the
  error is re-raised.

  """
  # Send request and wait for response
  try:
    self._InitTransport()
    return self.transport.Call(data)
  except Exception:
    self._CloseTransport()
    raise
# NOTE(review): this method's "def" line is not visible in this excerpt. The
# body delegates to _CloseTransport, which closes the transport ignoring errors.
457 """Close the underlying connection.
460 self._CloseTransport()
def CallMethod(self, method, args):
  """Send a generic LUXI request and return its result.

  @param method: the LUXI method name
  @param args: the method arguments, as a list or tuple
  @raise errors.ProgrammerError: if C{args} is neither a list nor a tuple

  """
  if isinstance(args, (list, tuple)):
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)
  raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                               " expected list, got %s" % type(args))
def SetQueueDrainFlag(self, drain_flag):
  """Send C{drain_flag} as the new job queue drain flag."""
  request_args = (drain_flag, )
  return self.CallMethod(REQ_SET_DRAIN_FLAG, request_args)

def SetWatcherPause(self, until):
  """Send a watcher pause request with the given C{until} value."""
  request_args = (until, )
  return self.CallMethod(REQ_SET_WATCHER_PAUSE, request_args)
def SubmitJob(self, ops):
  """Submit one job made of the opcodes in C{ops}.

  Each opcode is sent as its C{__getstate__()} representation. A list is
  built explicitly, because C{map} is lazy on Python 3 and could not be
  serialized.

  """
  ops_state = [op.__getstate__() for op in ops]
  return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

def SubmitManyJobs(self, jobs):
  """Submit several jobs at once.

  @param jobs: an iterable of jobs, each being an iterable of opcodes; every
      opcode is sent as its C{__getstate__()} representation

  """
  jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
  return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
def CancelJob(self, job_id):
  """Send a cancel request for job C{job_id}."""
  request_args = (job_id, )
  return self.CallMethod(REQ_CANCEL_JOB, request_args)

def ArchiveJob(self, job_id):
  """Send an archive request for job C{job_id}."""
  request_args = (job_id, )
  return self.CallMethod(REQ_ARCHIVE_JOB, request_args)

def ChangeJobPriority(self, job_id, priority):
  """Request that job C{job_id} gets priority C{priority}."""
  request_args = (job_id, priority)
  return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, request_args)

def AutoArchiveJobs(self, age):
  """Send an auto-archive request with C{age}.

  A timeout of one second below DEF_RWTO, halved, is sent along with it.

  """
  server_timeout = (DEF_RWTO - 1) / 2
  return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, server_timeout))
def WaitForJobChangeOnce(self, job_id, fields,
                         prev_job_info, prev_log_serial,
                         timeout=WFJC_TIMEOUT):
  """Waits for changes on a job.

  @param job_id: Job ID
  @type fields: list of strings
  @param fields: List of field names to be observed
  @type prev_job_info: None or list
  @param prev_job_info: Previously received job information
  @type prev_log_serial: None or int/long
  @param prev_log_serial: Highest log serial number previously received
  @type timeout: int/float
  @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                  be capped to that value)

  """
  assert timeout >= 0, "Timeout can not be negative"
  # prev_log_serial must be sent too, otherwise the server cannot tell which
  # log entries the caller has already seen
  return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                         (job_id, fields, prev_job_info,
                          prev_log_serial,
                          min(WFJC_TIMEOUT, timeout)))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
  """Wait until a job changes.

  Calls L{WaitForJobChangeOnce} repeatedly until its result differs from
  C{constants.JOB_NOTCHANGED}, and returns that result.

  """
  while True:
    result = self.WaitForJobChangeOnce(job_id, fields,
                                       prev_job_info, prev_log_serial)
    if result != constants.JOB_NOTCHANGED:
      return result
def Query(self, what, fields, qfilter):
  """Query for resources/items.

  @param what: One of L{constants.QR_VIA_LUXI}
  @type fields: List of strings
  @param fields: List of requested fields
  @type qfilter: None or list
  @param qfilter: Query filter
  @rtype: L{objects.QueryResponse}

  """
  raw_response = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
  return objects.QueryResponse.FromDict(raw_response)

def QueryFields(self, what, fields):
  """Query for available fields.

  @param what: One of L{constants.QR_VIA_LUXI}
  @type fields: None or list of strings
  @param fields: List of requested fields
  @rtype: L{objects.QueryFieldsResponse}

  """
  raw_response = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
  return objects.QueryFieldsResponse.FromDict(raw_response)
def QueryJobs(self, job_ids, fields):
  """Query C{fields} of the jobs in C{job_ids}."""
  return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

def QueryInstances(self, names, fields, use_locking):
  """Query C{fields} of the instances in C{names}."""
  return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

def QueryNodes(self, names, fields, use_locking):
  """Query C{fields} of the nodes in C{names}."""
  return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

def QueryGroups(self, names, fields, use_locking):
  """Query C{fields} of the node groups in C{names}."""
  return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

def QueryNetworks(self, names, fields, use_locking):
  """Query C{fields} of the networks in C{names}."""
  return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

def QueryExports(self, nodes, use_locking):
  """Query the exports on C{nodes}."""
  return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

def QueryClusterInfo(self):
  """Query the cluster information."""
  return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

def QueryConfigValues(self, fields):
  """Query the configuration values named in C{fields}."""
  return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

def QueryTags(self, kind, name):
  """Query the tags of object C{name} of kind C{kind}."""
  return self.CallMethod(REQ_QUERY_TAGS, (kind, name))