4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
45 KEY_SUCCESS = "success"
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51 REQ_CANCEL_JOB = "CancelJob"
52 REQ_ARCHIVE_JOB = "ArchiveJob"
53 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54 REQ_QUERY_JOBS = "QueryJobs"
55 REQ_QUERY_INSTANCES = "QueryInstances"
56 REQ_QUERY_NODES = "QueryNodes"
57 REQ_QUERY_EXPORTS = "QueryExports"
58 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60 REQ_QUERY_TAGS = "QueryTags"
61 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
class ProtocolError(errors.GenericError):
    """Generic error in the LUXI protocol.

    Derives from errors.GenericError.

    """
class ConnectionClosedError(ProtocolError):
    """Signals that the connection has been closed.

    This is a ProtocolError subclass.

    """
class TimeoutError(ProtocolError):
    """Signals that an operation did not finish within its timeout.

    This is a ProtocolError subclass.

    """
80 class RequestError(ProtocolError):
83 This signifies an error in the request format or request handling,
84 but not (e.g.) an error in starting up an instance.
86 Some common conditions that can trigger this exception:
87 - job submission failed because the job data was wrong
88 - query failed because required fields were missing
93 class NoMasterError(ProtocolError):
94 """The master cannot be reached
96 This means that the master daemon is not running or the socket has
103 """Low-level transport class.
105 This is used on the client side.
107 This could be replaced by any other class that provides the same
108 semantics to the Client. This means:
109 - can send messages and receive messages
110 - safe for multithreading
114 def __init__(self, address, timeouts=None, eom=None):
115 """Constructor for the Transport class.
118 - address: a valid address for the used transport class
119 - timeouts: a list of timeouts, to be used on connect and read/write
120 - eom: an identifier to be used as end-of-message which the
121 upper-layer will guarantee that this identifier will not appear
124 There are two timeouts used since we might want to wait for a long
125 time for a response, but the connect timeout should be lower.
127 If not passed, we use a default of 10 and respectively 60 seconds.
129 Note that on reading data, since the timeout applies to an
130 individual receive, it might be that the total duration is longer
131 than timeout value passed (we make a hard limit at twice the read
135 self.address = address
137 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139 self._ctimeout, self._rwtimeout = timeouts
143 self._msgs = collections.deque()
151 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
152 self.socket.settimeout(self._ctimeout)
154 self.socket.connect(address)
155 except socket.timeout, err:
156 raise TimeoutError("Connect timed out: %s" % str(err))
157 except socket.error, err:
158 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
159 raise NoMasterError(address)
161 self.socket.settimeout(self._rwtimeout)
162 except (socket.error, NoMasterError):
163 if self.socket is not None:
def _CheckSocket(self):
    """Verify that this object still holds a socket.

    Raises ProtocolError when self.socket is None; otherwise does nothing.

    """
    if self.socket is not None:
        return
    raise ProtocolError("Connection is closed")
178 This just sends a message and doesn't wait for the response.
182 raise ProtocolError("Message terminator found in payload")
186 # TODO: sendall is not guaranteed to send everything
187 self.socket.sendall(msg + self.eom)
188 except socket.timeout, err:
189 raise TimeoutError("Sending timeout: %s" % str(err))
192 """Try to receive a message from the socket.
194 In case we already have messages queued, we just return from the
195 queue. Otherwise, we try to read data with a _rwtimeout network
196 timeout, and making sure we don't go over 2x_rwtimeout as a global
201 etime = time.time() + self._rwtimeout
202 while not self._msgs:
203 if time.time() > etime:
204 raise TimeoutError("Extended receive timeout")
207 data = self.socket.recv(4096)
208 except socket.error, err:
209 if err.args and err.args[0] == errno.EAGAIN:
212 except socket.timeout, err:
213 raise TimeoutError("Receive timeout: %s" % str(err))
216 raise ConnectionClosedError("Connection closed while reading")
217 new_msgs = (self._buffer + data).split(self.eom)
218 self._buffer = new_msgs.pop()
219 self._msgs.extend(new_msgs)
220 return self._msgs.popleft()
223 """Send a message and wait for the response.
225 This is just a wrapper over Send and Recv.
232 """Close the socket"""
233 if self.socket is not None:
def ParseRequest(msg):
    """Parses a LUXI request message.

    The message is deserialized with serializer.LoadJson and must yield a
    dictionary providing both the KEY_METHOD and the KEY_ARGS entries.

    Returns a (method, args) tuple.

    Raises ProtocolError if the message cannot be parsed, is not a
    dictionary, or has no method or no arguments.

    """
    try:
        request = serializer.LoadJson(msg)
    except ValueError as err:
        # "except ... as" is accepted by Python 2.6+ and Python 3 alike,
        # whereas the comma form is a syntax error on Python 3
        raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)

    logging.debug("LUXI request: %s", request)

    if not isinstance(request, dict):
        logging.error("LUXI request not a dict: %r", msg)
        raise ProtocolError("Invalid LUXI request (not a dict)")

    # dict.get already returns None for a missing key
    method = request.get(KEY_METHOD)
    args = request.get(KEY_ARGS)
    if method is None or args is None:
        logging.error("LUXI request missing method or arguments: %r", msg)
        raise ProtocolError(("Invalid LUXI request (no method or arguments"
                             " in request): %r") % msg)

    return (method, args)
def ParseResponse(msg):
    """Parses a LUXI response message.

    The message is deserialized with serializer.LoadJson.

    Returns a (success, result) tuple built from the KEY_SUCCESS and
    KEY_RESULT entries of the decoded data.

    Raises ProtocolError if deserialization fails or if the decoded data
    is not a dictionary holding the required entries.

    """
    # Whatever goes wrong while deserializing is reported as a protocol
    # error; "except ... as" works on Python 2.6+ and on Python 3, unlike
    # the comma form
    try:
        data = serializer.LoadJson(msg)
    except Exception as err:
        raise ProtocolError("Error while deserializing response: %s" % str(err))

    # NOTE(review): the KEY_RESULT membership test mirrors the lookup in the
    # return statement below -- confirm against the upstream file
    if not (isinstance(data, dict) and
            KEY_SUCCESS in data and
            KEY_RESULT in data):
        raise ProtocolError("Invalid response from server: %r" % data)

    return (data[KEY_SUCCESS], data[KEY_RESULT])
282 def FormatResponse(success, result):
283 """Formats a LUXI response message.
287 KEY_SUCCESS: success,
291 logging.debug("LUXI response: %s", response)
293 return serializer.DumpJson(response)
296 def FormatRequest(method, args):
297 """Formats a LUXI request message.
306 # Serialize the request
307 return serializer.DumpJson(request, indent=False)
310 def CallLuxiMethod(transport_cb, method, args):
311 """Send a LUXI request via a transport and return the response.
314 assert callable(transport_cb)
316 request_msg = FormatRequest(method, args)
318 # Send request and wait for response
319 response_msg = transport_cb(request_msg)
321 (success, result) = ParseResponse(response_msg)
326 errors.MaybeRaise(result)
327 raise RequestError(result)
330 class Client(object):
331 """High-level client implementation.
333 This uses a backing Transport-like class on top of which it
334 implements data serialization/deserialization.
337 def __init__(self, address=None, timeouts=None, transport=Transport):
338 """Constructor for the Client class.
341 - address: a valid address for the used transport class
342 - timeouts: a list of timeouts, to be used on connect and read/write
343 - transport: a Transport-like class
346 If timeout is not passed, the default timeouts of the transport
351 address = constants.MASTER_SOCKET
352 self.address = address
353 self.timeouts = timeouts
354 self.transport_class = transport
355 self.transport = None
356 self._InitTransport()
358 def _InitTransport(self):
359 """(Re)initialize the transport if needed.
362 if self.transport is None:
363 self.transport = self.transport_class(self.address,
364 timeouts=self.timeouts)
366 def _CloseTransport(self):
367 """Close the transport, ignoring errors.
370 if self.transport is None:
373 old_transp = self.transport
374 self.transport = None
376 except Exception: # pylint: disable-msg=W0703
379 def _SendMethodCall(self, data):
380 # Send request and wait for response
382 self._InitTransport()
383 return self.transport.Call(data)
385 self._CloseTransport()
def CallMethod(self, method, args):
    """Issue an arbitrary request and hand back its response.

    The work is delegated to CallLuxiMethod, with self._SendMethodCall
    acting as the transport callback.

    """
    send_cb = self._SendMethodCall
    return CallLuxiMethod(send_cb, method, args)
