4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
# Key of the success flag inside a LUXI response dictionary
KEY_SUCCESS = "success"

# Method names sent as the "method" part of a LUXI request; each one is
# used by the Client method of the same name
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUERY_LOCKS = "QueryLocks"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# WaitForJobChange timeout: default and upper bound for the timeout sent
# with a single WaitForJobChange request.
# NOTE(review): DEF_RWTO is not defined in the visible part of the file;
# presumably it is the default read/write timeout in seconds -- confirm.
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.GenericError):
  """Base class for errors raised by the LUXI protocol code.

  All other exception classes defined in this module derive from it.

  """
class ConnectionClosedError(ProtocolError):
  """The connection was closed while a message was being read."""
class TimeoutError(ProtocolError):
  """A connect, send or receive operation timed out."""
class RequestError(ProtocolError):
  """Error while handling a request.

  Raised when the request itself was malformed or could not be
  processed; it does not cover failures of the work requested (e.g.
  an instance that fails to start up).

  Typical causes:
    - a job could not be submitted because its data was wrong
    - a query was rejected because required fields were missing

  """
class NoMasterError(ProtocolError):
  """The master daemon cannot be reached.

  Raised when connecting to the master socket fails because the socket
  path does not exist or the connection is refused, i.e. the master
  daemon is not running or its socket is gone.

  """
class PermissionError(ProtocolError):
  """Connecting to the master socket was denied.

  The calling user lacks the rights needed to access the socket.

  """
