4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
# Key of a server response dict whose value tells whether the request
# succeeded; a false value makes the client raise an error instead of
# returning the result.
KEY_SUCCESS = "success"
# Names of the requests a client can issue to the master daemon; each one
# is passed as the `method` argument of Client.CallMethod.

# Job submission and job lifecycle.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"

# Queries.
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"

# Job queue control.
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
class ProtocolError(Exception):
    """Base class for every failure in communicating with the server."""


class ConnectionClosedError(ProtocolError):
    """The connection to the server was found to be closed."""


class TimeoutError(ProtocolError):
    """A connect, send or receive operation on the socket timed out."""


class EncodingError(ProtocolError):
    """An outgoing message could not be encoded for sending."""


class DecodingError(ProtocolError):
    """An incoming message could not be decoded."""
85 class RequestError(ProtocolError):
88 This signifies an error in the request format or request handling,
89 but not (e.g.) an error in starting up an instance.
91 Some common conditions that can trigger this exception:
92 - job submission failed because the job data was wrong
93 - query failed because required fields were missing
98 class NoMasterError(ProtocolError):
99 """The master cannot be reached
101 This means that the master daemon is not running or the socket has
108 """Low-level transport class.
110 This is used on the client side.
112 This could be replace by any other class that provides the same
113 semantics to the Client. This means:
114 - can send messages and receive messages
115 - safe for multithreading
119 def __init__(self, address, timeouts=None, eom=None):
120 """Constructor for the Client class.
123 - address: a valid address the the used transport class
124 - timeout: a list of timeouts, to be used on connect and read/write
125 - eom: an identifier to be used as end-of-message which the
126 upper-layer will guarantee that this identifier will not appear
129 There are two timeouts used since we might want to wait for a long
130 time for a response, but the connect timeout should be lower.
132 If not passed, we use a default of 10 and respectively 60 seconds.
134 Note that on reading data, since the timeout applies to an
135 invidual receive, it might be that the total duration is longer
136 than timeout value passed (we make a hard limit at twice the read
140 self.address = address
142 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
144 self._ctimeout, self._rwtimeout = timeouts
148 self._msgs = collections.deque()
156 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157 self.socket.settimeout(self._ctimeout)
159 self.socket.connect(address)
160 except socket.timeout, err:
161 raise TimeoutError("Connect timed out: %s" % str(err))
162 except socket.error, err:
163 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
164 raise NoMasterError((address,))
166 self.socket.settimeout(self._rwtimeout)
167 except (socket.error, NoMasterError):
168 if self.socket is not None:
173 def _CheckSocket(self):
174 """Make sure we are connected.
177 if self.socket is None:
178 raise ProtocolError("Connection is closed")
183 This just sends a message and doesn't wait for the response.
187 raise EncodingError("Message terminator found in payload")
190 # TODO: sendall is not guaranteed to send everything
191 self.socket.sendall(msg + self.eom)
192 except socket.timeout, err:
193 raise TimeoutError("Sending timeout: %s" % str(err))
196 """Try to receive a message from the socket.
198 In case we already have messages queued, we just return from the
199 queue. Otherwise, we try to read data with a _rwtimeout network
200 timeout, and making sure we don't go over 2x_rwtimeout as a global
205 etime = time.time() + self._rwtimeout
206 while not self._msgs:
207 if time.time() > etime:
208 raise TimeoutError("Extended receive timeout")
211 data = self.socket.recv(4096)
212 except socket.error, err:
213 if err.args and err.args[0] == errno.EAGAIN:
216 except socket.timeout, err:
217 raise TimeoutError("Receive timeout: %s" % str(err))
220 raise ConnectionClosedError("Connection closed while reading")
221 new_msgs = (self._buffer + data).split(self.eom)
222 self._buffer = new_msgs.pop()
223 self._msgs.extend(new_msgs)
224 return self._msgs.popleft()
227 """Send a message and wait for the response.
229 This is just a wrapper over Send and Recv.
236 """Close the socket"""
237 if self.socket is not None:
242 class Client(object):
243 """High-level client implementation.
245 This uses a backing Transport-like class on top of which it
246 implements data serialization/deserialization.
249 def __init__(self, address=None, timeouts=None, transport=Transport):
250 """Constructor for the Client class.
253 - address: a valid address the the used transport class
254 - timeout: a list of timeouts, to be used on connect and read/write
255 - transport: a Transport-like class
258 If timeout is not passed, the default timeouts of the transport
263 address = constants.MASTER_SOCKET
264 self.address = address
265 self.timeouts = timeouts
266 self.transport_class = transport
267 self.transport = None
268 self._InitTransport()
270 def _InitTransport(self):
271 """(Re)initialize the transport if needed.
274 if self.transport is None:
275 self.transport = self.transport_class(self.address,
276 timeouts=self.timeouts)
278 def _CloseTransport(self):
279 """Close the transport, ignoring errors.
282 if self.transport is None:
285 old_transp = self.transport
286 self.transport = None
291 def CallMethod(self, method, args):
292 """Send a generic request and return the response.
301 # Serialize the request
302 send_data = serializer.DumpJson(request, indent=False)
304 # Send request and wait for response
306 self._InitTransport()
307 result = self.transport.Call(send_data)
309 self._CloseTransport()
314 data = serializer.LoadJson(result)
315 except Exception, err:
316 raise ProtocolError("Error while deserializing response: %s" % str(err))
319 if (not isinstance(data, dict) or
320 KEY_SUCCESS not in data or
321 KEY_RESULT not in data):
322 raise DecodingError("Invalid response from server: %s" % str(data))
324 result = data[KEY_RESULT]
326 if not data[KEY_SUCCESS]:
327 errors.MaybeRaise(result)
328 raise RequestError(result)
def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request and return its result.

    - drain_flag: the drain flag value, sent unchanged as the request
      arguments
    """
    method = REQ_QUEUE_SET_DRAIN_FLAG
    return self.CallMethod(method, drain_flag)