def SetQueueDrainFlag(self, drain_flag):
    """Issue a REQ_QUEUE_SET_DRAIN_FLAG request.

    drain_flag itself is passed as the request arguments; the value
    returned by CallMethod is handed back unchanged.

    """
    request = REQ_QUEUE_SET_DRAIN_FLAG
    return self.CallMethod(request, drain_flag)
def SetWatcherPause(self, until):
    """Issue a REQ_SET_WATCHER_PAUSE request.

    The arguments sent are a one-element list holding until; the value
    returned by CallMethod is handed back unchanged.

    """
    pause_args = [until]
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, pause_args)
def SubmitJob(self, ops):
    """Issue a REQ_SUBMIT_JOB request for the given operations.

    Every element of ops is converted through its __getstate__() method
    and the resulting list is sent as the request arguments.

    Returns the value returned by CallMethod.

    """
    # A list comprehension always produces a real list (map() returns a
    # lazy iterator on Python 3) and matches the way SubmitManyJobs
    # converts its operations
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
404 def SubmitManyJobs(self, jobs):
407 jobs_state.append([op.__getstate__() for op in ops])
408 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
    """Issue a REQ_CANCEL_JOB request.

    job_id itself is passed as the request arguments; the value returned
    by CallMethod is handed back unchanged.

    """
    request = REQ_CANCEL_JOB
    return self.CallMethod(request, job_id)