116 """Low-level transport class.
118 This is used on the client side.
120 This could be replace by any other class that provides the same
121 semantics to the Client. This means:
122 - can send messages and receive messages
123 - safe for multithreading
127 def __init__(self, address, timeouts=None):
128 """Constructor for the Client class.
131 - address: a valid address the the used transport class
132 - timeout: a list of timeouts, to be used on connect and read/write
134 There are two timeouts used since we might want to wait for a long
135 time for a response, but the connect timeout should be lower.
137 If not passed, we use a default of 10 and respectively 60 seconds.
139 Note that on reading data, since the timeout applies to an
140 invidual receive, it might be that the total duration is longer
141 than timeout value passed (we make a hard limit at twice the read
145 self.address = address
147 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
149 self._ctimeout, self._rwtimeout = timeouts
153 self._msgs = collections.deque()
156 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
160 utils.Retry(self._Connect, 1.0, self._ctimeout,
161 args=(self.socket, address, self._ctimeout))
162 except utils.RetryTimeout:
163 raise TimeoutError("Connect timed out")
165 self.socket.settimeout(self._rwtimeout)
166 except (socket.error, NoMasterError):
167 if self.socket is not None:
173 def _Connect(sock, address, timeout):
174 sock.settimeout(timeout)
176 sock.connect(address)
177 except socket.timeout, err:
178 raise TimeoutError("Connect timed out: %s" % str(err))
179 except socket.error, err:
180 error_code = err.args[0]
181 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
182 raise NoMasterError(address)
183 elif error_code in (errno.EPERM, errno.EACCES):
184 raise PermissionError(address)
185 elif error_code == errno.EAGAIN:
186 # Server's socket backlog is full at the moment
187 raise utils.RetryAgain()
190 def _CheckSocket(self):
191 """Make sure we are connected.
194 if self.socket is None:
195 raise ProtocolError("Connection is closed")
200 This just sends a message and doesn't wait for the response.
203 if constants.LUXI_EOM in msg:
204 raise ProtocolError("Message terminator found in payload")
208 # TODO: sendall is not guaranteed to send everything
209 self.socket.sendall(msg + constants.LUXI_EOM)
210 except socket.timeout, err:
211 raise TimeoutError("Sending timeout: %s" % str(err))
214 """Try to receive a message from the socket.
216 In case we already have messages queued, we just return from the
217 queue. Otherwise, we try to read data with a _rwtimeout network
218 timeout, and making sure we don't go over 2x_rwtimeout as a global
223 etime = time.time() + self._rwtimeout
224 while not self._msgs:
225 if time.time() > etime:
226 raise TimeoutError("Extended receive timeout")
229 data = self.socket.recv(4096)
230 except socket.error, err:
231 if err.args and err.args[0] == errno.EAGAIN:
234 except socket.timeout, err:
235 raise TimeoutError("Receive timeout: %s" % str(err))
238 raise ConnectionClosedError("Connection closed while reading")
239 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
240 self._buffer = new_msgs.pop()
241 self._msgs.extend(new_msgs)
242 return self._msgs.popleft()
245 """Send a message and wait for the response.
247 This is just a wrapper over Send and Recv.
254 """Close the socket"""
255 if self.socket is not None:
260 def ParseRequest(msg):
261 """Parses a LUXI request message.
265 request = serializer.LoadJson(msg)
266 except ValueError, err:
267 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
269 logging.debug("LUXI request: %s", request)
271 if not isinstance(request, dict):
272 logging.error("LUXI request not a dict: %r", msg)
273 raise ProtocolError("Invalid LUXI request (not a dict)")
275 method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
276 args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
278 if method is None or args is None:
279 logging.error("LUXI request missing method or arguments: %r", msg)
280 raise ProtocolError(("Invalid LUXI request (no method or arguments"
281 " in request): %r") % msg)
283 return (method, args)
286 def ParseResponse(msg):
287 """Parses a LUXI response message.
292 data = serializer.LoadJson(msg)
293 except Exception, err:
294 raise ProtocolError("Error while deserializing response: %s" % str(err))
297 if not (isinstance(data, dict) and
298 KEY_SUCCESS in data and
300 raise ProtocolError("Invalid response from server: %r" % data)
302 return (data[KEY_SUCCESS], data[KEY_RESULT])
def FormatResponse(success, result):
  """Build and serialize a LUXI response message.

  The response is a dictionary holding the success flag and the
  result; it is logged at debug level and returned serialized by
  serializer.DumpJson.

  """
  response = {}
  response[KEY_SUCCESS] = success
  response[KEY_RESULT] = result

  logging.debug("LUXI response: %s", response)

  return serializer.DumpJson(response)
def FormatRequest(method, args):
  """Build and serialize a LUXI request message.

  The request is a dictionary holding the method name and its
  arguments, i.e. the two entries ParseRequest reads back.

  """
  # Assemble the request
  request = {}
  request[KEY_METHOD] = method
  request[KEY_ARGS] = args

  # Serialize the request
  return serializer.DumpJson(request, indent=False)
def CallLuxiMethod(transport_cb, method, args):
  """Send a LUXI request via a transport and return the response.

  transport_cb is called with the serialized request and must return
  the serialized response.

  Returns the result of a successful call. On failure the result is
  first passed to errors.MaybeRaise (presumably re-raising an encoded
  exception -- verify against the errors module), then RequestError is
  raised with the result.

  """
  assert callable(transport_cb)

  # Send the serialized request and wait for the answer
  response_msg = transport_cb(FormatRequest(method, args))

  (success, result) = ParseResponse(response_msg)

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
class Client(object):
  """High-level client implementation.

  Builds on a Transport-like object and adds the serialization and
  deserialization of LUXI method calls on top of it.

  """

  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; if not
        passed, constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and
        read/write; handed unchanged to the transport class
      - transport: a Transport-like class

    The transport is created right away.

    """
    if address is None:
      address = constants.MASTER_SOCKET
    self.transport_class = transport
    self.timeouts = timeouts
    self.address = address
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is not None:
      return
    self.transport = self.transport_class(self.address,
                                          timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport first so that it is dropped even if
      # closing it fails
      closing = self.transport
      self.transport = None
      closing.Close()
    except Exception: # pylint: disable-msg=W0703
      pass

  def _SendMethodCall(self, data):
    """Send a serialized request and return the serialized response.

    If anything goes wrong the transport is closed, so that the next
    call creates a new one, and the error is re-raised.

    """
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    """
    return CallLuxiMethod(self._SendMethodCall, method, args)

  def SetQueueDrainFlag(self, drain_flag):
    """Call the SetDrainFlag method with the given flag."""
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

  def SetWatcherPause(self, until):
    """Call the SetWatcherPause method with the given value."""
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

  def SubmitJob(self, ops):
    """Submit one job made of the given opcodes.

    Each opcode is sent as the state returned by its __getstate__.

    """
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

  def SubmitManyJobs(self, jobs):
    """Submit several jobs, each given as a list of opcodes."""
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

  def CancelJob(self, job_id):
    """Call the CancelJob method for the given job ID."""
    return self.CallMethod(REQ_CANCEL_JOB, job_id)

  def ArchiveJob(self, job_id):
    """Call the ArchiveJob method for the given job ID."""
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

  def AutoArchiveJobs(self, age):
    """Call the AutoArchiveJobs method.

    The age is sent together with a timeout derived from DEF_RWTO.

    """
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, (DEF_RWTO - 1) / 2))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    capped_timeout = min(WFJC_TIMEOUT, timeout)
    request_args = (job_id, fields, prev_job_info, prev_log_serial,
                    capped_timeout)
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, request_args)

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait until a job changes.

    Repeats WaitForJobChangeOnce until it returns something other than
    constants.JOB_NOTCHANGED and returns that value.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        return result

  def QueryJobs(self, job_ids, fields):
    """Call the QueryJobs method."""
    query = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, query)

  def QueryInstances(self, names, fields, use_locking):
    """Call the QueryInstances method."""
    query = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, query)

  def QueryNodes(self, names, fields, use_locking):
    """Call the QueryNodes method."""
    query = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, query)

  def QueryExports(self, nodes, use_locking):
    """Call the QueryExports method."""
    query = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, query)

  def QueryClusterInfo(self):
    """Call the QueryClusterInfo method; it takes no arguments."""
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Call the QueryConfigValues method."""
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

  def QueryTags(self, kind, name):
    """Call the QueryTags method."""
    query = (kind, name)
    return self.CallMethod(REQ_QUERY_TAGS, query)

  def QueryLocks(self, fields, sync):
    """Call the QueryLocks method."""
    query = (fields, sync)
    return self.CallMethod(REQ_QUERY_LOCKS, query)