4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
# Key of the response dictionary that holds the success flag of a request;
# Client.CallMethod rejects responses lacking it and raises when it is false.
44 KEY_SUCCESS = "success"
# Names of the request methods. Each constant is passed as the "method"
# argument of Client.CallMethod by the corresponding Client wrapper method.
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
class ProtocolError(Exception):
  """Base class for errors in the communication with the server.

  The more specific protocol exceptions of this module derive from this
  class, so catching it also catches all of them.

  """
class ConnectionClosedError(ProtocolError):
  """The connection was closed while data was still expected from it."""
class TimeoutError(ProtocolError):
  """An operation on the connection did not finish in time.

  Note: this class is a ProtocolError; on Python 3 the name also hides
  the builtin TimeoutError inside this module.

  """
class EncodingError(ProtocolError):
  """A message could not be encoded by the sending side.

  Raised, for instance, when the payload to be sent contains the message
  terminator.

  """
class DecodingError(ProtocolError):
  """A message could not be decoded by the receiving side.

  Raised, for instance, when the server response is not a dictionary
  holding both the success and the result keys.

  """
class RequestError(ProtocolError):
  """Error in the format or in the handling of a request.

  This covers failures of the request itself, not failures of the
  operation it asked for (e.g. an instance that fails to start).

  Some common conditions that can trigger this exception:
    - a job submission is rejected because the job data is wrong
    - a query is rejected because required fields are missing

  """
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  Raised when connecting to the master socket fails because the socket
  path does not exist or the connection is refused, i.e. no master
  daemon is listening on it.

  """
106 """Low-level transport class.
108 This is used on the client side.
110 This could be replaced by any other class that provides the same
111 semantics to the Client. This means:
112 - can send messages and receive messages
113 - safe for multithreading
117 def __init__(self, address, timeouts=None, eom=None):
118 """Constructor for the transport class.
121 - address: a valid address for the used transport class
122 - timeouts: a list of timeouts, to be used on connect and read/write
123 - eom: an identifier to be used as end-of-message which the
124 upper-layer will guarantee that this identifier will not appear
127 There are two timeouts used since we might want to wait for a long
128 time for a response, but the connect timeout should be lower.
130 If not passed, we use a default of 10 and respectively 60 seconds.
132 Note that on reading data, since the timeout applies to an
133 individual receive, it might be that the total duration is longer
134 than timeout value passed (we make a hard limit at twice the read
138 self.address = address
140 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
142 self._ctimeout, self._rwtimeout = timeouts
146 self._msgs = collections.deque()
154 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
155 self.socket.settimeout(self._ctimeout)
157 self.socket.connect(address)
158 except socket.timeout, err:
159 raise TimeoutError("Connect timed out: %s" % str(err))
160 except socket.error, err:
161 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
162 raise NoMasterError((address,))
164 self.socket.settimeout(self._rwtimeout)
165 except (socket.error, NoMasterError):
166 if self.socket is not None:
171 def _CheckSocket(self):
172 """Make sure we are connected.
175 if self.socket is None:
176 raise ProtocolError("Connection is closed")
181 This just sends a message and doesn't wait for the response.
185 raise EncodingError("Message terminator found in payload")
188 self.socket.sendall(msg + self.eom)
189 except socket.timeout, err:
190 raise TimeoutError("Sending timeout: %s" % str(err))
193 """Try to receive a message from the socket.
195 In case we already have messages queued, we just return from the
196 queue. Otherwise, we try to read data with a _rwtimeout network
197 timeout, and making sure we don't go over 2x_rwtimeout as a global
202 etime = time.time() + self._rwtimeout
203 while not self._msgs:
204 if time.time() > etime:
205 raise TimeoutError("Extended receive timeout")
207 data = self.socket.recv(4096)
208 except socket.timeout, err:
209 raise TimeoutError("Receive timeout: %s" % str(err))
211 raise ConnectionClosedError("Connection closed while reading")
212 new_msgs = (self._buffer + data).split(self.eom)
213 self._buffer = new_msgs.pop()
214 self._msgs.extend(new_msgs)
215 return self._msgs.popleft()
218 """Send a message and wait for the response.
220 This is just a wrapper over Send and Recv.
227 """Close the socket"""
228 if self.socket is not None:
233 class Client(object):
234 """High-level client implementation.
236 This uses a backing Transport-like class on top of which it
237 implements data serialization/deserialization.
240 def __init__(self, address=None, timeouts=None, transport=Transport):
241 """Constructor for the Client class.
244 - address: a valid address for the used transport class
245 - timeouts: a list of timeouts, to be used on connect and read/write
246 - transport: a Transport-like class
249 If timeout is not passed, the default timeouts of the transport
254 address = constants.MASTER_SOCKET
255 self.address = address
256 self.timeouts = timeouts
257 self.transport_class = transport
258 self.transport = None
259 self._InitTransport()
261 def _InitTransport(self):
262 """(Re)initialize the transport if needed.
265 if self.transport is None:
266 self.transport = self.transport_class(self.address,
267 timeouts=self.timeouts)
269 def _CloseTransport(self):
270 """Close the transport, ignoring errors.
273 if self.transport is None:
276 old_transp = self.transport
277 self.transport = None
279 except Exception, err:
282 def CallMethod(self, method, args):
283 """Send a generic request and return the response.
292 # Serialize the request
293 send_data = serializer.DumpJson(request, indent=False)
295 # Send request and wait for response
297 self._InitTransport()
298 result = self.transport.Call(send_data)
300 self._CloseTransport()
305 data = serializer.LoadJson(result)
306 except Exception, err:
307 raise ProtocolError("Error while deserializing response: %s" % str(err))
310 if (not isinstance(data, dict) or
311 KEY_SUCCESS not in data or
312 KEY_RESULT not in data):
313 raise DecodingError("Invalid response from server: %s" % str(data))
315 result = data[KEY_RESULT]
317 if not data[KEY_SUCCESS]:
318 # TODO: decide on a standard exception
319 if (isinstance(result, (tuple, list)) and len(result) == 2 and
320 isinstance(result[1], (tuple, list))):
321 # custom ganeti errors
322 err_class = errors.GetErrorClass(result[0])
323 if err_class is not None:
324 raise err_class, tuple(result[1])
326 raise RequestError(result)
def SetQueueDrainFlag(self, drain_flag):
  """Send a SetDrainFlag request carrying drain_flag.

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  return call(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
def SubmitJob(self, ops):
  """Submit a job made of the given opcodes.

  Each opcode is reduced to its state via __getstate__() and the list of
  states is sent with a SubmitJob request.

  Arguments:
    - ops: iterable of opcode objects providing __getstate__()

  Returns whatever CallMethod returns for that request.

  """
  # A list comprehension always builds a real list; map() would hand an
  # iterator to the serializer when run on Python 3.
  ops_state = [op.__getstate__() for op in ops]
  return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
def CancelJob(self, job_id):
  """Send a CancelJob request for job_id.

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  return call(REQ_CANCEL_JOB, job_id)
def ArchiveJob(self, job_id):
  """Send an ArchiveJob request for job_id.

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  return call(REQ_ARCHIVE_JOB, job_id)
def AutoArchiveJobs(self, age):
  """Send an AutoArchiveJobs request for the given age.

  Next to age, the request carries a timeout derived from the default
  read/write timeout: (DEF_RWTO - 1) / 2.

  Returns whatever CallMethod returns for that request.

  """
  request_args = (age, (DEF_RWTO - 1) / 2)
  return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
347 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
348 timeout = (DEF_RWTO - 1) / 2
350 result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
351 (job_id, fields, prev_job_info,
352 prev_log_serial, timeout))
353 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
  """Send a QueryJobs request with the pair (job_ids, fields).

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  query = (job_ids, fields)
  return call(REQ_QUERY_JOBS, query)
def QueryInstances(self, names, fields):
  """Send a QueryInstances request with the pair (names, fields).

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  query = (names, fields)
  return call(REQ_QUERY_INSTANCES, query)
def QueryNodes(self, names, fields):
  """Send a QueryNodes request with the pair (names, fields).

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  query = (names, fields)
  return call(REQ_QUERY_NODES, query)
def QueryExports(self, nodes):
  """Send a QueryExports request carrying nodes.

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  return call(REQ_QUERY_EXPORTS, nodes)
def QueryConfigValues(self, fields):
  """Send a QueryConfigValues request carrying fields.

  Returns whatever CallMethod returns for that request.

  """
  call = self.CallMethod
  return call(REQ_QUERY_CONFIG_VALUES, fields)
373 # TODO: class Server(object)