4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
# Key of the response dictionary that Client.CallMethod requires to be
# present; a false value under this key makes CallMethod raise.
44 KEY_SUCCESS = "success"
# Request method identifiers. Each one is passed as the ``method``
# argument of Client.CallMethod by one of the Client wrapper methods.
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
# NOTE: the wire name is "SetDrainFlag", without the "Queue" part of the
# constant's name.
59 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
class ProtocolError(Exception):
    """Denotes an error in the server communication.

    Base class of every exception defined by this module, so callers can
    catch all protocol-level failures with a single ``except`` clause.
    """


class ConnectionClosedError(ProtocolError):
    """Connection closed error.

    Raised when the peer closes the connection while a message is being
    read.
    """


class TimeoutError(ProtocolError):
    """Operation timeout error.

    Raised when connecting, sending or receiving takes longer than the
    configured timeout.
    """


class EncodingError(ProtocolError):
    """Encoding failure on the sending side.

    Raised when the end-of-message terminator appears inside the payload
    that is about to be sent.
    """


class DecodingError(ProtocolError):
    """Decoding failure on the receiving side.

    Raised when a response is not a dictionary holding both the success
    and the result keys.
    """


class RequestError(ProtocolError):
    """Error on request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing
    """


class NoMasterError(ProtocolError):
    """The master cannot be reached.

    Raised when connecting to the master socket fails with ENOENT or
    ECONNREFUSED, i.e. the master daemon is not running or its socket is
    not available. The exception is raised with a one-element tuple
    holding the address as its only argument.
    """
