4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
# Name of the field carrying the boolean success flag of a LUXI response
KEY_SUCCESS = "success"
# LUXI request (method) names, sent by the client as the request method

# Job submission and control
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"

# Queries
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"

# Flags
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
# Default (and maximum) timeout for a single WaitForJobChange request,
# derived from the client read/write timeout DEF_RWTO.
# NOTE(review): this module uses Python 2 syntax, so with an integer DEF_RWTO
# "/" is integer division here; keep that in mind if porting.
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(errors.GenericError):
  """Denotes an error in the LUXI protocol.

  Common base class of the more specific LUXI errors defined in this
  module (closed connection, timeouts, request errors, unreachable master,
  permission problems).

  """
class ConnectionClosedError(ProtocolError):
  """Connection closed error.

  Raised, for example, when the connection is closed while a response is
  still being read.

  """
class TimeoutError(ProtocolError):
  """Operation timeout error.

  Raised when connecting, sending or receiving on the LUXI socket takes
  longer than the configured timeouts.

  """
  # NOTE(review): on Python 3 this name shadows the builtin TimeoutError
  # within this module; kept for API compatibility.
84 class RequestError(ProtocolError):
87 This signifies an error in the request format or request handling,
88 but not (e.g.) an error in starting up an instance.
90 Some common conditions that can trigger this exception:
91 - job submission failed because the job data was wrong
92 - query failed because required fields were missing
97 class NoMasterError(ProtocolError):
98 """The master cannot be reached.
100 This means that the master daemon is not running or the socket has
106 class PermissionError(ProtocolError):
107 """Permission denied while connecting to the master socket.
109 This means the user doesn't have the proper rights.
115 """Low-level transport class.
117 This is used on the client side.
119 This could be replaced by any other class that provides the same
120 semantics to the Client. This means:
121 - can send messages and receive messages
122 - safe for multithreading
126 def __init__(self, address, timeouts=None):
127 """Constructor for the Client class.
130 - address: a valid address for the used transport class
131 - timeouts: a list of timeouts, to be used on connect and read/write
133 There are two timeouts used since we might want to wait for a long
134 time for a response, but the connect timeout should be lower.
136 If not passed, we use a default of 10 and respectively 60 seconds.
138 Note that on reading data, since the timeout applies to an
139 individual receive, it might be that the total duration is longer
140 than timeout value passed (we make a hard limit at twice the read
144 self.address = address
146 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
148 self._ctimeout, self._rwtimeout = timeouts
152 self._msgs = collections.deque()
155 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
159 utils.Retry(self._Connect, 1.0, self._ctimeout,
160 args=(self.socket, address, self._ctimeout))
161 except utils.RetryTimeout:
162 raise TimeoutError("Connect timed out")
164 self.socket.settimeout(self._rwtimeout)
165 except (socket.error, NoMasterError):
166 if self.socket is not None:
172 def _Connect(sock, address, timeout):
173 sock.settimeout(timeout)
175 sock.connect(address)
176 except socket.timeout, err:
177 raise TimeoutError("Connect timed out: %s" % str(err))
178 except socket.error, err:
179 error_code = err.args[0]
180 if error_code in (errno.ENOENT, errno.ECONNREFUSED):
181 raise NoMasterError(address)
182 elif error_code in (errno.EPERM, errno.EACCES):
183 raise PermissionError(address)
184 elif error_code == errno.EAGAIN:
185 # Server's socket backlog is full at the moment
186 raise utils.RetryAgain()
189 def _CheckSocket(self):
190 """Make sure we are connected.
193 if self.socket is None:
194 raise ProtocolError("Connection is closed")
199 This just sends a message and doesn't wait for the response.
202 if constants.LUXI_EOM in msg:
203 raise ProtocolError("Message terminator found in payload")
207 # TODO: sendall is not guaranteed to send everything
208 self.socket.sendall(msg + constants.LUXI_EOM)
209 except socket.timeout, err:
210 raise TimeoutError("Sending timeout: %s" % str(err))
213 """Try to receive a message from the socket.
215 In case we already have messages queued, we just return from the
216 queue. Otherwise, we try to read data with a _rwtimeout network
217 timeout, and making sure we don't go over 2x_rwtimeout as a global
222 etime = time.time() + self._rwtimeout
223 while not self._msgs:
224 if time.time() > etime:
225 raise TimeoutError("Extended receive timeout")
228 data = self.socket.recv(4096)
229 except socket.error, err:
230 if err.args and err.args[0] == errno.EAGAIN:
233 except socket.timeout, err:
234 raise TimeoutError("Receive timeout: %s" % str(err))
237 raise ConnectionClosedError("Connection closed while reading")
238 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
239 self._buffer = new_msgs.pop()
240 self._msgs.extend(new_msgs)
241 return self._msgs.popleft()
244 """Send a message and wait for the response.
246 This is just a wrapper over Send and Recv.
253 """Close the socket"""
254 if self.socket is not None:
259 def ParseRequest(msg):
260 """Parses a LUXI request message.
264 request = serializer.LoadJson(msg)
265 except ValueError, err:
266 raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
268 logging.debug("LUXI request: %s", request)
270 if not isinstance(request, dict):
271 logging.error("LUXI request not a dict: %r", msg)
272 raise ProtocolError("Invalid LUXI request (not a dict)")
274 method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
275 args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
277 if method is None or args is None:
278 logging.error("LUXI request missing method or arguments: %r", msg)
279 raise ProtocolError(("Invalid LUXI request (no method or arguments"
280 " in request): %r") % msg)
282 return (method, args)
285 def ParseResponse(msg):
286 """Parses a LUXI response message.
291 data = serializer.LoadJson(msg)
292 except Exception, err:
293 raise ProtocolError("Error while deserializing response: %s" % str(err))
296 if not (isinstance(data, dict) and
297 KEY_SUCCESS in data and
299 raise ProtocolError("Invalid response from server: %r" % data)
301 return (data[KEY_SUCCESS], data[KEY_RESULT])
304 def FormatResponse(success, result):
305 """Formats a LUXI response message.
309 KEY_SUCCESS: success,
313 logging.debug("LUXI response: %s", response)
315 return serializer.DumpJson(response)
318 def FormatRequest(method, args):
319 """Formats a LUXI request message.
328 # Serialize the request
329 return serializer.DumpJson(request, indent=False)
332 def CallLuxiMethod(transport_cb, method, args):
333 """Send a LUXI request via a transport and return the response.
336 assert callable(transport_cb)
338 request_msg = FormatRequest(method, args)
340 # Send request and wait for response
341 response_msg = transport_cb(request_msg)
343 (success, result) = ParseResponse(response_msg)
348 errors.MaybeRaise(result)
349 raise RequestError(result)
352 class Client(object):
353 """High-level client implementation.
355 This uses a backing Transport-like class on top of which it
356 implements data serialization/deserialization.
359 def __init__(self, address=None, timeouts=None, transport=Transport):
360 """Constructor for the Client class.
363 - address: a valid address for the used transport class
364 - timeouts: a list of timeouts, to be used on connect and read/write
365 - transport: a Transport-like class
368 If timeout is not passed, the default timeouts of the transport
373 address = constants.MASTER_SOCKET
374 self.address = address
375 self.timeouts = timeouts
376 self.transport_class = transport
377 self.transport = None
378 self._InitTransport()
380 def _InitTransport(self):
381 """(Re)initialize the transport if needed.
384 if self.transport is None:
385 self.transport = self.transport_class(self.address,
386 timeouts=self.timeouts)
388 def _CloseTransport(self):
389 """Close the transport, ignoring errors.
392 if self.transport is None:
395 old_transp = self.transport
396 self.transport = None
398 except Exception: # pylint: disable-msg=W0703
401 def _SendMethodCall(self, data):
402 # Send request and wait for response
404 self._InitTransport()
405 return self.transport.Call(data)
407 self._CloseTransport()
410 def CallMethod(self, method, args):
411 """Send a generic request and return the response.
414 return CallLuxiMethod(self._SendMethodCall, method, args)
def SetQueueDrainFlag(self, drain_flag):
  """Sends a SetDrainFlag request.

  @param drain_flag: the new drain flag value; it is passed as the request
      arguments directly (not wrapped in a list)
  @return: whatever L{CallMethod} returns for the request

  """
  request_name = REQ_QUEUE_SET_DRAIN_FLAG
  return self.CallMethod(request_name, drain_flag)
def SetWatcherPause(self, until):
  """Sends a SetWatcherPause request.

  @param until: value sent as the only element of the argument list
  @return: whatever L{CallMethod} returns for the request

  """
  request_args = [until]
  return self.CallMethod(REQ_SET_WATCHER_PAUSE, request_args)
def SubmitJob(self, ops):
  """Submits a single job made of the given opcodes.

  @param ops: iterable of opcode objects; each one is serialized through
      its C{__getstate__} method
  @return: whatever L{CallMethod} returns for the SubmitJob request

  """
  # Build a real list (as SubmitManyJobs does) instead of relying on map():
  # on Python 3 map() returns a lazy iterator, which cannot be serialized
  # as a JSON array.
  ops_state = [op.__getstate__() for op in ops]
  return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
426 def SubmitManyJobs(self, jobs):
429 jobs_state.append([op.__getstate__() for op in ops])
430 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
  """Sends a CancelJob request for one job.

  @param job_id: job ID, passed as the request arguments directly
  @return: whatever L{CallMethod} returns for the request

  """
  request_name = REQ_CANCEL_JOB
  return self.CallMethod(request_name, job_id)
def ArchiveJob(self, job_id):
  """Sends an ArchiveJob request for one job.

  @param job_id: job ID, passed as the request arguments directly
  @return: whatever L{CallMethod} returns for the request

  """
  request_name = REQ_ARCHIVE_JOB
  return self.CallMethod(request_name, job_id)
def AutoArchiveJobs(self, age):
  """Sends an AutoArchiveJobs request.

  @param age: sent as the first request argument
  @return: whatever L{CallMethod} returns for the request

  """
  # Second argument is a timeout derived from the client read/write timeout
  # (same expression as WFJC_TIMEOUT); presumably it bounds the server-side
  # work so the reply arrives before the client gives up -- TODO confirm.
  server_timeout = (DEF_RWTO - 1) / 2
  request_args = (age, server_timeout)
  return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
442 def WaitForJobChangeOnce(self, job_id, fields,
443 prev_job_info, prev_log_serial,
444 timeout=WFJC_TIMEOUT):
445 """Waits for changes on a job.
447 @param job_id: Job ID
449 @param fields: List of field names to be observed
450 @type prev_job_info: None or list
451 @param prev_job_info: Previously received job information
452 @type prev_log_serial: None or int/long
453 @param prev_log_serial: Highest log serial number previously received
454 @type timeout: int/float
455 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
456 be capped to that value)
459 assert timeout >= 0, "Timeout can not be negative"
460 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
461 (job_id, fields, prev_job_info,
463 min(WFJC_TIMEOUT, timeout)))
465 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
467 result = self.WaitForJobChangeOnce(job_id, fields,
468 prev_job_info, prev_log_serial)
469 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
  """Sends a QueryJobs request.

  @param job_ids: the jobs to query
  @param fields: names of the fields to return
  @return: whatever L{CallMethod} returns for the request

  """
  request_args = (job_ids, fields)
  return self.CallMethod(REQ_QUERY_JOBS, request_args)
