# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import logging
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
from ganeti import utils
# Keys of the JSON dictionaries exchanged over the LUXI socket. Requests
# carry KEY_METHOD/KEY_ARGS, responses carry KEY_SUCCESS/KEY_RESULT, and
# both may carry KEY_VERSION.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Names of the request methods understood by the master daemon
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout; floor division keeps this an integer number of
# seconds regardless of the Python version's "/" semantics
WFJC_TIMEOUT = (DEF_RWTO - 1) // 2
class ProtocolError(errors.LuxiError):
  """Denotes an error in the LUXI protocol.

  This is the base class of the exceptions defined in this module.

  """
class ConnectionClosedError(ProtocolError):
  """Connection closed error.

  Raised when the peer closes the connection while a message is being read.

  """
# NOTE: on Python 3.3+ this name shadows the builtin TimeoutError within this
# module; it is kept because callers catch it under this name.
class TimeoutError(ProtocolError):
  """Operation timeout error.

  Raised when connecting, sending or receiving exceeds its timeout.

  """
class RequestError(ProtocolError):
  """Error on request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or its socket cannot
  be found: connecting failed with ENOENT or ECONNREFUSED.

  """
# NOTE: on Python 3.3+ this name shadows the builtin PermissionError within
# this module; it is kept because callers catch it under this name.
class PermissionError(ProtocolError):
  """Permission denied while connecting to the master socket.

  This means the user doesn't have the proper rights: connecting failed
  with EPERM or EACCES.

  """
class Transport:
  """Low-level transport class.

  This is used on the client side.

  This could be replaced by any other class that provides the same
  semantics to the Client. This means:
    - can send messages and receive messages
    - safe for multithreading

  """

  def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a pair of timeouts (connect, read/write), in seconds

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, the defaults DEF_CTMO and DEF_RWTO are used.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (the receive loop stops starting new
    reads once the read timeout has elapsed, so the hard limit is about
    twice the read timeout).

    Raises TimeoutError if the connection cannot be established within
    the connect timeout; NoMasterError, PermissionError and socket.error
    are propagated from the connection attempt.

    """
    self.address = address
    if timeouts is None:
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
      self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Data received after the last complete message
    self._buffer = ""
    # Complete messages received but not yet handed out by Recv
    self._msgs = collections.deque()

    try:
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

      # Try to connect, retrying while the server's backlog is full
      try:
        utils.Retry(self._Connect, 1.0, self._ctimeout,
                    args=(self.socket, address, self._ctimeout))
      except utils.RetryTimeout:
        raise TimeoutError("Connect timed out")

      self.socket.settimeout(self._rwtimeout)
    except Exception:
      # Release the socket on every failure (including TimeoutError and
      # PermissionError), not only on socket.error/NoMasterError, so that
      # a failed constructor never leaves a descriptor open.
      if self.socket is not None:
        self.socket.close()
      self.socket = None
      raise

  @staticmethod
  def _Connect(sock, address, timeout):
    """Try once to connect the socket, translating low-level errors.

    Raises TimeoutError on a socket timeout, NoMasterError on
    ENOENT/ECONNREFUSED, PermissionError on EPERM/EACCES and
    utils.RetryAgain on EAGAIN; any other socket.error is re-raised.

    """
    sock.settimeout(timeout)
    try:
      sock.connect(address)
    except socket.timeout as err:
      raise TimeoutError("Connect timed out: %s" % str(err))
    except socket.error as err:
      error_code = err.args[0]
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
        raise NoMasterError(address)
      elif error_code in (errno.EPERM, errno.EACCES):
        raise PermissionError(address)
      elif error_code == errno.EAGAIN:
        # Server's socket backlog is full at the moment
        raise utils.RetryAgain()
      raise

  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")

  def Send(self, msg):
    """Send a message.

    This just sends a message and doesn't wait for the response.

    Raises ProtocolError if the message contains the message terminator
    or the connection is closed, and TimeoutError on a send timeout.

    """
    if constants.LUXI_EOM in msg:
      raise ProtocolError("Message terminator found in payload")

    self._CheckSocket()
    try:
      # TODO: sendall is not guaranteed to send everything
      self.socket.sendall(msg + constants.LUXI_EOM)
    except socket.timeout as err:
      raise TimeoutError("Sending timeout: %s" % str(err))

  def Recv(self):
    """Try to receive a message from the socket.

    In case we already have messages queued, we just return from the
    queue. Otherwise, we try to read data with a _rwtimeout network
    timeout, and making sure we don't go over 2x_rwtimeout as a global
    limit.

    Raises TimeoutError on timeout and ConnectionClosedError if the peer
    closes the connection.

    """
    self._CheckSocket()
    etime = time.time() + self._rwtimeout
    while not self._msgs:
      if time.time() > etime:
        raise TimeoutError("Extended receive timeout")
      while True:
        try:
          data = self.socket.recv(4096)
        except socket.timeout as err:
          raise TimeoutError("Receive timeout: %s" % str(err))
        except socket.error as err:
          # EAGAIN only means "no data right now", so read again
          if err.args and err.args[0] == errno.EAGAIN:
            continue
          raise
        break
      if not data:
        raise ConnectionClosedError("Connection closed while reading")
      # The last element is the (possibly empty) start of the next,
      # still incomplete message; keep it for the following read
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
      self._buffer = new_msgs.pop()
      self._msgs.extend(new_msgs)
    return self._msgs.popleft()

  def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    """
    self.Send(msg)
    return self.Recv()

  def Close(self):
    """Close the socket"""
    if self.socket is not None:
      self.socket.close()
      self.socket = None