def ArchiveJob(self, job_id):
    """Issue a REQ_ARCHIVE_JOB request.

    job_id itself is passed as the request arguments; the value returned
    by CallMethod is handed back unchanged.

    """
    request = REQ_ARCHIVE_JOB
    return self.CallMethod(request, job_id)
def AutoArchiveJobs(self, age):
    """Issue a REQ_AUTOARCHIVE_JOBS request.

    The arguments sent are the tuple (age, timeout), where timeout is
    computed from the DEF_RWTO constant as (DEF_RWTO - 1) / 2.

    Returns the value returned by CallMethod.

    """
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, (DEF_RWTO - 1) / 2))
420 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
421 timeout = (DEF_RWTO - 1) / 2
423 result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
424 (job_id, fields, prev_job_info,
425 prev_log_serial, timeout))
426 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
    """Issue a REQ_QUERY_JOBS request.

    The arguments sent are the tuple (job_ids, fields); the value
    returned by CallMethod is handed back unchanged.

    """
    query_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, query_args)
def QueryInstances(self, names, fields, use_locking):
    """Issue a REQ_QUERY_INSTANCES request.

    The arguments sent are the tuple (names, fields, use_locking); the
    value returned by CallMethod is handed back unchanged.

    """
    query_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, query_args)
def QueryNodes(self, names, fields, use_locking):
    """Issue a REQ_QUERY_NODES request.

    The arguments sent are the tuple (names, fields, use_locking); the
    value returned by CallMethod is handed back unchanged.

    """
    query_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, query_args)
def QueryExports(self, nodes, use_locking):
    """Issue a REQ_QUERY_EXPORTS request.

    The arguments sent are the tuple (nodes, use_locking); the value
    returned by CallMethod is handed back unchanged.

    """
    query_args = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, query_args)
def QueryClusterInfo(self):
    """Issue a REQ_QUERY_CLUSTER_INFO request.

    An empty tuple is sent as the request arguments; the value returned
    by CallMethod is handed back unchanged.

    """
    no_args = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
    """Issue a REQ_QUERY_CONFIG_VALUES request.

    fields itself is passed as the request arguments; the value returned
    by CallMethod is handed back unchanged.

    """
    request = REQ_QUERY_CONFIG_VALUES
    return self.CallMethod(request, fields)
def QueryTags(self, kind, name):
    """Issue a REQ_QUERY_TAGS request.

    The arguments sent are the tuple (kind, name); the value returned by
    CallMethod is handed back unchanged.

    """
    query_args = (kind, name)
    return self.CallMethod(REQ_QUERY_TAGS, query_args)
452 # TODO: class Server(object)