# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Module for the unix socket protocol.

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
# Keys of the JSON dictionaries exchanged with the master daemon.
# NOTE(review): KEY_METHOD, KEY_ARGS and KEY_RESULT were missing from this
# module although the client code needs them (KEY_RESULT is read from every
# response); the values restored here must match what the master daemon
# sends and expects -- confirm against the server side.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Request names, sent as the KEY_METHOD value of a request.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"

# Default connect and read/write timeouts, in seconds, used by Transport
# when no explicit timeouts are given.
DEF_CTMO = 10
DEF_RWTO = 60
class ProtocolError(Exception):
    """Denotes an error in the server communication"""


class ConnectionClosedError(ProtocolError):
    """Connection closed error"""


# NOTE(review): on Python 3 this class shadows the builtin TimeoutError
# inside this module; the name is kept for API compatibility.
class TimeoutError(ProtocolError):
    """Operation timeout error"""


class EncodingError(ProtocolError):
    """Encoding failure on the sending side"""


class DecodingError(ProtocolError):
    """Decoding failure on the receiving side"""


class RequestError(ProtocolError):
    """The server reported a failure for the request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """


class NoMasterError(ProtocolError):
    """The master cannot be reached

    This means that the master daemon is not running or the socket has
    been removed (connecting failed with ENOENT or ECONNREFUSED).

    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading

    NOTE(review): this class itself takes no locks -- Send and Recv share
    the socket and the receive buffers -- so using one instance from
    several threads at once looks unsafe; confirm before relying on it.

    """

    def __init__(self, address, timeouts=None, eom=None):
        """Constructor for the Transport class.

        Arguments:
          - address: the path of the unix socket to connect to
          - timeouts: a (connect, read/write) pair of timeouts, in seconds
          - eom: an identifier to be used as end-of-message; the upper
            layer must guarantee that it does not appear in any message
            (Send rejects such messages)

        There are two timeouts used since we might want to wait for a long
        time for a response, but the connect timeout should be lower.

        If not passed, we use a default of 10 and respectively 60 seconds.

        Note that on reading data, since the timeout applies to an
        individual receive, it might be that the total duration is longer
        than the timeout value passed (we make a hard limit at twice the
        read timeout).

        Raises:
          - NoMasterError if the socket is missing or the connection is
            refused
          - TimeoutError if connecting times out
          - socket.error for any other connection failure

        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        # Trailing data received after the last message terminator
        self._buffer = ""
        # Complete messages received but not yet returned by Recv()
        self._msgs = collections.deque()

        if eom is None:
            # NOTE(review): the default terminator is ETX (0x03); it must
            # match the master daemon's framing -- confirm on the server.
            self.eom = "\3"
        else:
            self.eom = eom

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.settimeout(self._ctimeout)
            try:
                self.socket.connect(address)
            except socket.timeout as err:
                raise TimeoutError("Connect timed out: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] in (errno.ENOENT,
                                                errno.ECONNREFUSED):
                    raise NoMasterError((address,))
                raise
            self.socket.settimeout(self._rwtimeout)
        except (socket.error, ProtocolError):
            # ProtocolError also covers the TimeoutError raised above, so
            # the socket is not leaked when connecting times out.
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        Raises ProtocolError if the socket has been closed.

        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message.

        This just sends a message and doesn't wait for the response.

        Raises EncodingError if the message contains the terminator and
        TimeoutError if sending times out.

        """
        if self.eom in msg:
            raise EncodingError("Message terminator found in payload")
        self._CheckSocket()
        try:
            # sendall() either transmits the whole buffer or raises.
            self.socket.sendall(msg + self.eom)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and making sure we don't go over 2x_rwtimeout as a global
        limit.

        Raises TimeoutError on a receive timeout and ConnectionClosedError
        if the peer closes the connection.

        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            while True:
                try:
                    data = self.socket.recv(4096)
                except socket.timeout as err:
                    # Must precede the socket.error clause: socket.timeout
                    # is a subclass of socket.error and would otherwise be
                    # re-raised raw instead of as TimeoutError.
                    raise TimeoutError("Receive timeout: %s" % str(err))
                except socket.error as err:
                    if err.args and err.args[0] == errno.EAGAIN:
                        continue
                    raise
                break
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # The last piece is an incomplete message (or ""); keep it for
            # the next read.
            new_msgs = (self._buffer + data).split(self.eom)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.

        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket"""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.

    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class; if
            None, constants.MASTER_SOCKET is used
          - timeouts: a (connect, read/write) pair of timeouts, passed
            unchanged to the transport
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.

        The transport is created right away.

        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        A new transport is created only when none is active, i.e. on first
        use or after _CloseTransport() discarded the previous one.

        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        The transport is detached before closing, so the next request
        creates a fresh one even if closing the old one fails.

        """
        if self.transport is None:
            return
        old_transp = self.transport
        self.transport = None
        try:
            old_transp.Close()
        except Exception:
            # Deliberately best-effort: this runs on error paths and must
            # not mask the exception that triggered the close.
            pass

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        Arguments:
          - method: the request name (one of the REQ_* constants)
          - args: the request arguments, serialized with the request

        Returns the KEY_RESULT value of a successful response.

        Raises:
          - DecodingError if the response cannot be deserialized or lacks
            the KEY_SUCCESS/KEY_RESULT keys
          - the class returned by errors.GetErrorClass, when the server
            reports a failure as a (class name, arguments) pair it knows
          - RequestError for any other failure reported by the server
          - whatever the transport raises while sending or receiving; the
            transport is closed first in that case

        """
        # Build request
        request = {
            KEY_METHOD: method,
            KEY_ARGS: args,
        }

        # Serialize the request
        send_data = serializer.DumpJson(request, indent=False)

        # Send request and wait for response
        try:
            self._InitTransport()
            result = self.transport.Call(send_data)
        except BaseException:
            # An interrupted exchange may leave an unread (partial) reply
            # buffered on the connection; drop the transport so the next
            # call starts on a clean one.
            self._CloseTransport()
            raise

        # Parse the result; DecodingError is a ProtocolError, so existing
        # handlers keep working, and it matches the check below.
        try:
            data = serializer.LoadJson(result)
        except Exception as err:
            raise DecodingError("Error while deserializing response: %s"
                                % str(err))

        # Validate response
        if (not isinstance(data, dict) or
                KEY_SUCCESS not in data or
                KEY_RESULT not in data):
            raise DecodingError("Invalid response from server: %s" % str(data))

        result = data[KEY_RESULT]

        if not data[KEY_SUCCESS]:
            # TODO: decide on a standard exception
            if (isinstance(result, (tuple, list)) and len(result) == 2 and
                    isinstance(result[1], (tuple, list))):
                # custom ganeti errors: (class name, constructor arguments)
                err_class = errors.GetErrorClass(result[0])
                if err_class is not None:
                    raise err_class(*result[1])

            raise RequestError(result)

        return result

    def SetQueueDrainFlag(self, drain_flag):
        """Send a SetDrainFlag request with the given flag."""
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SubmitJob(self, ops):
        """Submit a job; each opcode is sent as its __getstate__() value."""
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def CancelJob(self, job_id):
        """Send a CancelJob request for job_id."""
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        """Send an ArchiveJob request for job_id."""
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        """Send an AutoArchiveJobs request with (age, server timeout)."""
        # Server-side time budget below the client read/write timeout,
        # presumably so the master answers before the client gives up --
        # confirm. "//" keeps it an int on every Python version.
        timeout = (DEF_RWTO - 1) // 2
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
        """Wait until the job changes and return the new information.

        Requests are repeated for as long as the master answers
        constants.JOB_NOTCHANGED; the first other answer is returned.

        """
        # Same server-side budget as in AutoArchiveJobs.
        timeout = (DEF_RWTO - 1) // 2
        while True:
            result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                                     (job_id, fields, prev_job_info,
                                      prev_log_serial, timeout))
            if result != constants.JOB_NOTCHANGED:
                break
        return result

    def QueryJobs(self, job_ids, fields):
        """Send a QueryJobs request for the given jobs and fields."""
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        """Send a QueryInstances request."""
        return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        """Send a QueryNodes request."""
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        """Send a QueryExports request."""
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        """Send a QueryClusterInfo request (no arguments)."""
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        """Send a QueryConfigValues request for the given fields."""
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
# TODO: class Server(object)