4 # Copyright (C) 2006, 2007, 2011, 2012, 2014 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43 from ganeti import pathutils
# Keys of the dictionaries exchanged as LUXI requests and responses
KEY_METHOD = constants.LUXI_KEY_METHOD
KEY_ARGS = constants.LUXI_KEY_ARGS
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
KEY_RESULT = constants.LUXI_KEY_RESULT
KEY_VERSION = constants.LUXI_KEY_VERSION

# Request (method) identifiers, re-exported from the constants module
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
REQ_QUERY = constants.LUXI_REQ_QUERY
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
REQ_ALL = constants.LUXI_REQ_ALL

# Default connect timeout, default read/write timeout, and the upper bound
# for a single wait-for-job-change request
DEF_CTMO = constants.LUXI_DEF_CTMO
DEF_RWTO = constants.LUXI_DEF_RWTO
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
class ProtocolError(errors.LuxiError):
  """Base class for errors in the LUXI protocol.

  """
class ConnectionClosedError(ProtocolError):
  """The connection was closed.

  """
class TimeoutError(ProtocolError):
  """An operation timed out.

  """
  # NOTE: on Python 3 this name shadows the builtin TimeoutError inside
  # this module.
class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or its socket could
  not be found (the connect call failed with ENOENT or ECONNREFUSED).

  """
class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights (the connect call
  failed with EPERM or EACCES).

  """
  # NOTE: on Python 3 this name shadows the builtin PermissionError inside
  # this module.
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class (here: the
        path of a unix stream socket)
      - timeouts: a pair of timeouts, to be used on connect and read/write

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, L{DEF_CTMO} and L{DEF_RWTO} are used.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received after the last complete message
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect, retrying while the server asks us to
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Whatever went wrong (socket error, timeout, missing master,
      # permission problem), do not leave a half-open socket behind
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Connect C{sock} to C{address}, translating low-level errors.

    @raise TimeoutError: if the connect attempt timed out
    @raise NoMasterError: on ENOENT or ECONNREFUSED
    @raise PermissionError: on EPERM or EACCES
    @raise utils.RetryAgain: on EAGAIN, so that the caller can retry

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      # Guard against exceptions without arguments, as Recv does
      if err.args:
        error_code = err.args[0]
      else:
        error_code = None
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    @raise ProtocolError: if the socket has been closed

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    @raise ProtocolError: if the message contains the end-of-message
      marker or the connection is closed
    @raise TimeoutError: if sending timed out

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    @raise TimeoutError: if no complete message arrived in time
    @raise ConnectionClosedError: if the peer closed the connection

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      # The last element is the (possibly empty) start of the next message
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: serialized request, as accepted by L{serializer.LoadJson}
  @rtype: tuple
  @return: C{(method, args, version)}; C{version} is C{None} if the
    request carried no version
  @raise ProtocolError: if the message cannot be parsed, is not a
    dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: serialized response, as accepted by L{serializer.LoadJson}
  @rtype: tuple
  @return: C{(success, result, version)}; C{version} is C{None} if the
    response carried no version
  @raise ProtocolError: if the message cannot be deserialized or is not a
    dictionary containing both the success and the result keys

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except KeyboardInterrupt:
    # Never turn a user interrupt into a protocol error
    raise
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: value stored under L{KEY_SUCCESS}
  @param result: value stored under L{KEY_RESULT}
  @param version: if not C{None}, stored under L{KEY_VERSION}
  @return: the response as serialized by L{serializer.DumpJson}

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: value stored under L{KEY_METHOD}
  @param args: value stored under L{KEY_ARGS}
  @param version: if not C{None}, stored under L{KEY_VERSION}
  @return: the request as serialized by L{serializer.DumpJson}

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request)
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @param transport_cb: callable taking the serialized request and
    returning the serialized response
  @param method: request method
  @param args: request arguments
  @param version: protocol version sent with the request and compared with
    the version found in the response, if any
  @return: the result part of a successful response
  @raise errors.LuxiError: if the response carries a different version
  @raise RequestError: if the response reports a failure and
    L{errors.MaybeRaise} did not raise on its own

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  # Presumably re-raises an exception encoded in the result; if it returns,
  # fall back to a generic request error
  errors.MaybeRaise(result)
  raise RequestError(result)
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """

  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if not
        passed, L{pathutils.MASTER_SOCKET} is used
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = pathutils.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport first so that a failing Close cannot leave a
      # broken transport attached to the client
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Send serialized request data and return the serialized response.

    On any error the transport is closed, so that the next call creates a
    fresh one, and the error is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: request method
    @type args: list or tuple
    @param args: request arguments
    @raise errors.ProgrammerError: if C{args} is neither a list nor a tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a L{REQ_SET_DRAIN_FLAG} request with the given flag.

    """
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Send a L{REQ_SET_WATCHER_PAUSE} request with the given value.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Submit one job made of the given opcodes.

    Each opcode is sent as the value returned by its C{__getstate__}.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitJobToDrainedQueue(self, ops):
    """Like L{SubmitJob}, using L{REQ_SUBMIT_JOB_TO_DRAINED_QUEUE}.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as a sequence of opcodes.

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  @staticmethod
  def _PrepareJobId(request_name, job_id):
    """Convert a job ID to an integer.

    @param request_name: name of the request, used in the error message
    @param job_id: job ID in any form accepted by C{int}
    @rtype: int
    @raise RequestError: if the job ID cannot be converted

    """
    try:
      return int(job_id)
    except (TypeError, ValueError):
      # TypeError covers values such as None, which int() rejects without
      # raising ValueError
      raise RequestError("Invalid parameter passed to %s as job id: "
                         " expected integer, got value %s" %
                         (request_name, job_id))

  def CancelJob(self, job_id):
    """Send a L{REQ_CANCEL_JOB} request for the given job.

    """
    job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Send a L{REQ_ARCHIVE_JOB} request for the given job.

    """
    job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Send a L{REQ_CHANGE_JOB_PRIORITY} request for the given job.

    """
    job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Send a L{REQ_AUTO_ARCHIVE_JOBS} request.

    The timeout sent along is derived from L{DEF_RWTO} so that it stays
    below the default read/write timeout.

    """
    # Floor division keeps the timeout an integer on every Python version
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait until a job changes.

    Calls L{WaitForJobChangeOnce} repeatedly until it returns something
    other than L{constants.JOB_NOTCHANGED}, and returns that value.

    """
    job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a L{REQ_QUERY_JOBS} request.

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_INSTANCES} request.

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_NODES} request.

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_GROUPS} request.

    """
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Send a L{REQ_QUERY_NETWORKS} request.

    """
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a L{REQ_QUERY_EXPORTS} request.

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a L{REQ_QUERY_CLUSTER_INFO} request (no arguments).

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a L{REQ_QUERY_CONFIG_VALUES} request.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Send a L{REQ_QUERY_TAGS} request.

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))