4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
38 from ganeti import opcodes
39 from ganeti import serializer
40 from ganeti import constants
# Key of the response dictionary whose value tells Client.CallMethod
# whether the request succeeded (a false value makes it raise RequestError).
45 KEY_SUCCESS = "success"
# Method names passed to Client.CallMethod by the Client wrappers of the
# same name (SubmitJob, CancelJob, ArchiveJob, QueryJobs).
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_QUERY_JOBS = "QueryJobs"
class ProtocolError(Exception):
  """Base class for errors in the communication with the server.

  The more specific protocol exceptions in this module derive from it.

  """
class ConnectionClosedError(ProtocolError):
  """Raised when the connection turns out to be closed, e.g. while reading."""
class TimeoutError(ProtocolError):
  """Raised when a connect, send or receive operation times out."""
class EncodingError(ProtocolError):
  """Raised on the sending side when a message cannot be encoded.

  One trigger is a payload that already contains the message terminator.

  """
class DecodingError(ProtocolError):
  """Raised on the receiving side when incoming data cannot be decoded.

  One trigger is a server response that lacks the expected structure.

  """
77 class RequestError(ProtocolError):
80 This signifies an error in the request format or request handling,
81 but not (e.g.) an error in starting up an instance.
83 Some common conditions that can trigger this exception:
84 - job submission failed because the job data was wrong
85 - query failed because required fields were missing
90 class NoMasterError(ProtocolError):
91 """The master cannot be reached
93 This means that the master daemon is not running or the socket has
100 """Low-level transport class.
102 This is used on the client side.
104 This could be replaced by any other class that provides the same
105 semantics to the Client. This means:
106 - can send messages and receive messages
107 - safe for multithreading
111 def __init__(self, address, timeouts=None, eom=None):
112 """Constructor for the Transport class.
115 - address: a valid address for the used transport class
116 - timeouts: a list of timeouts, to be used on connect and read/write
117 - eom: an identifier to be used as end-of-message which the
118 upper-layer will guarantee that this identifier will not appear
121 There are two timeouts used since we might want to wait for a long
122 time for a response, but the connect timeout should be lower.
124 If not passed, we use a default of 10 and respectively 60 seconds.
126 Note that on reading data, since the timeout applies to an
127 individual receive, it might be that the total duration is longer
128 than timeout value passed (we make a hard limit at twice the read
132 self.address = address
134 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
136 self._ctimeout, self._rwtimeout = timeouts
140 self._msgs = collections.deque()
148 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
149 self.socket.settimeout(self._ctimeout)
151 self.socket.connect(address)
152 except socket.timeout, err:
153 raise TimeoutError("Connect timed out: %s" % str(err))
154 except socket.error, err:
155 if err.args[0] == errno.ENOENT:
156 raise NoMasterError((address,))
158 self.socket.settimeout(self._rwtimeout)
159 except (socket.error, NoMasterError):
160 if self.socket is not None:
165 def _CheckSocket(self):
166 """Make sure we are connected.
169 if self.socket is None:
170 raise ProtocolError("Connection is closed")
175 This just sends a message and doesn't wait for the response.
179 raise EncodingError("Message terminator found in payload")
182 self.socket.sendall(msg + self.eom)
183 except socket.timeout, err:
184 raise TimeoutError("Sending timeout: %s" % str(err))
187 """Try to receive a message from the socket.
189 In case we already have messages queued, we just return from the
190 queue. Otherwise, we try to read data with a _rwtimeout network
191 timeout, and making sure we don't go over 2x_rwtimeout as a global
196 etime = time.time() + self._rwtimeout
197 while not self._msgs:
198 if time.time() > etime:
199 raise TimeoutError("Extended receive timeout")
201 data = self.socket.recv(4096)
202 except socket.timeout, err:
203 raise TimeoutError("Receive timeout: %s" % str(err))
205 raise ConnectionClosedError("Connection closed while reading")
206 new_msgs = (self._buffer + data).split(self.eom)
207 self._buffer = new_msgs.pop()
208 self._msgs.extend(new_msgs)
209 return self._msgs.popleft()
212 """Send a message and wait for the response.
214 This is just a wrapper over Send and Recv.
221 """Close the socket"""
222 if self.socket is not None:
227 class Client(object):
228 """High-level client implementation.
230 This uses a backing Transport-like class on top of which it
231 implements data serialization/deserialization.
234 def __init__(self, address=None, timeouts=None, transport=Transport):
235 """Constructor for the Client class.
238 - address: a valid address for the used transport class
239 - timeouts: a list of timeouts, to be used on connect and read/write
240 - transport: a Transport-like class
243 If timeout is not passed, the default timeouts of the transport
248 address = constants.MASTER_SOCKET
249 self.transport = transport(address, timeouts=timeouts)
251 def CallMethod(self, method, args):
252 """Send a generic request and return the response.
261 # Send request and wait for response
262 result = self.transport.Call(serializer.DumpJson(request, indent=False))
264 data = serializer.LoadJson(result)
265 except Exception, err:
266 raise ProtocolError("Error while deserializing response: %s" % str(err))
269 if (not isinstance(data, dict) or
270 KEY_SUCCESS not in data or
271 KEY_RESULT not in data):
272 raise DecodingError("Invalid response from server: %s" % str(data))
274 if not data[KEY_SUCCESS]:
275 # TODO: decide on a standard exception
276 raise RequestError(data[KEY_RESULT])
278 return data[KEY_RESULT]
def SubmitJob(self, ops):
  """Submit a job consisting of the given opcodes.

  Each element of ops is converted through its __getstate__() method
  and the resulting list is sent as the argument of a REQ_SUBMIT_JOB
  request.

  Returns whatever CallMethod returns for that request; exceptions
  raised by CallMethod propagate unchanged.

  """
  serialized_ops = [opcode.__getstate__() for opcode in ops]
  return self.CallMethod(REQ_SUBMIT_JOB, serialized_ops)
def CancelJob(self, job_id):
  """Send a REQ_CANCEL_JOB request for job_id and return its result.

  Exceptions raised by CallMethod propagate unchanged.

  """
  method = REQ_CANCEL_JOB
  return self.CallMethod(method, job_id)
def ArchiveJob(self, job_id):
  """Send a REQ_ARCHIVE_JOB request for job_id and return its result.

  Exceptions raised by CallMethod propagate unchanged.

  """
  method = REQ_ARCHIVE_JOB
  return self.CallMethod(method, job_id)
def QueryJobs(self, job_ids, fields):
  """Send a REQ_QUERY_JOBS request and return its result.

  The request argument is the pair (job_ids, fields). Exceptions raised
  by CallMethod propagate unchanged.

  """
  query_args = (job_ids, fields)
  return self.CallMethod(REQ_QUERY_JOBS, query_args)
293 # TODO: class Server(object)