4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module for the unix socket protocol

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import logging
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
# Keys of the JSON dictionaries exchanged over LUXI: requests carry a
# method and its arguments, responses a success flag and a result.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Request (method) names understood by the master daemon.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default transport timeouts in seconds: connect and read/write.
DEF_CTMO = 10
DEF_RWTO = 60
class ProtocolError(errors.GenericError):
    """Base exception for any failure of the LUXI protocol layer."""
class ConnectionClosedError(ProtocolError):
    """The other end closed the connection while we were reading."""
# NOTE: under Python 3 this name shadows the built-in TimeoutError inside
# this module; callers should refer to it through the module namespace.
class TimeoutError(ProtocolError):
    """A connect, send or receive operation timed out."""
class RequestError(ProtocolError):
    """Error on request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or the socket has
    been removed (the connect attempt failed with ENOENT or
    ECONNREFUSED).

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    NOTE(review): this implementation itself takes no locks; confirm how
    callers serialize access before sharing one instance between threads.

    """

    def __init__(self, address, timeouts=None, eom=None):
        """Constructor for the Transport class.

        Arguments:
          - address: a valid address for the used transport class (here,
            the path of a unix socket)
          - timeouts: a list of timeouts, to be used on connect and
            read/write
          - eom: an identifier to be used as end-of-message which the
            upper-layer will guarantee that this identifier will not appear
            in any message; defaults to chr(3)

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, we use a default of 10 and respectively 60 seconds.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than timeout value passed (we make a hard limit at twice the read
        timeout).

        Raises TimeoutError if connecting times out, NoMasterError if the
        socket does not exist or refuses the connection, and re-raises any
        other socket error. The socket is closed in all failure cases.

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        self._buffer = ""
        self._msgs = collections.deque()

        if eom is None:
            self.eom = "\3"
        else:
            self.eom = eom

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.settimeout(self._ctimeout)
            try:
                self.socket.connect(address)
            except socket.timeout as err:
                raise TimeoutError("Connect timed out: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] in (errno.ENOENT,
                                                errno.ECONNREFUSED):
                    raise NoMasterError(address)
                raise
            self.socket.settimeout(self._rwtimeout)
        except (socket.error, ProtocolError):
            # ProtocolError covers both TimeoutError and NoMasterError, so
            # the half-opened socket is never leaked on a failed connect.
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        Raises ProtocolError if the socket has been closed.

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.
        Raises ProtocolError if msg contains the end-of-message marker and
        TimeoutError if sending times out.

        """
        if self.eom in msg:
            raise ProtocolError("Message terminator found in payload")

        self._CheckSocket()
        try:
            # sendall() keeps writing until all data is sent or it raises,
            # so no manual partial-send loop is needed.
            self.socket.sendall(msg + self.eom)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        Raises TimeoutError on a receive timeout or when the global limit
        is exceeded, and ConnectionClosedError if the peer closes the
        connection before a complete message arrives.

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    # Must precede the socket.error clause: socket.timeout
                    # is a subclass of socket.error, so in the opposite
                    # order this handler would never run.
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # The last element is an incomplete message (or ""), kept as
            # the buffer for the next read.
            new_msgs = (self._buffer + data).split(self.eom)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket"""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
def ParseRequest(msg):
    """Decode a serialized LUXI request.

    Arguments:
      - msg: the serialized request string

    Returns a (method, args) tuple. Raises ProtocolError when the message
    cannot be decoded, does not decode to a dictionary, or lacks either
    the method or the arguments entry.

    """
    try:
        decoded = serializer.LoadJson(msg)
    except ValueError as err:
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", decoded)

    if isinstance(decoded, dict):
        pass
    else:
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    req_method = decoded.get(KEY_METHOD)
    req_args = decoded.get(KEY_ARGS)
    if req_method is not None and req_args is not None:
        return (req_method, req_args)

    logging.error("LUXI request missing method or arguments: %r", msg)
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
                         " in request): %r") % msg)
def ParseResponse(msg):
    """Decode a serialized LUXI response.

    Arguments:
      - msg: the serialized response string

    Returns a (success, result) tuple. Raises ProtocolError when the
    message cannot be deserialized or is not a dictionary holding both
    the success and the result entries.

    """
    try:
        decoded = serializer.LoadJson(msg)
    except Exception as err:
        # Any deserialization failure is reported as a protocol error.
        raise ProtocolError("Error while deserializing response: %s" % str(err))

    is_valid = (isinstance(decoded, dict)
                and KEY_SUCCESS in decoded
                and KEY_RESULT in decoded)
    if not is_valid:
        raise ProtocolError("Invalid response from server: %r" % decoded)

    success_flag = decoded[KEY_SUCCESS]
    payload = decoded[KEY_RESULT]
    return (success_flag, payload)
def FormatResponse(success, result):
    """Serialize a LUXI response message.

    Arguments:
      - success: whether the request was handled successfully
      - result: the value to send back to the client

    Returns the string produced by serializer.DumpJson.

    """
    reply = {}
    reply[KEY_SUCCESS] = success
    reply[KEY_RESULT] = result

    logging.debug("LUXI response: %s", reply)

    return serializer.DumpJson(reply)
def FormatRequest(method, args):
    """Serialize a LUXI request message.

    Arguments:
      - method: the request name (one of the REQ_* constants)
      - args: the arguments of the request

    Returns the string produced by serializer.DumpJson without
    indentation.

    """
    payload = dict([(KEY_METHOD, method), (KEY_ARGS, args)])
    return serializer.DumpJson(payload, indent=False)
def CallLuxiMethod(transport_cb, method, args):
    """Send a LUXI request via a transport and return the response.

    Arguments:
      - transport_cb: callable taking the serialized request and returning
        the serialized response
      - method: the request name
      - args: the request arguments

    Returns the result when the server reports success. Otherwise
    errors.MaybeRaise is given the result first (presumably to re-raise
    an encoded server-side exception; confirm in the errors module), and
    RequestError is raised if it returns.

    """
    assert callable(transport_cb)

    # Round-trip: serialize, send and wait, then decode.
    raw_reply = transport_cb(FormatRequest(method, args))
    success, result = ParseResponse(raw_reply)

    if not success:
        errors.MaybeRaise(result)
        raise RequestError(result)

    return result
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class; defaults
            to constants.MASTER_SOCKET
          - timeouts: a list of timeouts, to be used on connect and
            read/write
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used. The transport is instantiated immediately.

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        """
        if self.transport is None:
            return
        try:
            old_transp = self.transport
            self.transport = None
            old_transp.Close()
        except Exception:  # pylint: disable-msg=W0703
            # Best effort: the transport is being discarded anyway.
            pass

    def _SendMethodCall(self, data):
        """Send a serialized request and return the raw response.

        On any error the transport is dropped, so that the next call
        creates a fresh one, and the exception is re-raised.

        """
        try:
            self._InitTransport()
            return self.transport.Call(data)
        except Exception:
            self._CloseTransport()
            raise

    def _GetServerTimeout(self):
        """Compute the timeout passed to long-running server-side calls.

        The server is asked to wait at most half of the client's
        read/write timeout (minus one second), so it answers before the
        client gives up. The configured read/write timeout is used when
        one was given, capped at DEF_RWTO so larger timeouts keep the
        historical value. The result is at least 1.

        """
        if self.timeouts is None:
            rwtimeout = DEF_RWTO
        else:
            rwtimeout = min(self.timeouts[1], DEF_RWTO)
        return max(1, (rwtimeout - 1) // 2)

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        """
        return CallLuxiMethod(self._SendMethodCall, method, args)

    def SetQueueDrainFlag(self, drain_flag):
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SetWatcherPause(self, until):
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

    def SubmitJob(self, ops):
        """Submit one job made of the given opcodes."""
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def SubmitManyJobs(self, jobs):
        """Submit several jobs; each element of jobs is a list of opcodes."""
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

    def CancelJob(self, job_id):
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        """Request automatic job archival, passing a server-side timeout."""
        timeout = self._GetServerTimeout()
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
        """Wait for a change of the given job.

        Repeats the server call while it answers constants.JOB_NOTCHANGED
        and returns the first other answer.

        """
        timeout = self._GetServerTimeout()
        while True:
            result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                                     (job_id, fields, prev_job_info,
                                      prev_log_serial, timeout))
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def QueryJobs(self, job_ids, fields):
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
448 # TODO: class Server(object)