4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
# Key of the success flag in a server response dict; Client.CallMethod
# requires it to be present and raises RequestError when its value is false.
KEY_SUCCESS = "success"

# Method names sent to the master; each one is used by the Client wrapper
# method of the same name.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
class ProtocolError(Exception):
    """Denotes an error in the server communication.

    The other protocol exceptions in this module derive from this class,
    so catching it catches all of them.
    """
class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised when the connection is closed while a message is being read.
    """
class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving exceeds its timeout.
    """
    # NOTE(review): on Python 3 this name shadows the built-in TimeoutError
    # inside this module; the name is kept because callers catch it.
class EncodingError(ProtocolError):
    """Encoding failure on the sending side.

    Raised when the message terminator is found inside the payload that
    is about to be sent.
    """
class DecodingError(ProtocolError):
    """Decoding failure on the receiving side.

    Raised when a server response is not a dict or lacks one of the
    expected keys.
    """
79 class RequestError(ProtocolError):
82 This signifies an error in the request format or request handling,
83 but not (e.g.) an error in starting up an instance.
85 Some common conditions that can trigger this exception:
86 - job submission failed because the job data was wrong
87 - query failed because required fields were missing
92 class NoMasterError(ProtocolError):
93 """The master cannot be reached
95 This means that the master daemon is not running or the socket has
102 """Low-level transport class.
104 This is used on the client side.
106 This could be replaced by any other class that provides the same
107 semantics to the Client. This means:
108 - can send messages and receive messages
109 - safe for multithreading
113 def __init__(self, address, timeouts=None, eom=None):
114 """Constructor for the Transport class.
117 - address: a valid address for the used transport class
118 - timeouts: a list of timeouts, to be used on connect and read/write
119 - eom: an identifier to be used as end-of-message which the
120 upper-layer will guarantee that this identifier will not appear
123 There are two timeouts used since we might want to wait for a long
124 time for a response, but the connect timeout should be lower.
126 If not passed, we use a default of 10 and respectively 60 seconds.
128 Note that on reading data, since the timeout applies to an
129 individual receive, it might be that the total duration is longer
130 than timeout value passed (we make a hard limit at twice the read
134 self.address = address
136 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
138 self._ctimeout, self._rwtimeout = timeouts
142 self._msgs = collections.deque()
150 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
151 self.socket.settimeout(self._ctimeout)
153 self.socket.connect(address)
154 except socket.timeout, err:
155 raise TimeoutError("Connect timed out: %s" % str(err))
156 except socket.error, err:
157 if err.args[0] == errno.ENOENT:
158 raise NoMasterError((address,))
160 self.socket.settimeout(self._rwtimeout)
161 except (socket.error, NoMasterError):
162 if self.socket is not None:
def _CheckSocket(self):
    """Make sure we are connected.

    Raises:
      ProtocolError: if self.socket is None.
    """
    if self.socket is None:
        raise ProtocolError("Connection is closed")
177 This just sends a message and doesn't wait for the response.
181 raise EncodingError("Message terminator found in payload")
184 self.socket.sendall(msg + self.eom)
185 except socket.timeout, err:
186 raise TimeoutError("Sending timeout: %s" % str(err))
189 """Try to receive a message from the socket.
191 In case we already have messages queued, we just return from the
192 queue. Otherwise, we try to read data with a _rwtimeout network
193 timeout, and making sure we don't go over 2x_rwtimeout as a global
198 etime = time.time() + self._rwtimeout
199 while not self._msgs:
200 if time.time() > etime:
201 raise TimeoutError("Extended receive timeout")
203 data = self.socket.recv(4096)
204 except socket.timeout, err:
205 raise TimeoutError("Receive timeout: %s" % str(err))
207 raise ConnectionClosedError("Connection closed while reading")
208 new_msgs = (self._buffer + data).split(self.eom)
209 self._buffer = new_msgs.pop()
210 self._msgs.extend(new_msgs)
211 return self._msgs.popleft()
214 """Send a message and wait for the response.
216 This is just a wrapper over Send and Recv.
223 """Close the socket"""
224 if self.socket is not None:
229 class Client(object):
230 """High-level client implementation.
232 This uses a backing Transport-like class on top of which it
233 implements data serialization/deserialization.
236 def __init__(self, address=None, timeouts=None, transport=Transport):
237 """Constructor for the Client class.
240 - address: a valid address for the used transport class
241 - timeouts: a list of timeouts, to be used on connect and read/write
242 - transport: a Transport-like class
245 If timeout is not passed, the default timeouts of the transport
250 address = constants.MASTER_SOCKET
251 self.transport = transport(address, timeouts=timeouts)
253 def CallMethod(self, method, args):
254 """Send a generic request and return the response.
263 # Send request and wait for response
264 result = self.transport.Call(serializer.DumpJson(request, indent=False))
266 data = serializer.LoadJson(result)
267 except Exception, err:
268 raise ProtocolError("Error while deserializing response: %s" % str(err))
271 if (not isinstance(data, dict) or
272 KEY_SUCCESS not in data or
273 KEY_RESULT not in data):
274 raise DecodingError("Invalid response from server: %s" % str(data))
276 if not data[KEY_SUCCESS]:
277 # TODO: decide on a standard exception
278 raise RequestError(data[KEY_RESULT])
280 return data[KEY_RESULT]
def SubmitJob(self, ops):
    """Submit a job built from the given opcodes.

    Each element of ops is reduced to the value of its __getstate__()
    call; the resulting list is sent as the arguments of a
    REQ_SUBMIT_JOB request.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    # A list comprehension always yields a concrete list, unlike map().
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
def CancelJob(self, job_id):
    """Send a REQ_CANCEL_JOB request for job_id.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
def ArchiveJob(self, job_id):
    """Send a REQ_ARCHIVE_JOB request for job_id.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Send a REQ_WAIT_FOR_JOB_CHANGE request.

    The four arguments are forwarded, in order, as a single tuple.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    args = (job_id, fields, prev_job_info, prev_log_serial)
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE, args)
def QueryJobs(self, job_ids, fields):
    """Send a REQ_QUERY_JOBS request with (job_ids, fields) as arguments.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
def QueryInstances(self, names, fields):
    """Send a REQ_QUERY_INSTANCES request with (names, fields) as arguments.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
def QueryNodes(self, names, fields):
    """Send a REQ_QUERY_NODES request with (names, fields) as arguments.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields))
def QueryExports(self, nodes):
    """Send a REQ_QUERY_EXPORTS request with nodes as the argument.

    Returns:
      Whatever self.CallMethod returns for the request.
    """
    return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
308 # TODO: class Server(object)