4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also be used by the master daemon.
38 from ganeti import opcodes
39 from ganeti import constants
42 KEY_REQUEST = 'request'
52 class ProtocolError(Exception):
53 """Denotes an error in the server communication"""
56 class ConnectionClosedError(ProtocolError):
57 """Connection closed error"""
60 class TimeoutError(ProtocolError):
61 """Operation timeout error"""
64 class EncodingError(ProtocolError):
65 """Encoding failure on the sending side"""
68 class DecodingError(ProtocolError):
69 """Decoding failure on the receiving side"""
72 class RequestError(ProtocolError):
75 This signifies an error in the request format or request handling,
76 but not (e.g.) an error in starting up an instance.
78 Some common conditions that can trigger this exception:
79 - job submission failed because the job data was wrong
80 - query failed because required fields were missing
84 class NoMasterError(ProtocolError):
85 """The master cannot be reached
87 This means that the master daemon is not running or the socket has
93 def SerializeJob(job):
94 """Convert a job description to a string format.
97 return simplejson.dumps(job.__getstate__())
100 def UnserializeJob(data):
101 """Load a job from a string format"""
103 new_data = simplejson.loads(data)
104 except Exception, err:
105 raise DecodingError("Error while unserializing: %s" % str(err))
107 job.__setstate__(new_data)
112 """Low-level transport class.
114 This is used on the client side.
116 This could be replace by any other class that provides the same
117 semantics to the Client. This means:
118 - can send messages and receive messages
119 - safe for multithreading
123 def __init__(self, address, timeouts=None, eom=None):
124 """Constructor for the Client class.
127 - address: a valid address the the used transport class
128 - timeout: a list of timeouts, to be used on connect and read/write
129 - eom: an identifier to be used as end-of-message which the
130 upper-layer will guarantee that this identifier will not appear
133 There are two timeouts used since we might want to wait for a long
134 time for a response, but the connect timeout should be lower.
136 If not passed, we use a default of 10 and respectively 60 seconds.
138 Note that on reading data, since the timeout applies to an
139 invidual receive, it might be that the total duration is longer
140 than timeout value passed (we make a hard limit at twice the read
144 self.address = address
146 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
148 self._ctimeout, self._rwtimeout = timeouts
152 self._msgs = collections.deque()
160 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
161 self.socket.settimeout(self._ctimeout)
163 self.socket.connect(address)
164 except socket.timeout, err:
165 raise TimeoutError("Connect timed out: %s" % str(err))
166 except socket.error, err:
167 if err.args[0] == errno.ENOENT:
168 raise NoMasterError((address,))
170 self.socket.settimeout(self._rwtimeout)
171 except (socket.error, NoMasterError):
172 if self.socket is not None:
177 def _CheckSocket(self):
178 """Make sure we are connected.
181 if self.socket is None:
182 raise ProtocolError("Connection is closed")
187 This just sends a message and doesn't wait for the response.
191 raise EncodingError("Message terminator found in payload")
194 self.socket.sendall(msg + self.eom)
195 except socket.timeout, err:
196 raise TimeoutError("Sending timeout: %s" % str(err))
199 """Try to receive a messae from the socket.
201 In case we already have messages queued, we just return from the
202 queue. Otherwise, we try to read data with a _rwtimeout network
203 timeout, and making sure we don't go over 2x_rwtimeout as a global
208 etime = time.time() + self._rwtimeout
209 while not self._msgs:
210 if time.time() > etime:
211 raise TimeoutError("Extended receive timeout")
213 data = self.socket.recv(4096)
214 except socket.timeout, err:
215 raise TimeoutError("Receive timeout: %s" % str(err))
217 raise ConnectionClosedError("Connection closed while reading")
218 new_msgs = (self._buffer + data).split(self.eom)
219 self._buffer = new_msgs.pop()
220 self._msgs.extend(new_msgs)
221 return self._msgs.popleft()
224 """Send a message and wait for the response.
226 This is just a wrapper over Send and Recv.
233 """Close the socket"""
234 if self.socket is not None:
239 class Client(object):
240 """High-level client implementation.
242 This uses a backing Transport-like class on top of which it
243 implements data serialization/deserialization.
246 def __init__(self, address=None, timeouts=None, transport=Transport):
247 """Constructor for the Client class.
250 - address: a valid address the the used transport class
251 - timeout: a list of timeouts, to be used on connect and read/write
252 - transport: a Transport-like class
255 If timeout is not passed, the default timeouts of the transport
260 address = constants.MASTER_SOCKET
261 self.transport = transport(address, timeouts=timeouts)
263 def SendRequest(self, request, data):
264 """Send a generic request and return the response.
267 msg = {KEY_REQUEST: request, KEY_DATA: data}
268 result = self.transport.Call(simplejson.dumps(msg))
270 data = simplejson.loads(result)
271 except Exception, err:
272 raise ProtocolError("Error while deserializing response: %s" % str(err))
273 if (not isinstance(data, dict) or
274 'success' not in data or
275 'result' not in data):
276 raise DecodingError("Invalid response from server: %s" % str(data))
279 def SubmitJob(self, job):
281 result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
282 if not result['success']:
283 raise RequestError(result['result'])
284 return result['result']
286 def Query(self, data):
288 result = self.SendRequest(REQ_QUERY, data)
289 if not result['success']:
290 raise RequestError(result[result])
291 result = result['result']
292 if data["object"] == "jobs":
293 # custom job processing of query values
295 for idx, field in enumerate(data["fields"]):
296 if field == "op_list":
297 row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]