109 """Low-level transport class.
111 This is used on the client side.
113 This could be replaced by any other class that provides the same
114 semantics to the Client. This means:
115 - can send messages and receive messages
116 - safe for multithreading
120 def __init__(self, address, timeouts=None, eom=None):
121 """Constructor for the Transport class.
124 - address: a valid address for the used transport class
125 - timeouts: a pair of timeouts (connect, read/write), in seconds
126 - eom: an identifier to be used as end-of-message which the
127 upper-layer will guarantee that this identifier will not appear
130 There are two timeouts used since we might want to wait for a long
131 time for a response, but the connect timeout should be lower.
133 If not passed, we use a default of 10 and respectively 60 seconds.
135 Note that on reading data, since the timeout applies to an
136 individual receive, it might be that the total duration is longer
137 than the timeout value passed (we make a hard limit at twice the read
141 self.address = address
143 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
145 self._ctimeout, self._rwtimeout = timeouts
149 self._msgs = collections.deque()
157 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
158 self.socket.settimeout(self._ctimeout)
160 self.socket.connect(address)
161 except socket.timeout, err:
162 raise TimeoutError("Connect timed out: %s" % str(err))
163 except socket.error, err:
164 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
165 raise NoMasterError((address,))
167 self.socket.settimeout(self._rwtimeout)
168 except (socket.error, NoMasterError):
169 if self.socket is not None:
def _CheckSocket(self):
    """Verify that this transport still holds a socket.

    Raises:
      ProtocolError: if ``self.socket`` is None.
    """
    connected = self.socket is not None
    if not connected:
        raise ProtocolError("Connection is closed")
184 This just sends a message and doesn't wait for the response.
188 raise EncodingError("Message terminator found in payload")
191 # TODO: sendall is not guaranteed to send everything
192 self.socket.sendall(msg + self.eom)
193 except socket.timeout, err:
194 raise TimeoutError("Sending timeout: %s" % str(err))
197 """Try to receive a message from the socket.
199 In case we already have messages queued, we just return from the
200 queue. Otherwise, we try to read data with a _rwtimeout network
201 timeout, and making sure we don't go over 2x_rwtimeout as a global
206 etime = time.time() + self._rwtimeout
207 while not self._msgs:
208 if time.time() > etime:
209 raise TimeoutError("Extended receive timeout")
212 data = self.socket.recv(4096)
213 except socket.error, err:
214 if err.args and err.args[0] == errno.EAGAIN:
217 except socket.timeout, err:
218 raise TimeoutError("Receive timeout: %s" % str(err))
221 raise ConnectionClosedError("Connection closed while reading")
222 new_msgs = (self._buffer + data).split(self.eom)
223 self._buffer = new_msgs.pop()
224 self._msgs.extend(new_msgs)
225 return self._msgs.popleft()
228 """Send a message and wait for the response.
230 This is just a wrapper over Send and Recv.
237 """Close the socket"""
238 if self.socket is not None:
243 class Client(object):
244 """High-level client implementation.
246 This uses a backing Transport-like class on top of which it
247 implements data serialization/deserialization.
def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; when
        None, constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and
        read/write
      - transport: a Transport-like class

    If timeouts is not passed, None is handed to the transport class,
    which is then expected to apply its own defaults.

    The transport class is instantiated right away via _InitTransport.
    """
    # NOTE(review): the guard below is missing from the reviewed extract;
    # without it an explicitly passed address would always be overwritten.
    if address is None:
        address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()
271 def _InitTransport(self):
272 """(Re)initialize the transport if needed.
275 if self.transport is None:
276 self.transport = self.transport_class(self.address,
277 timeouts=self.timeouts)
279 def _CloseTransport(self):
280 """Close the transport, ignoring errors.
283 if self.transport is None:
286 old_transp = self.transport
287 self.transport = None
292 def CallMethod(self, method, args):
293 """Send a generic request and return the response.
302 # Serialize the request
303 send_data = serializer.DumpJson(request, indent=False)
305 # Send request and wait for response
307 self._InitTransport()
308 result = self.transport.Call(send_data)
310 self._CloseTransport()
315 data = serializer.LoadJson(result)
316 except Exception, err:
317 raise ProtocolError("Error while deserializing response: %s" % str(err))
320 if (not isinstance(data, dict) or
321 KEY_SUCCESS not in data or
322 KEY_RESULT not in data):
323 raise DecodingError("Invalid response from server: %s" % str(data))
325 result = data[KEY_RESULT]
327 if not data[KEY_SUCCESS]:
328 # TODO: decide on a standard exception
329 if (isinstance(result, (tuple, list)) and len(result) == 2 and
330 isinstance(result[1], (tuple, list))):
331 # custom ganeti errors
332 err_class = errors.GetErrorClass(result[0])
333 if err_class is not None:
334 raise err_class, tuple(result[1])
336 raise RequestError(result)
def SetQueueDrainFlag(self, drain_flag):
    """Send a ``SetDrainFlag`` request carrying drain_flag.

    The flag is handed to CallMethod as-is, not wrapped in a sequence.

    Returns:
      whatever CallMethod returns for the request.
    """
    reply = self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
    return reply
def SetWatcherPause(self, until):
    """Send a ``SetWatcherPause`` request.

    The arguments are a one-element list holding ``until``.

    Returns:
      whatever CallMethod returns for the request.
    """
    request_args = [until]
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, request_args)
def SubmitJob(self, ops):
    """Send a ``SubmitJob`` request for the given opcodes.

    Arguments:
      - ops: an iterable of objects providing __getstate__(); the state
        of each one is what gets sent

    Returns:
      whatever CallMethod returns for the request.
    """
    # A list comprehension (as in SubmitManyJobs) always yields a real
    # list, independently of what map() returns on the running
    # interpreter.
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
def SubmitManyJobs(self, jobs):
    """Send a ``SubmitManyJobs`` request for several jobs at once.

    Arguments:
      - jobs: an iterable of jobs, each job being an iterable of objects
        providing __getstate__()

    Returns:
      whatever CallMethod returns for the request.
    """
    # NOTE(review): the accumulator initialisation and the loop over
    # ``jobs`` are missing from the reviewed extract; rebuilt here as a
    # nested comprehension - confirm against upstream.
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
    """Send a ``CancelJob`` request for job_id.

    The job ID is handed to CallMethod as-is.

    Returns:
      whatever CallMethod returns for the request.
    """
    reply = self.CallMethod(REQ_CANCEL_JOB, job_id)
    return reply
def ArchiveJob(self, job_id):
    """Send an ``ArchiveJob`` request for job_id.

    The job ID is handed to CallMethod as-is.

    Returns:
      whatever CallMethod returns for the request.
    """
    reply = self.CallMethod(REQ_ARCHIVE_JOB, job_id)
    return reply
def AutoArchiveJobs(self, age):
    """Send an ``AutoArchiveJobs`` request.

    The request carries ``age`` together with a timeout of a bit less
    than half of DEF_RWTO - presumably so that the server replies before
    the client-side read timeout expires (TODO confirm).

    Returns:
      whatever CallMethod returns for the request.
    """
    # Explicit floor division keeps the timeout an integer even where
    # "/" means true division.
    timeout = (DEF_RWTO - 1) // 2
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Wait until the master reports a change for a job.

    Sends ``WaitForJobChange`` requests, each carrying a timeout of a
    bit less than half of DEF_RWTO, until the answer is something other
    than constants.JOB_NOTCHANGED.

    Returns:
      the first CallMethod result that differs from
      constants.JOB_NOTCHANGED.
    """
    # NOTE(review): in the reviewed extract the "if" below has no body
    # and no enclosing loop is visible; the polling loop and the return
    # were restored from context - confirm against upstream.
    # Explicit floor division keeps the timeout an integer even where
    # "/" means true division.
    timeout = (DEF_RWTO - 1) // 2
    while True:
        result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                                 (job_id, fields, prev_job_info,
                                  prev_log_serial, timeout))
        if result != constants.JOB_NOTCHANGED:
            return result
def QueryJobs(self, job_ids, fields):
    """Send a ``QueryJobs`` request.

    The arguments are the tuple ``(job_ids, fields)``.

    Returns:
      whatever CallMethod returns for the request.
    """
    query = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, query)
def QueryInstances(self, names, fields, use_locking):
    """Send a ``QueryInstances`` request.

    The arguments are the tuple ``(names, fields, use_locking)``.

    Returns:
      whatever CallMethod returns for the request.
    """
    query = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, query)
def QueryNodes(self, names, fields, use_locking):
    """Send a ``QueryNodes`` request.

    The arguments are the tuple ``(names, fields, use_locking)``.

    Returns:
      whatever CallMethod returns for the request.
    """
    query = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, query)
def QueryExports(self, nodes, use_locking):
    """Send a ``QueryExports`` request.

    The arguments are the tuple ``(nodes, use_locking)``.

    Returns:
      whatever CallMethod returns for the request.
    """
    query = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, query)
def QueryClusterInfo(self):
    """Send a ``QueryClusterInfo`` request with an empty argument tuple.

    Returns:
      whatever CallMethod returns for the request.
    """
    no_args = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
    """Send a ``QueryConfigValues`` request.

    ``fields`` is handed to CallMethod as-is.

    Returns:
      whatever CallMethod returns for the request.
    """
    reply = self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
    return reply
395 # TODO: class Server(object)