4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
KEY_SUCCESS = "success"

# Request (method) names sent to the master via CallMethod
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# WaitForJobChange timeout: less than half of the client read/write timeout
# (DEF_RWTO), presumably so the master replies before the client gives up.
# Floor division keeps the value an integer number of seconds explicitly,
# instead of relying on Python 2's integer "/" semantics.
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.GenericError):
  """Base class for all errors raised by the LUXI protocol layer."""
class ConnectionClosedError(ProtocolError):
  """The connection was closed, e.g. by the peer while reading a reply."""
class TimeoutError(ProtocolError):
  """A LUXI socket operation (connect, send or receive) timed out."""
class RequestError(ProtocolError):
  """The request itself was rejected.

  Signals a problem with the request's format or with its handling,
  as opposed to a failure of the requested operation (for example, an
  instance failing to start).

  Typical triggers:
    - a job submission failed because the job data was wrong
    - a query failed because required fields were missing

  """
class NoMasterError(ProtocolError):
  """The master daemon cannot be reached.

  Raised when connecting to the master socket fails because the socket
  does not exist or the connection is refused, e.g. because the master
  daemon is not running.

  """
107 """Low-level transport class.
109 This is used on the client side.
111 This could be replaced by any other class that provides the same
112 semantics to the Client. This means:
113 - can send messages and receive messages
114 - safe for multithreading
118 def __init__(self, address, timeouts=None):
119 """Constructor for the Transport class.
122 - address: a valid address for the used transport class
123 - timeouts: a pair of timeouts, to be used on connect and read/write
125 There are two timeouts used since we might want to wait for a long
126 time for a response, but the connect timeout should be lower.
128 If not passed, we use DEF_CTMO and DEF_RWTO (10 and respectively 60 seconds).
130 Note that on reading data, since the timeout applies to an
131 individual receive, it might be that the total duration is longer
132 than timeout value passed (we make a hard limit at twice the read
136 self.address = address
# Module defaults vs. a caller-supplied (connect, read/write) pair; the
# if/else selecting between the next two lines is not visible in this view.
138 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
140 self._ctimeout, self._rwtimeout = timeouts
# Complete messages already received but not yet handed out by Recv().
144 self._msgs = collections.deque()
147 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
# Retry _Connect, which raises utils.RetryAgain when the server's backlog is
# full. Presumably 1.0 is the retry delay and self._ctimeout the overall
# limit (confirm against utils.Retry); running out becomes TimeoutError.
151 utils.Retry(self._Connect, 1.0, self._ctimeout,
152 args=(self.socket, address, self._ctimeout))
153 except utils.RetryTimeout:
154 raise TimeoutError("Connect timed out")
# Connected: from now on every socket operation uses the read/write timeout.
156 self.socket.settimeout(self._rwtimeout)
# On failure, clean up the partially set-up socket (the rest of this handler
# is not visible in this view).
157 except (socket.error, NoMasterError):
158 if self.socket is not None:
# Make a single connection attempt; __init__ uses it as the utils.Retry
# callback. It takes no self, so it is presumably a staticmethod (the
# decorator line is not visible in this view).
164 def _Connect(sock, address, timeout):
# This timeout bounds only the connect attempt; __init__ switches the socket
# to the read/write timeout once connected.
165 sock.settimeout(timeout)
167 sock.connect(address)
# socket.timeout is a subclass of socket.error, so it must be handled first.
168 except socket.timeout, err:
169 raise TimeoutError("Connect timed out: %s" % str(err))
170 except socket.error, err:
# A missing socket file or a refused connection means no master is listening.
171 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
172 raise NoMasterError(address)
173 if err.args[0] == errno.EAGAIN:
174 # Server's socket backlog is full at the moment
175 raise utils.RetryAgain()
# NOTE(review): any other socket.error should be re-raised here; that line is
# not visible in this view -- confirm such errors are not silently swallowed.
def _CheckSocket(self):
  """Verify that the transport still holds an open socket.

  @raise ProtocolError: if the socket has already been closed (is None)

  """
  if self.socket is not None:
    return
  raise ProtocolError("Connection is closed")
188 This just sends a message and doesn't wait for the response.
# The receiving side splits the byte stream on constants.LUXI_EOM, so a
# payload containing the terminator would corrupt message framing.
191 if constants.LUXI_EOM in msg:
192 raise ProtocolError("Message terminator found in payload")
196 # TODO: sendall is not guaranteed to send everything
# NOTE(review): socket.sendall keeps sending until all data is sent or an
# exception is raised; after an error (including a timeout) it is unknown
# how much was sent, so the stream framing may be broken afterwards.
197 self.socket.sendall(msg + constants.LUXI_EOM)
198 except socket.timeout, err:
199 raise TimeoutError("Sending timeout: %s" % str(err))
202 """Try to receive a message from the socket.
204 In case we already have messages queued, we just return from the
205 queue. Otherwise, we try to read data with a _rwtimeout network
206 timeout, and making sure we don't go over 2x_rwtimeout as a global
# The deadline is checked only before each recv(), and each recv() may itself
# block for up to the socket timeout (_rwtimeout, set in __init__), so the
# total wait is bounded by roughly 2x _rwtimeout.
211 etime = time.time() + self._rwtimeout
212 while not self._msgs:
213 if time.time() > etime:
214 raise TimeoutError("Extended receive timeout")
217 data = self.socket.recv(4096)
# NOTE(review): socket.timeout is a subclass of socket.error. If this handler
# and the "except socket.timeout" below belong to the same try statement, a
# receive timeout is caught here first (its args[0] is the string
# "timed out", not EAGAIN) and the TimeoutError conversion below never runs;
# the socket.timeout clause should be listed first.
218 except socket.error, err:
219 if err.args and err.args[0] == errno.EAGAIN:
222 except socket.timeout, err:
223 raise TimeoutError("Receive timeout: %s" % str(err))
# Presumably guarded by an empty-read check not visible in this view:
# recv() returning no data means the peer closed the connection.
226 raise ConnectionClosedError("Connection closed while reading")
# Frame the stream: every piece before the last LUXI_EOM is a complete
# message; the trailing piece (possibly empty) is kept as partial data.
227 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
228 self._buffer = new_msgs.pop()
229 self._msgs.extend(new_msgs)
230 return self._msgs.popleft()
# Request/response round trip; presumably Send(msg) followed by returning
# Recv() -- the def line and body are not visible in this view.
233 """Send a message and wait for the response.
235 This is just a wrapper over Send and Recv.
242 """Close the socket, if it is still open."""
# Nothing to do when the socket is already gone (None); the rest of the body
# is not visible in this view.
243 if self.socket is not None:
248 def ParseRequest(msg):
249 """Parses a LUXI request message.
@param msg: the serialized (JSON) request
@return: a (method, args) tuple
@raise ProtocolError: if msg cannot be parsed, is not a dict, or lacks the
    method or the arguments
253 request = serializer.LoadJson(msg)
254 except ValueError, err:
255 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
257 logging.debug("LUXI request: %s", request)
259 if not isinstance(request, dict):
260 logging.error("LUXI request not a dict: %r", msg)
261 raise ProtocolError("Invalid LUXI request (not a dict)")
# request is known to be a dict here; pylint cannot infer that, hence the
# disable markers. Explicit None values count as missing.
263 method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
264 args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
266 if method is None or args is None:
267 logging.error("LUXI request missing method or arguments: %r", msg)
268 raise ProtocolError(("Invalid LUXI request (no method or arguments"
269 " in request): %r") % msg)
271 return (method, args)
274 def ParseResponse(msg):
275 """Parses a LUXI response message.
@param msg: the serialized (JSON) response
@return: a (success, result) tuple
@raise ProtocolError: if msg cannot be deserialized or is not a dict with
    the expected keys
280 data = serializer.LoadJson(msg)
# NOTE(review): unlike ParseRequest (which catches only ValueError), any
# exception raised by LoadJson is converted to ProtocolError here.
281 except Exception, err:
282 raise ProtocolError("Error while deserializing response: %s" % str(err))
# NOTE(review): one line of this condition is not visible in this view; since
# data[KEY_RESULT] is read below, it presumably checks KEY_RESULT in data.
285 if not (isinstance(data, dict) and
286 KEY_SUCCESS in data and
288 raise ProtocolError("Invalid response from server: %r" % data)
290 return (data[KEY_SUCCESS], data[KEY_RESULT])
293 def FormatResponse(success, result):
294 """Formats a LUXI response message.
@param success: success flag, stored under KEY_SUCCESS
@param result: the result payload (presumably stored under KEY_RESULT)
@return: the response dict serialized with serializer.DumpJson
298 KEY_SUCCESS: success,
302 logging.debug("LUXI response: %s", response)
304 return serializer.DumpJson(response)
307 def FormatRequest(method, args):
308 """Formats a LUXI request message.
@param method: the LUXI method name
@param args: the method's arguments
@return: the request serialized with serializer.DumpJson
317 # Serialize the request
# NOTE(review): indent=False differs from FormatResponse, which uses the
# DumpJson default; presumably to keep requests compact -- confirm.
318 return serializer.DumpJson(request, indent=False)
321 def CallLuxiMethod(transport_cb, method, args):
322 """Send a LUXI request via a transport and return the response.
@param transport_cb: callable taking the serialized request and returning
    the serialized response
@param method: the LUXI method name
@param args: the method's arguments
@raise RequestError: on a failed response not converted by
    errors.MaybeRaise into a more specific exception
# NOTE(review): assert is stripped under "python -O"; a non-callable
# transport_cb would then only fail at the call below.
325 assert callable(transport_cb)
327 request_msg = FormatRequest(method, args)
329 # Send request and wait for response
330 response_msg = transport_cb(request_msg)
332 (success, result) = ParseResponse(response_msg)
# Failure path (the guarding success check is not visible in this view):
# errors.MaybeRaise presumably re-raises an encoded error held in result;
# otherwise the failure is reported as a RequestError.
337 errors.MaybeRaise(result)
338 raise RequestError(result)
# LUXI client: each public method sends one REQ_* request through
# CallMethod, which uses a transport created by _InitTransport.
341 class Client(object):
342 """High-level client implementation.
344 This uses a backing Transport-like class on top of which it
345 implements data serialization/deserialization.
348 def __init__(self, address=None, timeouts=None, transport=Transport):
349 """Constructor for the Client class.
352 - address: a valid address for the used transport class
353 - timeouts: a (connect, read/write) pair of timeouts, passed to the transport
354 - transport: a Transport-like class
357 If timeouts is not passed, the default timeouts of the transport
# Fall back to the master daemon's socket (the "address is None" check is
# not visible in this view).
362 address = constants.MASTER_SOCKET
363 self.address = address
364 self.timeouts = timeouts
365 self.transport_class = transport
# Create the transport right away (the default Transport connects in its
# constructor), so connection errors surface from Client's constructor.
366 self.transport = None
367 self._InitTransport()
def _InitTransport(self):
  """Create the transport object unless one is already in place.

  The transport is built from C{self.transport_class} using the stored
  address and timeouts; an existing transport is left untouched.

  """
  if self.transport is not None:
    return
  self.transport = self.transport_class(self.address, timeouts=self.timeouts)
377 def _CloseTransport(self):
378 """Close the transport, ignoring errors.
381 if self.transport is None:
# Detach the transport first, so self.transport is None (and _InitTransport
# will create a new one) even if closing the old one fails.
384 old_transp = self.transport
385 self.transport = None
# Best-effort close: any exception is deliberately ignored (the try line and
# handler body are not visible in this view).
387 except Exception: # pylint: disable-msg=W0703
390 def _SendMethodCall(self, data):
"""Send serialized request data over the transport and return the reply.

If the call fails, the transport is closed via _CloseTransport so that a
later call creates a fresh one through _InitTransport.

"""
391 # Send request and wait for response
393 self._InitTransport()
394 return self.transport.Call(data)
# NOTE(review): the except line around this call is not visible in this
# view; confirm the original error is re-raised after closing.
396 self._CloseTransport()
def CallMethod(self, method, args):
  """Send a generic request and return the response.

  @param method: the LUXI method name (one of the REQ_* constants)
  @param args: the method's arguments
  @return: whatever L{CallLuxiMethod} returns for this request

  """
  send = self._SendMethodCall
  return CallLuxiMethod(send, method, args)
def SetQueueDrainFlag(self, drain_flag):
  """Send a C{SetDrainFlag} request to the master.

  @param drain_flag: the flag value, sent unchanged as the arguments
  @return: the result of L{CallMethod}

  """
  method = REQ_QUEUE_SET_DRAIN_FLAG
  return self.CallMethod(method, drain_flag)
def SetWatcherPause(self, until):
  """Send a C{SetWatcherPause} request to the master.

  @param until: sent as the single element of the argument list
  @return: the result of L{CallMethod}

  """
  pause_args = [until]
  return self.CallMethod(REQ_SET_WATCHER_PAUSE, pause_args)
def SubmitJob(self, ops):
  """Submit a single job to the master.

  @param ops: sequence of opcode objects (anything with __getstate__)
      making up the job
  @return: the result of L{CallMethod}

  """
  # Build a real list (not a lazy map object) so the request can always be
  # serialized; this matches how SubmitManyJobs builds each job's list.
  ops_state = [op.__getstate__() for op in ops]
  return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
415 def SubmitManyJobs(self, jobs):
"""Submit several jobs with a single request.

@param jobs: iterable of jobs, each an iterable of opcode objects
@return: the result of CallMethod

"""
# Each job is sent as the list of its opcodes' __getstate__() values (the
# jobs_state initialisation and loop header are not visible in this view).
418 jobs_state.append([op.__getstate__() for op in ops])
419 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
  """Send a C{CancelJob} request to the master.

  @param job_id: ID of the job, sent unchanged as the arguments
  @return: the result of L{CallMethod}

  """
  method = REQ_CANCEL_JOB
  return self.CallMethod(method, job_id)
def ArchiveJob(self, job_id):
  """Send an C{ArchiveJob} request to the master.

  @param job_id: ID of the job, sent unchanged as the arguments
  @return: the result of L{CallMethod}

  """
  method = REQ_ARCHIVE_JOB
  return self.CallMethod(method, job_id)
def AutoArchiveJobs(self, age):
  """Send an C{AutoArchiveJobs} request to the master.

  @param age: job age threshold, forwarded unchanged
  @return: the result of L{CallMethod}

  """
  # The timeout sent along is the same value as WFJC_TIMEOUT, i.e.
  # (DEF_RWTO - 1) / 2. Reuse the constant instead of duplicating the
  # formula, so the two server-side time budgets cannot drift apart.
  return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, WFJC_TIMEOUT))