def ParseRequest(msg):
  """Parses a LUXI request message.

  @param msg: Serialized (JSON) request as received from the client
  @return: Tuple of (method, args, version); version is None if the
      request did not carry one
  @raise ProtocolError: If the message cannot be parsed, is not a
      dictionary, or lacks the method or the arguments

  """
  try:
    request = serializer.LoadJson(msg)
  except ValueError as err:
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

  logging.debug("LUXI request: %s", request)

  if not isinstance(request, dict):
    logging.error("LUXI request not a dict: %r", msg)
    raise ProtocolError("Invalid LUXI request (not a dict)")

  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
  version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103

  if method is None or args is None:
    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)

  return (method, args, version)
def ParseResponse(msg):
  """Parses a LUXI response message.

  @param msg: Serialized (JSON) response as received from the server
  @return: Tuple of (success, result, version); version is None if the
      response did not carry one
  @raise ProtocolError: If the message cannot be deserialized or is not
      a dictionary containing both the success flag and the result

  """
  # Parse the result
  try:
    data = serializer.LoadJson(msg)
  except Exception as err:
    raise ProtocolError("Error while deserializing response: %s" % str(err))

  # Validate response
  if not (isinstance(data, dict) and
          KEY_SUCCESS in data and
          KEY_RESULT in data):
    raise ProtocolError("Invalid response from server: %r" % data)

  return (data[KEY_SUCCESS], data[KEY_RESULT],
          data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
def FormatResponse(success, result, version=None):
  """Formats a LUXI response message.

  @param success: Value stored under KEY_SUCCESS
  @param result: Value stored under KEY_RESULT
  @param version: Protocol version; only included when not None
  @return: The response as serialized by serializer.DumpJson

  """
  response = {
    KEY_SUCCESS: success,
    KEY_RESULT: result,
    }

  if version is not None:
    response[KEY_VERSION] = version

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
def FormatRequest(method, args, version=None):
  """Formats a LUXI request message.

  @param method: Value stored under KEY_METHOD
  @param args: Value stored under KEY_ARGS
  @param version: Protocol version; only included when not None
  @return: The request as serialized by serializer.DumpJson

  """
  # Build request
  request = {
    KEY_METHOD: method,
    KEY_ARGS: args,
    }

  if version is not None:
    request[KEY_VERSION] = version

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Send a LUXI request via a transport and return the response.

  @param transport_cb: Callable taking the serialized request and
      returning the serialized response
  @param method: Request method name
  @param args: Request arguments
  @param version: Protocol version to send and to check the response
      against
  @return: The result carried by a successful response
  @raise errors.LuxiError: If the response carries a version different
      from C{version}
  @raise RequestError: If the request failed and the result could not be
      converted into a more specific exception by errors.MaybeRaise

  """
  assert callable(transport_cb)

  request_msg = FormatRequest(method, args, version=version)

  # Send request and wait for response
  response_msg = transport_cb(request_msg)

  (success, result, resp_version) = ParseResponse(response_msg)

  # Verify version if there was one in the response
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if success:
    return result

  # Failed request: the result describes the error
  errors.MaybeRaise(result)
  raise RequestError(result)
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If address is not passed, constants.MASTER_SOCKET is used. If
    timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport before closing it, so that a failing Close
      # still leaves the client ready to create a new one
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    """Send a serialized request and return the serialized response.

    On any error the transport is closed (so the next call reconnects)
    and the exception is re-raised.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    # A real list (not a lazy map object) so the state can be serialized
    # on any Python version; same form as SubmitManyJobs
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    jobs_state = []
    for ops in jobs:
      jobs_state.append([op.__getstate__() for op in ops])
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    # Floor division keeps the timeout an integer number of seconds
    # regardless of the Python version's "/" semantics
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes on a job, repeating the request until one occurs.

    Calls L{WaitForJobChangeOnce} until its result differs from
    constants.JOB_NOTCHANGED and returns that result.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def QueryJobs(self, job_ids, fields):
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))

  def QueryLocks(self, fields, sync):
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))