# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import logging
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils
# Keys of the JSON-encoded LUXI request and response dictionaries
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Names of the LUXI request methods
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default timeouts in seconds: connect, and read/write respectively
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.LuxiError):
    """Denotes an error in the LUXI protocol.

    This is the base class of the more specific LUXI exceptions defined
    in this module.

    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised when the peer closes the connection while a message is being
    read.

    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving takes longer than the
    configured timeout. Note that on Python 3 this name shadows the
    builtin of the same name within this module.

    """
class RequestError(ProtocolError):
    """Error on request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or the socket has
    been removed (connecting failed with ENOENT or ECONNREFUSED).

    """
class PermissionError(ProtocolError):
    """Permission denied while connecting to the master socket.

    This means the user doesn't have the proper rights (connecting
    failed with EPERM or EACCES). Note that on Python 3 this name
    shadows the builtin of the same name within this module.

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    """

    def __init__(self, address, timeouts=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class
          - timeouts: a pair of timeouts (connect, read/write), in seconds

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, we use a default of 10 and respectively 60 seconds.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than the timeout value passed (we make a hard limit at twice the
        read timeout).

        Raises TimeoutError, NoMasterError, PermissionError or
        socket.error if the connection cannot be established.

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        # Partial (not yet terminated) message data received so far
        self._buffer = ""
        # Complete messages received but not yet handed out by Recv
        self._msgs = collections.deque()

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # Try to connect; _Connect raises utils.RetryAgain while the
            # server's backlog is full
            try:
                utils.Retry(self._Connect, 1.0, self._ctimeout,
                            args=(self.socket, address, self._ctimeout))
            except utils.RetryTimeout:
                raise TimeoutError("Connect timed out")

            self.socket.settimeout(self._rwtimeout)
        except Exception:
            # Whatever went wrong (including timeouts and permission
            # errors), do not leak the half-initialized socket
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    @staticmethod
    def _Connect(sock, address, timeout):
        """Connect the socket, translating low-level errors.

        Raises TimeoutError on a connect timeout, NoMasterError if the
        socket does not exist or nobody is listening, PermissionError on
        access errors and utils.RetryAgain if the server's backlog is
        full. Any other socket error is re-raised unchanged.

        """
        sock.settimeout(timeout)
        try:
            sock.connect(address)
        except socket.timeout as err:
            raise TimeoutError("Connect timed out: %s" % str(err))
        except socket.error as err:
            error_code = err.args[0]
            if error_code in (errno.ENOENT, errno.ECONNREFUSED):
                raise NoMasterError(address)
            elif error_code in (errno.EPERM, errno.EACCES):
                raise PermissionError(address)
            elif error_code == errno.EAGAIN:
                # Server's socket backlog is full at the moment
                raise utils.RetryAgain()
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        """
        if constants.LUXI_EOM in msg:
            raise ProtocolError("Message terminator found in payload")

        self._CheckSocket()
        try:
            # TODO: sendall is not guaranteed to send everything
            self.socket.sendall(msg + constants.LUXI_EOM)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    # Must be handled before socket.error: socket.timeout
                    # is a subclass of it and would otherwise never
                    # reach this handler
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # Everything before the last terminator is a complete message;
            # the remainder is kept for the next read
            new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket"""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
def ParseRequest(msg):
    """Parses a LUXI request message.

    Returns a (method, args, version) tuple; version is None if the
    request did not carry one. Raises ProtocolError if the message
    cannot be parsed, is not a dictionary, or lacks the method or the
    arguments.

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", request)

    if not isinstance(request, dict):
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    method = request.get(KEY_METHOD, None)  # pylint: disable-msg=E1103
    args = request.get(KEY_ARGS, None)  # pylint: disable-msg=E1103
    version = request.get(KEY_VERSION, None)  # pylint: disable-msg=E1103

    if method is None or args is None:
        logging.error("LUXI request missing method or arguments: %r", msg)
        # Wrap msg in a tuple so the formatting can never misinterpret it
        raise ProtocolError(("Invalid LUXI request (no method or arguments"
                             " in request): %r") % (msg, ))

    return (method, args, version)
def ParseResponse(msg):
    """Parses a LUXI response message.

    Returns a (success, result, version) tuple; version is None if the
    response did not carry one. Raises ProtocolError if the message
    cannot be deserialized or is not a dictionary containing both the
    success and the result keys.

    """
    # Parse the result
    try:
        data = serializer.LoadJson(msg)
    except Exception as err:
        raise ProtocolError("Error while deserializing response: %s" %
                            str(err))

    # Validate response
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        raise ProtocolError("Invalid response from server: %r" % (data, ))

    return (data[KEY_SUCCESS], data[KEY_RESULT],
            data.get(KEY_VERSION, None))  # pylint: disable-msg=E1103
def FormatResponse(success, result, version=None):
    """Formats a LUXI response message.

    Builds a dictionary with the success flag and the result, adds the
    version only if one is given, and returns it serialized through
    serializer.DumpJson.

    """
    response = {
        KEY_SUCCESS: success,
        KEY_RESULT: result,
        }

    if version is not None:
        response[KEY_VERSION] = version

    logging.debug("LUXI response: %s", response)

    return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
    """Formats a LUXI request message.

    Builds a dictionary with the method name and its arguments, adds
    the version only if one is given, and returns it serialized through
    serializer.DumpJson (without indentation).

    """
    # Build request
    request = {
        KEY_METHOD: method,
        KEY_ARGS: args,
        }

    # Add optional header(s)
    if version is not None:
        request[KEY_VERSION] = version

    # Serialize the request
    return serializer.DumpJson(request, indent=False)
def CallLuxiMethod(transport_cb, method, args, version=None):
    """Send a LUXI request via a transport and return the response.

    transport_cb is a callable taking the serialized request and
    returning the serialized response. On success the result carried by
    the response is returned. Raises errors.LuxiError on a version
    mismatch; on failure, errors.MaybeRaise is given the chance to raise
    from the result, otherwise RequestError is raised with the result.

    """
    assert callable(transport_cb)

    request_msg = FormatRequest(method, args, version=version)

    # Send request and wait for response
    response_msg = transport_cb(request_msg)

    (success, result, resp_version) = ParseResponse(response_msg)

    # Verify version if there was one in the response
    if resp_version is not None and resp_version != version:
        raise errors.LuxiError("LUXI version mismatch, client %s,"
                               " response %s" % (version, resp_version))

    if success:
        return result

    errors.MaybeRaise(result)
    raise RequestError(result)
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class;
            defaults to constants.MASTER_SOCKET
          - timeouts: a list of timeouts, to be used on connect and
            read/write
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        """
        if self.transport is None:
            return
        try:
            # Forget the transport before closing it, so that it gets
            # re-created on next use even if Close fails
            old_transp = self.transport
            self.transport = None
            old_transp.Close()
        except Exception:  # pylint: disable-msg=W0703
            # Deliberately best-effort
            pass

    def _SendMethodCall(self, data):
        """Send a serialized request and return the serialized response.

        On any error the transport is closed, so that the next call
        starts with a fresh one, and the error is re-raised.

        """
        # Send request and wait for response
        try:
            self._InitTransport()
            return self.transport.Call(data)
        except Exception:
            self._CloseTransport()
            raise

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        """
        return CallLuxiMethod(self._SendMethodCall, method, args,
                              version=constants.LUXI_VERSION)

    def SetQueueDrainFlag(self, drain_flag):
        """Send a SetDrainFlag request with the given flag."""
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SetWatcherPause(self, until):
        """Send a SetWatcherPause request with the given value."""
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

    def SubmitJob(self, ops):
        """Submit one job made of the given opcodes."""
        # Build a real list: a lazy map object cannot be serialized
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def SubmitManyJobs(self, jobs):
        """Submit several jobs, each given as a sequence of opcodes."""
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

    def CancelJob(self, job_id):
        """Send a CancelJob request for the given job."""
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        """Send an ArchiveJob request for the given job."""
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        """Send an AutoArchiveJobs request.

        The timeout sent along is derived from the default read/write
        timeout.

        """
        timeout = (DEF_RWTO - 1) / 2
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChangeOnce(self, job_id, fields,
                             prev_job_info, prev_log_serial,
                             timeout=WFJC_TIMEOUT):
        """Waits for changes on a job.

        @param job_id: Job ID
        @type fields: list
        @param fields: List of field names to be observed
        @type prev_job_info: None or list
        @param prev_job_info: Previously received job information
        @type prev_log_serial: None or int/long
        @param prev_log_serial: Highest log serial number previously
            received
        @type timeout: int/float
        @param timeout: Timeout in seconds (values larger than
            L{WFJC_TIMEOUT} will be capped to that value)

        """
        assert timeout >= 0, "Timeout can not be negative"
        return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                               (job_id, fields, prev_job_info,
                                prev_log_serial,
                                min(WFJC_TIMEOUT, timeout)))

    def WaitForJobChange(self, job_id, fields, prev_job_info,
                         prev_log_serial):
        """Wait until the job changes, re-issuing the request as needed.

        Repeats L{WaitForJobChangeOnce} for as long as it returns
        constants.JOB_NOTCHANGED and returns the first other result.

        """
        while True:
            result = self.WaitForJobChangeOnce(job_id, fields,
                                               prev_job_info,
                                               prev_log_serial)
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def QueryJobs(self, job_ids, fields):
        """Send a QueryJobs request."""
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        """Send a QueryInstances request."""
        return self.CallMethod(REQ_QUERY_INSTANCES,
                               (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        """Send a QueryNodes request."""
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryGroups(self, names, fields, use_locking):
        """Send a QueryGroups request."""
        return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        """Send a QueryExports request."""
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        """Send a QueryClusterInfo request."""
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        """Send a QueryConfigValues request."""
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

    def QueryTags(self, kind, name):
        """Send a QueryTags request."""
        return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

    def QueryLocks(self, fields, sync):
        """Send a QueryLocks request."""
        return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))