def SubmitJob(self, ops):
    """Submit one job made of the given operations.

    - ops: an iterable of objects providing __getstate__() (presumably
      opcode instances); each is converted to its state before sending

    Returns the result of the SubmitJob request.
    """
    # Build a concrete list rather than a lazy map() object: it matches
    # SubmitManyJobs and stays JSON-serializable on every Python version.
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
339 def SubmitManyJobs(self, jobs):
342 jobs_state.append([op.__getstate__() for op in ops])
343 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
    """Send a CancelJob request for job_id and return its result."""
    request_args = job_id
    return self.CallMethod(REQ_CANCEL_JOB, request_args)
def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for job_id and return its result."""
    request_args = job_id
    return self.CallMethod(REQ_ARCHIVE_JOB, request_args)
def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request and return its result.

    - age: forwarded unchanged as the first request argument

    The second argument is a server-side timeout equal to half of
    (DEF_RWTO - 1), where DEF_RWTO is the default read/write timeout of
    the transport.
    """
    # NOTE(review): presumably chosen so the server answers well before
    # the client's own read timeout expires -- confirm against the server.
    server_timeout = (DEF_RWTO - 1) / 2
    request_args = (age, server_timeout)
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
355 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
356 timeout = (DEF_RWTO - 1) / 2
358 result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
359 (job_id, fields, prev_job_info,
360 prev_log_serial, timeout))
361 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request for job_ids and fields; return its result."""
    request_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, request_args)
def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request and return its result.

    The arguments are forwarded as the tuple (names, fields, use_locking).
    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, request_args)
def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request and return its result.

    The arguments are forwarded as the tuple (names, fields, use_locking).
    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, request_args)
def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request and return its result.

    The arguments are forwarded as the tuple (nodes, use_locking).
    """
    request_args = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, request_args)
def QueryClusterInfo(self):
    """Send a QueryClusterInfo request (no arguments); return its result."""
    no_args = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request and return its result.

    - fields: sent unchanged as the request arguments (not wrapped in a
      tuple)
    """
    method = REQ_QUERY_CONFIG_VALUES
    return self.CallMethod(method, fields)
384 # TODO: class Server(object)