def QueryInstances(self, names, fields, use_locking):
  """Sends a QueryInstances request.

  @param names: instance names to query
  @param fields: names of the fields to return
  @param use_locking: forwarded unchanged as the third request argument
  @return: whatever L{CallMethod} returns for the request

  """
  request_args = (names, fields, use_locking)
  return self.CallMethod(REQ_QUERY_INSTANCES, request_args)
def QueryNodes(self, names, fields, use_locking):
  """Sends a QueryNodes request.

  @param names: node names to query
  @param fields: names of the fields to return
  @param use_locking: forwarded unchanged as the third request argument
  @return: whatever L{CallMethod} returns for the request

  """
  request_args = (names, fields, use_locking)
  return self.CallMethod(REQ_QUERY_NODES, request_args)
def QueryExports(self, nodes, use_locking):
  """Sends a QueryExports request.

  @param nodes: nodes to query for exports
  @param use_locking: forwarded unchanged as the second request argument
  @return: whatever L{CallMethod} returns for the request

  """
  request_args = (nodes, use_locking)
  return self.CallMethod(REQ_QUERY_EXPORTS, request_args)
def QueryClusterInfo(self):
  """Sends a QueryClusterInfo request (no arguments).

  @return: whatever L{CallMethod} returns for the request

  """
  no_args = ()
  return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
  """Sends a QueryConfigValues request.

  @param fields: names of the configuration values to return; passed as
      the request arguments directly
  @return: whatever L{CallMethod} returns for the request

  """
  request_name = REQ_QUERY_CONFIG_VALUES
  return self.CallMethod(request_name, fields)
def QueryTags(self, kind, name):
  """Sends a QueryTags request.

  @param kind: kind of object whose tags are queried
  @param name: name of that object
  @return: whatever L{CallMethod} returns for the request

  """
  request_args = (kind, name)
  return self.CallMethod(REQ_QUERY_TAGS, request_args)