4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
# Key that Client.CallMethod requires in every decoded response; a false
# value under this key marks the request as failed.
KEY_SUCCESS = "success"

# Request names handed to Client.CallMethod as its "method" argument.

# Job handling
REQ_SUBMIT_JOB = "SubmitJob"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"

# Queries
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"

# Job queue control
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
class ProtocolError(Exception):
    """Error in the communication with the server.

    The other exception classes of this module derive from this one.
    """
class ConnectionClosedError(ProtocolError):
    """Error signalling that the connection has been closed."""
class TimeoutError(ProtocolError):
    """An operation did not finish within its timeout.

    Raised for connect, send and receive timeouts.
    """
class EncodingError(ProtocolError):
    """A message could not be encoded on the sending side.

    Raised when the message terminator is found inside the payload.
    """
class DecodingError(ProtocolError):
    """A received message could not be decoded.

    Raised by Client.CallMethod when the server response does not have
    the expected structure.
    """
class RequestError(ProtocolError):
    """Error in the format or in the handling of a request.

    This covers failures of the request itself, not failures of the
    work it asked for (e.g. an error in starting up an instance).

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing
    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or that there is
    a problem with its socket. It is raised when connecting to the
    given address fails with ENOENT or ECONNREFUSED.
    """
107 """Low-level transport class.
109 This is used on the client side.
111 This could be replaced by any other class that provides the same
112 semantics to the Client. This means:
113 - can send messages and receive messages
114 - safe for multithreading
118 def __init__(self, address, timeouts=None, eom=None):
119 """Constructor for the Client class.
122 - address: a valid address for the used transport class
123 - timeouts: a list of timeouts, to be used on connect and read/write
124 - eom: an identifier to be used as end-of-message which the
125 upper-layer will guarantee that this identifier will not appear
128 There are two timeouts used since we might want to wait for a long
129 time for a response, but the connect timeout should be lower.
131 If not passed, we use a default of 10 and respectively 60 seconds.
133 Note that on reading data, since the timeout applies to an
134 individual receive, it might be that the total duration is longer
135 than timeout value passed (we make a hard limit at twice the read
139 self.address = address
141 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
143 self._ctimeout, self._rwtimeout = timeouts
147 self._msgs = collections.deque()
155 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156 self.socket.settimeout(self._ctimeout)
158 self.socket.connect(address)
159 except socket.timeout, err:
160 raise TimeoutError("Connect timed out: %s" % str(err))
161 except socket.error, err:
162 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
163 raise NoMasterError((address,))
165 self.socket.settimeout(self._rwtimeout)
166 except (socket.error, NoMasterError):
167 if self.socket is not None:
def _CheckSocket(self):
    """Verify that this object still holds a socket.

    Raises ProtocolError when self.socket is None; otherwise does
    nothing and returns None.
    """
    if self.socket is not None:
        return
    raise ProtocolError("Connection is closed")
182 This just sends a message and doesn't wait for the response.
186 raise EncodingError("Message terminator found in payload")
189 self.socket.sendall(msg + self.eom)
190 except socket.timeout, err:
191 raise TimeoutError("Sending timeout: %s" % str(err))
194 """Try to receive a message from the socket.
196 In case we already have messages queued, we just return from the
197 queue. Otherwise, we try to read data with a _rwtimeout network
198 timeout, and making sure we don't go over 2x_rwtimeout as a global
203 etime = time.time() + self._rwtimeout
204 while not self._msgs:
205 if time.time() > etime:
206 raise TimeoutError("Extended receive timeout")
208 data = self.socket.recv(4096)
209 except socket.timeout, err:
210 raise TimeoutError("Receive timeout: %s" % str(err))
212 raise ConnectionClosedError("Connection closed while reading")
213 new_msgs = (self._buffer + data).split(self.eom)
214 self._buffer = new_msgs.pop()
215 self._msgs.extend(new_msgs)
216 return self._msgs.popleft()
219 """Send a message and wait for the response.
221 This is just a wrapper over Send and Recv.
228 """Close the socket"""
229 if self.socket is not None:
234 class Client(object):
235 """High-level client implementation.
237 This uses a backing Transport-like class on top of which it
238 implements data serialization/deserialization.
def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; when
        it is None, constants.MASTER_SOCKET is used instead
      - timeouts: a list of timeouts, to be used on connect and
        read/write; forwarded unchanged to the transport class
      - transport: a Transport-like class

    The transport is created right away through _InitTransport, so
    any error raised by the transport constructor propagates from
    here.
    """
    # Only fall back to the master socket when the caller gave no
    # address; an explicit address must not be overwritten.
    if address is None:
        address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    # Start without a transport so that _InitTransport builds one.
    self.transport = None
    self._InitTransport()
262 def _InitTransport(self):
263 """(Re)initialize the transport if needed.
266 if self.transport is None:
267 self.transport = self.transport_class(self.address,
268 timeouts=self.timeouts)
270 def _CloseTransport(self):
271 """Close the transport, ignoring errors.
274 if self.transport is None:
277 old_transp = self.transport
278 self.transport = None
280 except Exception, err:
283 def CallMethod(self, method, args):
284 """Send a generic request and return the response.
293 # Serialize the request
294 send_data = serializer.DumpJson(request, indent=False)
296 # Send request and wait for response
298 self._InitTransport()
299 result = self.transport.Call(send_data)
301 self._CloseTransport()
306 data = serializer.LoadJson(result)
307 except Exception, err:
308 raise ProtocolError("Error while deserializing response: %s" % str(err))
311 if (not isinstance(data, dict) or
312 KEY_SUCCESS not in data or
313 KEY_RESULT not in data):
314 raise DecodingError("Invalid response from server: %s" % str(data))
316 result = data[KEY_RESULT]
318 if not data[KEY_SUCCESS]:
319 # TODO: decide on a standard exception
320 if (isinstance(result, (tuple, list)) and len(result) == 2 and
321 isinstance(result[1], (tuple, list))):
322 # custom ganeti errors
323 err_class = errors.GetErrorClass(result[0])
324 if err_class is not None:
325 raise err_class, tuple(result[1])
327 raise RequestError(result)
def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request carrying drain_flag.

    The value produced by CallMethod is handed back unchanged.
    """
    reply = self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
    return reply
def SubmitJob(self, ops):
    """Send a SubmitJob request built from the given opcodes.

    Each element of ops is reduced to the value of its __getstate__()
    call; the resulting list is the request argument. The value
    produced by CallMethod is handed back unchanged.
    """
    serialized_ops = [opcode.__getstate__() for opcode in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, serialized_ops)
def CancelJob(self, job_id):
    """Send a CancelJob request for job_id.

    The value produced by CallMethod is handed back unchanged.
    """
    reply = self.CallMethod(REQ_CANCEL_JOB, job_id)
    return reply
def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for job_id.

    The value produced by CallMethod is handed back unchanged.
    """
    reply = self.CallMethod(REQ_ARCHIVE_JOB, job_id)
    return reply
def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request for the given age.

    The request argument is the pair (age, timeout), where timeout is
    computed as (DEF_RWTO - 1) / 2. The value produced by CallMethod
    is handed back unchanged.
    """
    # NOTE(review): presumably kept below the read/write timeout so the
    # reply arrives before the client side gives up -- confirm.
    request_args = (age, (DEF_RWTO - 1) / 2)
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
348 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
349 timeout = (DEF_RWTO - 1) / 2
351 result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
352 (job_id, fields, prev_job_info,
353 prev_log_serial, timeout))
354 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request with the pair (job_ids, fields).

    The value produced by CallMethod is handed back unchanged.
    """
    request_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, request_args)
def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request.

    The request argument is the tuple (names, fields, use_locking).
    The value produced by CallMethod is handed back unchanged.
    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, request_args)
def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request.

    The request argument is the tuple (names, fields, use_locking).
    The value produced by CallMethod is handed back unchanged.
    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, request_args)
def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request with the pair (nodes, use_locking).

    The value produced by CallMethod is handed back unchanged.
    """
    request_args = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, request_args)
def QueryClusterInfo(self):
    """Send a QueryClusterInfo request, which takes no arguments.

    The value produced by CallMethod is handed back unchanged.
    """
    no_args = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request for the given fields.

    The value produced by CallMethod is handed back unchanged.
    """
    reply = self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
    return reply
377 # TODO: class Server(object)