4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
# Key of the response dictionary that carries the success flag; it is
# checked by Client.CallMethod before the result is returned.
KEY_SUCCESS = "success"

# Method names sent to the server; each one is passed to
# Client.CallMethod by the Client wrapper method of the same name.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
class ProtocolError(Exception):
    """Denotes an error in the communication with the server.

    This is the base class of the more specific protocol exceptions
    defined in this module.
    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised by the receive path with the message
    "Connection closed while reading".
    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving on the socket times out.
    """
class EncodingError(ProtocolError):
    """Encoding failure on the sending side.

    Raised, for example, when the message terminator is found inside the
    payload that is about to be sent.
    """
class DecodingError(ProtocolError):
    """Decoding failure on the receiving side.

    Client.CallMethod raises it when the server response is not a
    dictionary containing both the success and the result keys.
    """
class RequestError(ProtocolError):
    """Error in a request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    Client.CallMethod raises it, with the returned result as argument,
    when the response's success flag is false.
    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or its socket does
    not exist: the transport raises this error when connecting to the
    socket address fails with ENOENT.
    """
99 """Low-level transport class.
101 This is used on the client side.
103 This could be replaced by any other class that provides the same
104 semantics to the Client. This means:
105 - can send messages and receive messages
106 - safe for multithreading
110 def __init__(self, address, timeouts=None, eom=None):
111 """Constructor for the Transport class.
114 - address: a valid address for the used transport class
115 - timeouts: a list of timeouts, to be used on connect and read/write
116 - eom: an identifier to be used as end-of-message which the
117 upper-layer will guarantee that this identifier will not appear
120 There are two timeouts used since we might want to wait for a long
121 time for a response, but the connect timeout should be lower.
123 If not passed, we use a default of 10 and respectively 60 seconds.
125 Note that on reading data, since the timeout applies to an
126 individual receive, it might be that the total duration is longer
127 than timeout value passed (we make a hard limit at twice the read
131 self.address = address
133 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
135 self._ctimeout, self._rwtimeout = timeouts
139 self._msgs = collections.deque()
147 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
148 self.socket.settimeout(self._ctimeout)
150 self.socket.connect(address)
151 except socket.timeout, err:
152 raise TimeoutError("Connect timed out: %s" % str(err))
153 except socket.error, err:
154 if err.args[0] == errno.ENOENT:
155 raise NoMasterError((address,))
157 self.socket.settimeout(self._rwtimeout)
158 except (socket.error, NoMasterError):
159 if self.socket is not None:
def _CheckSocket(self):
    """Make sure we are connected.

    Returns None when self.socket is set; raises ProtocolError with the
    message "Connection is closed" when self.socket is None.
    """
    # Guard clause: an open socket means there is nothing to report.
    if self.socket is not None:
        return
    raise ProtocolError("Connection is closed")
174 This just sends a message and doesn't wait for the response.
178 raise EncodingError("Message terminator found in payload")
181 self.socket.sendall(msg + self.eom)
182 except socket.timeout, err:
183 raise TimeoutError("Sending timeout: %s" % str(err))
186 """Try to receive a message from the socket.
188 In case we already have messages queued, we just return from the
189 queue. Otherwise, we try to read data with a _rwtimeout network
190 timeout, and making sure we don't go over 2x_rwtimeout as a global
195 etime = time.time() + self._rwtimeout
196 while not self._msgs:
197 if time.time() > etime:
198 raise TimeoutError("Extended receive timeout")
200 data = self.socket.recv(4096)
201 except socket.timeout, err:
202 raise TimeoutError("Receive timeout: %s" % str(err))
204 raise ConnectionClosedError("Connection closed while reading")
205 new_msgs = (self._buffer + data).split(self.eom)
206 self._buffer = new_msgs.pop()
207 self._msgs.extend(new_msgs)
208 return self._msgs.popleft()
211 """Send a message and wait for the response.
213 This is just a wrapper over Send and Recv.
220 """Close the socket"""
221 if self.socket is not None:
226 class Client(object):
227 """High-level client implementation.
229 This uses a backing Transport-like class on top of which it
230 implements data serialization/deserialization.
233 def __init__(self, address=None, timeouts=None, transport=Transport):
234 """Constructor for the Client class.
237 - address: a valid address for the used transport class
238 - timeouts: a list of timeouts, to be used on connect and read/write
239 - transport: a Transport-like class
242 If timeouts is not passed, the default timeouts of the transport
247 address = constants.MASTER_SOCKET
248 self.transport = transport(address, timeouts=timeouts)
250 def CallMethod(self, method, args):
251 """Send a generic request and return the response.
260 # Send request and wait for response
261 result = self.transport.Call(serializer.DumpJson(request, indent=False))
263 data = serializer.LoadJson(result)
264 except Exception, err:
265 raise ProtocolError("Error while deserializing response: %s" % str(err))
268 if (not isinstance(data, dict) or
269 KEY_SUCCESS not in data or
270 KEY_RESULT not in data):
271 raise DecodingError("Invalid response from server: %s" % str(data))
273 if not data[KEY_SUCCESS]:
274 # TODO: decide on a standard exception
275 raise RequestError(data[KEY_RESULT])
277 return data[KEY_RESULT]
def SubmitJob(self, ops):
    """Send a REQ_SUBMIT_JOB request for the given operations.

    Every element of ops is reduced to the value of its __getstate__()
    method; the resulting list is the request's argument. Returns
    whatever CallMethod returns for that request.
    """
    serialized_ops = [operation.__getstate__() for operation in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, serialized_ops)
def CancelJob(self, job_id):
    """Send a REQ_CANCEL_JOB request for job_id.

    The job ID is passed unchanged as the request argument; the value
    returned by CallMethod is handed back to the caller.
    """
    response = self.CallMethod(REQ_CANCEL_JOB, job_id)
    return response
def ArchiveJob(self, job_id):
    """Send a REQ_ARCHIVE_JOB request for job_id.

    The job ID is passed unchanged as the request argument; the value
    returned by CallMethod is handed back to the caller.
    """
    response = self.CallMethod(REQ_ARCHIVE_JOB, job_id)
    return response
def QueryJobs(self, job_ids, fields):
    """Send a REQ_QUERY_JOBS request.

    The request argument is the tuple (job_ids, fields); the value
    returned by CallMethod is handed back to the caller.
    """
    query_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, query_args)
def QueryInstances(self, names, fields):
    """Send a REQ_QUERY_INSTANCES request.

    The request argument is the tuple (names, fields); the value
    returned by CallMethod is handed back to the caller.
    """
    query_args = (names, fields)
    return self.CallMethod(REQ_QUERY_INSTANCES, query_args)
295 # TODO: class Server(object)