4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also be used by the master daemon.
38 from ganeti import opcodes
39 from ganeti import serializer
40 from ganeti import constants
43 KEY_REQUEST = 'request'
53 class ProtocolError(Exception):
54 """Denotes an error in the server communication"""
57 class ConnectionClosedError(ProtocolError):
58 """Connection closed error"""
61 class TimeoutError(ProtocolError):
62 """Operation timeout error"""
65 class EncodingError(ProtocolError):
66 """Encoding failure on the sending side"""
69 class DecodingError(ProtocolError):
70 """Decoding failure on the receiving side"""
73 class RequestError(ProtocolError):
76 This signifies an error in the request format or request handling,
77 but not (e.g.) an error in starting up an instance.
79 Some common conditions that can trigger this exception:
80 - job submission failed because the job data was wrong
81 - query failed because required fields were missing
85 class NoMasterError(ProtocolError):
86 """The master cannot be reached
88 This means that the master daemon is not running or the socket has
94 def SerializeJob(job):
95 """Convert a job description to a string format.
98 return simplejson.dumps(job.__getstate__())
101 def UnserializeJob(data):
102 """Load a job from a string format"""
104 new_data = simplejson.loads(data)
105 except Exception, err:
106 raise DecodingError("Error while unserializing: %s" % str(err))
108 job.__setstate__(new_data)
113 """Low-level transport class.
115 This is used on the client side.
117 This could be replace by any other class that provides the same
118 semantics to the Client. This means:
119 - can send messages and receive messages
120 - safe for multithreading
124 def __init__(self, address, timeouts=None, eom=None):
125 """Constructor for the Client class.
128 - address: a valid address the the used transport class
129 - timeout: a list of timeouts, to be used on connect and read/write
130 - eom: an identifier to be used as end-of-message which the
131 upper-layer will guarantee that this identifier will not appear
134 There are two timeouts used since we might want to wait for a long
135 time for a response, but the connect timeout should be lower.
137 If not passed, we use a default of 10 and respectively 60 seconds.
139 Note that on reading data, since the timeout applies to an
140 invidual receive, it might be that the total duration is longer
141 than timeout value passed (we make a hard limit at twice the read
145 self.address = address
147 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
149 self._ctimeout, self._rwtimeout = timeouts
153 self._msgs = collections.deque()
161 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162 self.socket.settimeout(self._ctimeout)
164 self.socket.connect(address)
165 except socket.timeout, err:
166 raise TimeoutError("Connect timed out: %s" % str(err))
167 except socket.error, err:
168 if err.args[0] == errno.ENOENT:
169 raise NoMasterError((address,))
171 self.socket.settimeout(self._rwtimeout)
172 except (socket.error, NoMasterError):
173 if self.socket is not None:
178 def _CheckSocket(self):
179 """Make sure we are connected.
182 if self.socket is None:
183 raise ProtocolError("Connection is closed")
188 This just sends a message and doesn't wait for the response.
192 raise EncodingError("Message terminator found in payload")
195 self.socket.sendall(msg + self.eom)
196 except socket.timeout, err:
197 raise TimeoutError("Sending timeout: %s" % str(err))
200 """Try to receive a messae from the socket.
202 In case we already have messages queued, we just return from the
203 queue. Otherwise, we try to read data with a _rwtimeout network
204 timeout, and making sure we don't go over 2x_rwtimeout as a global
209 etime = time.time() + self._rwtimeout
210 while not self._msgs:
211 if time.time() > etime:
212 raise TimeoutError("Extended receive timeout")
214 data = self.socket.recv(4096)
215 except socket.timeout, err:
216 raise TimeoutError("Receive timeout: %s" % str(err))
218 raise ConnectionClosedError("Connection closed while reading")
219 new_msgs = (self._buffer + data).split(self.eom)
220 self._buffer = new_msgs.pop()
221 self._msgs.extend(new_msgs)
222 return self._msgs.popleft()
225 """Send a message and wait for the response.
227 This is just a wrapper over Send and Recv.
234 """Close the socket"""
235 if self.socket is not None:
240 class Client(object):
241 """High-level client implementation.
243 This uses a backing Transport-like class on top of which it
244 implements data serialization/deserialization.
247 def __init__(self, address=None, timeouts=None, transport=Transport):
248 """Constructor for the Client class.
251 - address: a valid address the the used transport class
252 - timeout: a list of timeouts, to be used on connect and read/write
253 - transport: a Transport-like class
256 If timeout is not passed, the default timeouts of the transport
261 address = constants.MASTER_SOCKET
262 self.transport = transport(address, timeouts=timeouts)
264 def SendRequest(self, request, data):
265 """Send a generic request and return the response.
268 msg = {KEY_REQUEST: request, KEY_DATA: data}
269 result = self.transport.Call(serializer.DumpJson(msg, indent=False))
271 data = serializer.LoadJson(result)
272 except Exception, err:
273 raise ProtocolError("Error while deserializing response: %s" % str(err))
274 if (not isinstance(data, dict) or
275 'success' not in data or
276 'result' not in data):
277 raise DecodingError("Invalid response from server: %s" % str(data))
280 def SubmitJob(self, job):
282 result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
283 if not result['success']:
284 raise RequestError(result['result'])
285 return result['result']
287 def Query(self, data):
289 result = self.SendRequest(REQ_QUERY, data)
290 if not result['success']:
291 raise RequestError(result[result])
292 result = result['result']
293 if data["object"] == "jobs":
294 # custom job processing of query values
296 for idx, field in enumerate(data["fields"]):
297 if field == "op_list":
298 row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]