4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
# Key looked up in the response dictionary by Client.CallMethod; a false
# value under this key makes CallMethod raise instead of returning.
44 KEY_SUCCESS = "success"
# Request names. Each one is passed as the "method" argument of
# Client.CallMethod by the Client wrapper method of the same name.
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
class ProtocolError(Exception):
    """Base class for errors in the communication with the server.

    The more specific protocol exceptions of this module derive from it.
    """
class ConnectionClosedError(ProtocolError):
    """Signals that the connection has been closed."""
class TimeoutError(ProtocolError):
    """Signals that an operation did not finish within its timeout."""
class EncodingError(ProtocolError):
    """Signals that the sending side failed to encode a message."""
class DecodingError(ProtocolError):
    """Signals that the receiving side failed to decode a message."""
82 class RequestError(ProtocolError):
85 This signifies an error in the request format or request handling,
86 but not (e.g.) an error in starting up an instance.
88 Some common conditions that can trigger this exception:
89 - job submission failed because the job data was wrong
90 - query failed because required fields were missing
95 class NoMasterError(ProtocolError):
96 """The master cannot be reached
98 This means that the master daemon is not running or the socket has
105 """Low-level transport class.
107 This is used on the client side.
109 This could be replaced by any other class that provides the same
110 semantics to the Client. This means:
111 - can send messages and receive messages
112 - safe for multithreading
116 def __init__(self, address, timeouts=None, eom=None):
117 """Constructor for the Transport class.
120 - address: a valid address for the used transport class
121 - timeouts: a list of timeouts, to be used on connect and read/write
122 - eom: an identifier to be used as end-of-message which the
123 upper-layer will guarantee that this identifier will not appear
126 There are two timeouts used since we might want to wait for a long
127 time for a response, but the connect timeout should be lower.
129 If not passed, we use a default of 10 and respectively 60 seconds.
131 Note that on reading data, since the timeout applies to an
132 individual receive, it might be that the total duration is longer
133 than timeout value passed (we make a hard limit at twice the read
137 self.address = address
139 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
141 self._ctimeout, self._rwtimeout = timeouts
145 self._msgs = collections.deque()
153 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
154 self.socket.settimeout(self._ctimeout)
156 self.socket.connect(address)
157 except socket.timeout, err:
158 raise TimeoutError("Connect timed out: %s" % str(err))
159 except socket.error, err:
160 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
161 raise NoMasterError((address,))
163 self.socket.settimeout(self._rwtimeout)
164 except (socket.error, NoMasterError):
165 if self.socket is not None:
def _CheckSocket(self):
    """Ensure the transport still holds a socket.

    Raises ProtocolError when self.socket is None; otherwise does
    nothing.
    """
    has_socket = self.socket is not None
    if not has_socket:
        raise ProtocolError("Connection is closed")
180 This just sends a message and doesn't wait for the response.
184 raise EncodingError("Message terminator found in payload")
187 self.socket.sendall(msg + self.eom)
188 except socket.timeout, err:
189 raise TimeoutError("Sending timeout: %s" % str(err))
192 """Try to receive a message from the socket.
194 In case we already have messages queued, we just return from the
195 queue. Otherwise, we try to read data with a _rwtimeout network
196 timeout, and making sure we don't go over 2x_rwtimeout as a global
201 etime = time.time() + self._rwtimeout
202 while not self._msgs:
203 if time.time() > etime:
204 raise TimeoutError("Extended receive timeout")
206 data = self.socket.recv(4096)
207 except socket.timeout, err:
208 raise TimeoutError("Receive timeout: %s" % str(err))
210 raise ConnectionClosedError("Connection closed while reading")
211 new_msgs = (self._buffer + data).split(self.eom)
212 self._buffer = new_msgs.pop()
213 self._msgs.extend(new_msgs)
214 return self._msgs.popleft()
217 """Send a message and wait for the response.
219 This is just a wrapper over Send and Recv.
226 """Close the socket"""
227 if self.socket is not None:
232 class Client(object):
233 """High-level client implementation.
235 This uses a backing Transport-like class on top of which it
236 implements data serialization/deserialization.
239 def __init__(self, address=None, timeouts=None, transport=Transport):
240 """Constructor for the Client class.
243 - address: a valid address for the used transport class
244 - timeouts: a list of timeouts, to be used on connect and read/write
245 - transport: a Transport-like class
248 If timeouts is not passed, the default timeouts of the transport
253 address = constants.MASTER_SOCKET
254 self.transport = transport(address, timeouts=timeouts)
256 def CallMethod(self, method, args):
257 """Send a generic request and return the response.
266 # Send request and wait for response
267 result = self.transport.Call(serializer.DumpJson(request, indent=False))
269 data = serializer.LoadJson(result)
270 except Exception, err:
271 raise ProtocolError("Error while deserializing response: %s" % str(err))
274 if (not isinstance(data, dict) or
275 KEY_SUCCESS not in data or
276 KEY_RESULT not in data):
277 raise DecodingError("Invalid response from server: %s" % str(data))
279 result = data[KEY_RESULT]
281 if not data[KEY_SUCCESS]:
282 # TODO: decide on a standard exception
283 if (isinstance(result, (tuple, list)) and len(result) == 2 and
284 isinstance(result[1], (tuple, list))):
285 # custom ganeti errors
286 err_class = errors.GetErrorClass(result[0])
287 if err_class is not None:
288 raise err_class, tuple(result[1])
290 raise RequestError(result)
def SubmitJob(self, ops):
    """Send a SubmitJob request built from the given operations.

    Every element of ops is reduced to its __getstate__() value
    (presumably these are opcode objects -- confirm against callers) and
    the resulting list is handed to CallMethod, whose result is returned.
    """
    serialized_ops = [operation.__getstate__() for operation in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, serialized_ops)
def CancelJob(self, job_id):
    """Send a CancelJob request for job_id and return CallMethod's result."""
    reply = self.CallMethod(REQ_CANCEL_JOB, job_id)
    return reply
def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for job_id and return CallMethod's result."""
    reply = self.CallMethod(REQ_ARCHIVE_JOB, job_id)
    return reply
def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request with age as its only argument.

    Returns whatever CallMethod returns for the request.
    """
    reply = self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
    return reply
307 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
308 timeout = (DEF_RWTO - 1) / 2
310 result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
311 (job_id, fields, prev_job_info,
312 prev_log_serial, timeout))
313 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request and return CallMethod's result.

    The request arguments are the pair (job_ids, fields).
    """
    query_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, query_args)
def QueryInstances(self, names, fields):
    """Send a QueryInstances request and return CallMethod's result.

    The request arguments are the pair (names, fields).
    """
    query_args = (names, fields)
    return self.CallMethod(REQ_QUERY_INSTANCES, query_args)
def QueryNodes(self, names, fields):
    """Send a QueryNodes request and return CallMethod's result.

    The request arguments are the pair (names, fields).
    """
    query_args = (names, fields)
    return self.CallMethod(REQ_QUERY_NODES, query_args)
def QueryExports(self, nodes):
    """Send a QueryExports request for nodes and return CallMethod's result."""
    reply = self.CallMethod(REQ_QUERY_EXPORTS, nodes)
    return reply
def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request and return CallMethod's result.

    fields is passed through unchanged as the request arguments.
    """
    reply = self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
    return reply
332 # TODO: class Server(object)