4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also be used by the master daemon.
37 from ganeti import opcodes
40 KEY_REQUEST = 'request'
50 class ProtocolError(Exception):
51 """Denotes an error in the server communication"""
54 class ConnectionClosedError(ProtocolError):
55 """Connection closed error"""
58 class TimeoutError(ProtocolError):
59 """Operation timeout error"""
62 class EncodingError(ProtocolError):
63 """Encoding failure on the sending side"""
66 class DecodingError(ProtocolError):
67 """Decoding failure on the receiving side"""
70 def SerializeJob(job):
71 """Convert a job description to a string format.
74 return simplejson.dumps(job.__getstate__())
77 def UnserializeJob(data):
78 """Load a job from a string format"""
80 new_data = simplejson.loads(data)
81 except Exception, err:
82 raise DecodingError("Error while unserializing: %s" % str(err))
84 job.__setstate__(new_data)
89 """Low-level transport class.
91 This is used on the client side.
93 This could be replace by any other class that provides the same
94 semantics to the Client. This means:
95 - can send messages and receive messages
96 - safe for multithreading
100 def __init__(self, address, timeouts=None, eom=None):
101 """Constructor for the Client class.
104 - address: a valid address the the used transport class
105 - timeout: a list of timeouts, to be used on connect and read/write
106 - eom: an identifier to be used as end-of-message which the
107 upper-layer will guarantee that this identifier will not appear
110 There are two timeouts used since we might want to wait for a long
111 time for a response, but the connect timeout should be lower.
113 If not passed, we use a default of 10 and respectively 60 seconds.
115 Note that on reading data, since the timeout applies to an
116 invidual receive, it might be that the total duration is longer
117 than timeout value passed (we make a hard limit at twice the read
121 self.address = address
123 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
125 self._ctimeout, self._rwtimeout = timeouts
129 self._msgs = collections.deque()
137 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
138 self.socket.settimeout(self._ctimeout)
140 self.socket.connect(address)
141 except socket.timeout, err:
142 raise TimeoutError("Connection timed out: %s" % str(err))
143 self.socket.settimeout(self._rwtimeout)
145 if self.socket is not None:
150 def _CheckSocket(self):
151 """Make sure we are connected.
154 if self.socket is None:
155 raise ProtocolError("Connection is closed")
160 This just sends a message and doesn't wait for the response.
164 raise EncodingError("Message terminator found in payload")
167 self.socket.sendall(msg + self.eom)
168 except socket.timeout, err:
169 raise TimeoutError("Sending timeout: %s" % str(err))
172 """Try to receive a messae from the socket.
174 In case we already have messages queued, we just return from the
175 queue. Otherwise, we try to read data with a _rwtimeout network
176 timeout, and making sure we don't go over 2x_rwtimeout as a global
181 etime = time.time() + self._rwtimeout
182 while not self._msgs:
183 if time.time() > etime:
184 raise TimeoutError("Extended receive timeout")
186 data = self.socket.recv(4096)
187 except socket.timeout, err:
188 raise TimeoutError("Receive timeout: %s" % str(err))
190 raise ConnectionClosedError("Connection closed while reading")
191 new_msgs = (self._buffer + data).split(self.eom)
192 self._buffer = new_msgs.pop()
193 self._msgs.extend(new_msgs)
194 return self._msgs.popleft()
197 """Send a message and wait for the response.
199 This is just a wrapper over Send and Recv.
206 """Close the socket"""
207 if self.socket is not None:
212 class Client(object):
213 """High-level client implementation.
215 This uses a backing Transport-like class on top of which it
216 implements data serialization/deserialization.
219 def __init__(self, address, timeouts=None, transport=Transport):
220 """Constructor for the Client class.
223 - address: a valid address the the used transport class
224 - timeout: a list of timeouts, to be used on connect and read/write
225 - transport: a Transport-like class
228 If timeout is not passed, the default timeouts of the transport
232 self.transport = transport(address, timeouts=timeouts)
234 def SendRequest(self, request, data):
235 """Send a generic request and return the response.
238 msg = {KEY_REQUEST: request, KEY_DATA: data}
239 result = self.transport.Call(simplejson.dumps(msg))
241 data = simplejson.loads(result)
242 except Exception, err:
243 raise ProtocolError("Error while deserializing response: %s" % str(err))
246 def SubmitJob(self, job):
248 return self.SendRequest(REQ_SUBMIT, SerializeJob(job))
250 def Query(self, data):
252 return self.SendRequest(REQ_QUERY, data)