431 def WaitForJobChangeOnce(self, job_id, fields,
432 prev_job_info, prev_log_serial,
433 timeout=WFJC_TIMEOUT):
434 """Waits for changes on a job.
436 @param job_id: Job ID
438 @param fields: List of field names to be observed
439 @type prev_job_info: None or list
440 @param prev_job_info: Previously received job information
441 @type prev_log_serial: None or int/long
442 @param prev_log_serial: Highest log serial number previously received
443 @type timeout: int/float
444 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
445 be capped to that value)
@return: the master's reply; WaitForJobChange treats
    C{constants.JOB_NOTCHANGED} as "nothing changed yet"
# NOTE(review): assert is stripped under "python -O", so a negative timeout
# would then be sent to the master unchecked.
448 assert timeout >= 0, "Timeout can not be negative"
# Never ask the master to wait longer than WFJC_TIMEOUT.
449 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
450 (job_id, fields, prev_job_info,
452 min(WFJC_TIMEOUT, timeout)))
454 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
"""Wait for changes on a job, repeating the single-shot wait as needed.

Calls WaitForJobChangeOnce (with its default timeout) and compares the
result with C{constants.JOB_NOTCHANGED}; the surrounding loop and the
return statement are not visible in this view.

"""
456 result = self.WaitForJobChangeOnce(job_id, fields,
457 prev_job_info, prev_log_serial)
# NOTE(review): no overall deadline is visible here; if the job never
# changes, this presumably keeps polling indefinitely.
458 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
  """Send a C{QueryJobs} request to the master.

  @param job_ids: the job IDs to query
  @param fields: the requested field names
  @return: the result of L{CallMethod}

  """
  query_args = (job_ids, fields)
  return self.CallMethod(REQ_QUERY_JOBS, query_args)
def QueryInstances(self, names, fields, use_locking):
  """Send a C{QueryInstances} request to the master.

  @param names: the instance names to query
  @param fields: the requested field names
  @param use_locking: locking flag, forwarded unchanged
  @return: the result of L{CallMethod}

  """
  query_args = (names, fields, use_locking)
  return self.CallMethod(REQ_QUERY_INSTANCES, query_args)
def QueryNodes(self, names, fields, use_locking):
  """Send a C{QueryNodes} request to the master.

  @param names: the node names to query
  @param fields: the requested field names
  @param use_locking: locking flag, forwarded unchanged
  @return: the result of L{CallMethod}

  """
  query_args = (names, fields, use_locking)
  return self.CallMethod(REQ_QUERY_NODES, query_args)
def QueryExports(self, nodes, use_locking):
  """Send a C{QueryExports} request to the master.

  @param nodes: the nodes to query
  @param use_locking: locking flag, forwarded unchanged
  @return: the result of L{CallMethod}

  """
  query_args = (nodes, use_locking)
  return self.CallMethod(REQ_QUERY_EXPORTS, query_args)
def QueryClusterInfo(self):
  """Send a C{QueryClusterInfo} request (with no arguments) to the master.

  @return: the result of L{CallMethod}

  """
  no_args = ()
  return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
  """Send a C{QueryConfigValues} request to the master.

  @param fields: the requested field names, sent unchanged as the arguments
  @return: the result of L{CallMethod}

  """
  method = REQ_QUERY_CONFIG_VALUES
  return self.CallMethod(method, fields)
def QueryTags(self, kind, name):
  """Send a C{QueryTags} request to the master.

  @param kind: the kind of object whose tags are queried
  @param name: the object's name
  @return: the result of L{CallMethod}

  """
  query_args = (kind, name)
  return self.CallMethod(REQ_QUERY_TAGS, query_args)