4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import opcodes
38 from ganeti import constants
41 KEY_REQUEST = 'request'
class ProtocolError(Exception):
  """Base class for errors in the communication with the server."""
class ConnectionClosedError(ProtocolError):
  """The connection was closed."""
class TimeoutError(ProtocolError):
  """An operation timed out."""
class EncodingError(ProtocolError):
  """A message could not be encoded on the sending side."""
class DecodingError(ProtocolError):
  """A message could not be decoded on the receiving side."""
class RequestError(ProtocolError):
  """Error in the format or in the handling of a request.

  This is raised for problems with the request itself, but not (e.g.)
  for an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
def SerializeJob(job):
  """Serialize a job description into a JSON string.

  What gets encoded is the state returned by the job's __getstate__
  method.

  """
  job_state = job.__getstate__()
  return simplejson.dumps(job_state)
91 def UnserializeJob(data):
92 """Load a job from a string format"""
94 new_data = simplejson.loads(data)
95 except Exception, err:
96 raise DecodingError("Error while unserializing: %s" % str(err))
98 job.__setstate__(new_data)
103 """Low-level transport class.
105 This is used on the client side.
107 This could be replaced by any other class that provides the same
108 semantics to the Client. This means:
109 - can send messages and receive messages
110 - safe for multithreading
114 def __init__(self, address, timeouts=None, eom=None):
115 """Constructor for the Transport class.
118 - address: a valid address for the used transport class
119 - timeouts: a list of timeouts, to be used on connect and read/write
120 - eom: an identifier to be used as end-of-message which the
121 upper-layer will guarantee that this identifier will not appear
124 There are two timeouts used since we might want to wait for a long
125 time for a response, but the connect timeout should be lower.
127 If not passed, we use a default of 10 and respectively 60 seconds.
129 Note that on reading data, since the timeout applies to an
130 individual receive, it might be that the total duration is longer
131 than timeout value passed (we make a hard limit at twice the read
135 self.address = address
137 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139 self._ctimeout, self._rwtimeout = timeouts
143 self._msgs = collections.deque()
151 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
152 self.socket.settimeout(self._ctimeout)
154 self.socket.connect(address)
155 except socket.timeout, err:
156 raise TimeoutError("Connection timed out: %s" % str(err))
157 self.socket.settimeout(self._rwtimeout)
159 if self.socket is not None:
def _CheckSocket(self):
  """Ensure that this transport has a socket to work with.

  Raises ProtocolError if self.socket is None.

  """
  if self.socket is not None:
    return
  raise ProtocolError("Connection is closed")
174 This just sends a message and doesn't wait for the response.
178 raise EncodingError("Message terminator found in payload")
181 self.socket.sendall(msg + self.eom)
182 except socket.timeout, err:
183 raise TimeoutError("Sending timeout: %s" % str(err))
186 """Try to receive a message from the socket.
188 In case we already have messages queued, we just return from the
189 queue. Otherwise, we try to read data with a _rwtimeout network
190 timeout, and making sure we don't go over 2x_rwtimeout as a global
195 etime = time.time() + self._rwtimeout
196 while not self._msgs:
197 if time.time() > etime:
198 raise TimeoutError("Extended receive timeout")
200 data = self.socket.recv(4096)
201 except socket.timeout, err:
202 raise TimeoutError("Receive timeout: %s" % str(err))
204 raise ConnectionClosedError("Connection closed while reading")
205 new_msgs = (self._buffer + data).split(self.eom)
206 self._buffer = new_msgs.pop()
207 self._msgs.extend(new_msgs)
208 return self._msgs.popleft()
211 """Send a message and wait for the response.
213 This is just a wrapper over Send and Recv.
220 """Close the socket"""
221 if self.socket is not None:
226 class Client(object):
227 """High-level client implementation.
229 This uses a backing Transport-like class on top of which it
230 implements data serialization/deserialization.
233 def __init__(self, address=None, timeouts=None, transport=Transport):
234 """Constructor for the Client class.
237 - address: a valid address the the used transport class
238 - timeout: a list of timeouts, to be used on connect and read/write
239 - transport: a Transport-like class
242 If timeout is not passed, the default timeouts of the transport
247 address = constants.MASTER_SOCKET
248 self.transport = transport(address, timeouts=timeouts)
250 def SendRequest(self, request, data):
251 """Send a generic request and return the response.
254 msg = {KEY_REQUEST: request, KEY_DATA: data}
255 result = self.transport.Call(simplejson.dumps(msg))
257 data = simplejson.loads(result)
258 except Exception, err:
259 raise ProtocolError("Error while deserializing response: %s" % str(err))
260 if (not isinstance(data, dict) or
261 'success' not in data or
262 'result' not in data):
263 raise DecodingError("Invalid response from server: %s" % str(data))
266 def SubmitJob(self, job):
268 result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
269 if not result['success']:
270 raise RequestError(result['result'])
271 return result['result']
273 def Query(self, data):
275 result = self.SendRequest(REQ_QUERY, data)
276 if not result['success']:
277 raise RequestError(result[result])
278 result = result['result']
279 if data["object"] == "jobs":
280 # custom job processing of query values
282 for idx, field in enumerate(data["fields"]):
283 if field == "op_list":
284 row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]