4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Module for the unix socket protocol.

This module implements the local unix socket protocol. You only need
this module and the opcodes module in the client program in order to
communicate with the master.

The module is also used by the master daemon.

"""
import collections
import errno
import socket
import time

from ganeti import serializer
from ganeti import constants
from ganeti import errors
# Keys of the JSON envelopes exchanged with the master: a request carries
# the method name and its arguments; a response carries a success flag and
# a result, which holds the error details when the success flag is false.
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"

# Request (method) names, sent in the KEY_METHOD field of a request.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

# Default connect and read/write timeouts, in seconds, used by the
# transport when the caller does not supply its own.
DEF_CTMO = 10
DEF_RWTO = 60
class ProtocolError(Exception):
    """Root of this module's error hierarchy.

    Signals a failure while talking to the server; the more specific
    errors below all derive from it.
    """
class ConnectionClosedError(ProtocolError):
    """The peer closed the connection, e.g. while a reply was being read."""
class TimeoutError(ProtocolError):
    """A socket operation (connect, send or receive) did not finish in time."""
class EncodingError(ProtocolError):
    """An outgoing message could not be encoded for sending."""
class DecodingError(ProtocolError):
    """An incoming response could not be decoded into the expected form."""
class RequestError(ProtocolError):
    """The server reported that it could not handle the request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing
    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    This means that the master daemon is not running or its socket is
    missing: connecting failed with ENOENT or ECONNREFUSED. The socket
    address is passed as the exception argument.
    """
class Transport(object):
    """Low-level transport class.

    This is used on the client side.

    This could be replaced by any other class that provides the same
    semantics to the Client. This means:
      - can send messages and receive messages
      - safe for multithreading
    """

    def __init__(self, address, timeouts=None, eom=None):
        """Connect to the given unix socket.

        Arguments:
          - address: a valid address for the used transport class (here,
            the path of a unix socket)
          - timeouts: a (connect, read/write) pair of timeouts, in seconds
          - eom: an identifier to be used as end-of-message; the upper
            layer guarantees that it will not appear in any message

        There are two timeouts because we might want to wait for a long
        time for a response, while the connect timeout should be lower.
        If not passed, we use a default of 10 and respectively 60 seconds.

        Note that on reading data, since the timeout applies to an
        individual receive, the total duration may be longer than the
        timeout value passed (Recv enforces a hard limit of roughly twice
        the read timeout).

        Raises:
          - TimeoutError: if connecting times out
          - NoMasterError: if the socket does not exist or refuses
            connections
          - socket.error: for any other connection failure
        """
        self.address = address
        if timeouts is None:
            self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
        else:
            self._ctimeout, self._rwtimeout = timeouts

        self.socket = None
        # Data received after the last end-of-message marker
        self._buffer = ""
        # Complete messages received but not yet handed out by Recv
        self._msgs = collections.deque()

        if eom is None:
            self.eom = "\3"
        else:
            self.eom = eom

        try:
            self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self.socket.settimeout(self._ctimeout)
            try:
                self.socket.connect(address)
            except socket.timeout as err:
                raise TimeoutError("Connect timed out: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] in (errno.ENOENT,
                                                errno.ECONNREFUSED):
                    raise NoMasterError(address)
                raise
            self.socket.settimeout(self._rwtimeout)
        except (socket.error, ProtocolError):
            # Covers our own TimeoutError/NoMasterError as well, so the
            # socket is never left open when the constructor fails.
            if self.socket is not None:
                self.socket.close()
            self.socket = None
            raise

    def _CheckSocket(self):
        """Make sure we are connected.

        Raises ProtocolError if the transport has been closed.
        """
        if self.socket is None:
            raise ProtocolError("Connection is closed")

    def Send(self, msg):
        """Send a message without waiting for the response.

        Raises:
          - EncodingError: if msg contains the end-of-message marker
          - ProtocolError: if the transport has been closed
          - TimeoutError: if sending times out
        """
        if self.eom in msg:
            raise EncodingError("Message terminator found in payload")
        self._CheckSocket()
        try:
            # sendall() keeps sending until all data is out or it raises;
            # after an error it is unknown how much data was sent.
            self.socket.sendall(msg + self.eom)
        except socket.timeout as err:
            raise TimeoutError("Sending timeout: %s" % str(err))

    def Recv(self):
        """Try to receive a message from the socket.

        In case we already have messages queued, we just return from the
        queue. Otherwise, we try to read data with a _rwtimeout network
        timeout, and make sure we don't go over 2x_rwtimeout as a global
        limit.

        Raises:
          - ProtocolError: if the transport has been closed
          - TimeoutError: if a single receive or the global limit expires
          - ConnectionClosedError: if the peer closes the connection
        """
        self._CheckSocket()
        etime = time.time() + self._rwtimeout
        while not self._msgs:
            if time.time() > etime:
                raise TimeoutError("Extended receive timeout")
            try:
                data = self.socket.recv(4096)
            except socket.timeout as err:
                # socket.timeout is a subclass of socket.error, so it must
                # be handled first to be reported as our TimeoutError.
                raise TimeoutError("Receive timeout: %s" % str(err))
            except socket.error as err:
                if err.args and err.args[0] == errno.EAGAIN:
                    # Retry, still bounded by the global deadline above
                    continue
                raise
            if not data:
                raise ConnectionClosedError("Connection closed while reading")
            # The last chunk is an incomplete message (possibly empty)
            new_msgs = (self._buffer + data).split(self.eom)
            self._buffer = new_msgs.pop()
            self._msgs.extend(new_msgs)
        return self._msgs.popleft()

    def Call(self, msg):
        """Send a message and wait for the response.

        This is just a wrapper over Send and Recv.
        """
        self.Send(msg)
        return self.Recv()

    def Close(self):
        """Close the socket; calling this more than once is harmless."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None
class Client(object):
    """High-level client implementation.

    This uses a backing Transport-like class on top of which it
    implements data serialization/deserialization.
    """

    def __init__(self, address=None, timeouts=None, transport=Transport):
        """Constructor for the Client class.

        Arguments:
          - address: a valid address for the used transport class;
            defaults to constants.MASTER_SOCKET
          - timeouts: a (connect, read/write) timeout pair, passed as-is
            to the transport
          - transport: a Transport-like class

        If timeouts is not passed, the default timeouts of the transport
        class are used.
        """
        if address is None:
            address = constants.MASTER_SOCKET
        self.address = address
        self.timeouts = timeouts
        self.transport_class = transport
        self.transport = None
        self._InitTransport()

    def _InitTransport(self):
        """(Re)initialize the transport if needed.

        A new transport is only created when none is open, e.g. after a
        failed request discarded the previous one.
        """
        if self.transport is None:
            self.transport = self.transport_class(self.address,
                                                  timeouts=self.timeouts)

    def _CloseTransport(self):
        """Close the transport, ignoring errors.

        The transport is detached before closing, so the next request
        reconnects even if closing the old one fails.
        """
        if self.transport is None:
            return
        old_transp = self.transport
        self.transport = None
        try:
            old_transp.Close()
        except Exception:  # pylint: disable-msg=W0703
            # Best effort: the connection is being discarded anyway
            pass

    def CallMethod(self, method, args):
        """Send a generic request and return the response.

        Arguments:
          - method: the request name (one of the REQ_* constants)
          - args: the request arguments, serialized with
            serializer.DumpJson

        Returns the result part of a successful response.

        Raises:
          - any error from the transport; the transport is closed first
            so that the next call reconnects
          - ProtocolError: if the response cannot be deserialized
          - DecodingError: if the response lacks the success/result keys
          - RequestError: if the server reports a failure, after
            errors.MaybeRaise(result) had the chance to raise (presumably
            a more specific error)
        """
        # Build request
        request = {
            KEY_METHOD: method,
            KEY_ARGS: args,
        }

        # Serialize the request
        send_data = serializer.DumpJson(request, indent=False)

        # Send request and wait for response
        try:
            self._InitTransport()
            result = self.transport.Call(send_data)
        except Exception:
            # The connection state is unknown; drop it and re-raise
            self._CloseTransport()
            raise

        # Parse the result
        try:
            data = serializer.LoadJson(result)
        except Exception as err:
            raise ProtocolError("Error while deserializing response: %s" %
                                str(err))

        # Validate response
        if (not isinstance(data, dict) or
                KEY_SUCCESS not in data or
                KEY_RESULT not in data):
            raise DecodingError("Invalid response from server: %s" % str(data))

        result = data[KEY_RESULT]

        if not data[KEY_SUCCESS]:
            errors.MaybeRaise(result)
            raise RequestError(result)

        return result

    def SetQueueDrainFlag(self, drain_flag):
        """Set the job queue drain flag to drain_flag."""
        return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)

    def SetWatcherPause(self, until):
        """Pause the watcher until the given value (sent as a one-item list)."""
        return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])

    def SubmitJob(self, ops):
        """Submit one job made of the given opcodes.

        Each op must provide __getstate__(); the list of states is sent.
        """
        # A list, not map(): it must be a concrete JSON-serializable value
        ops_state = [op.__getstate__() for op in ops]
        return self.CallMethod(REQ_SUBMIT_JOB, ops_state)

    def SubmitManyJobs(self, jobs):
        """Submit several jobs; each job is a sequence of opcodes."""
        jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
        return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)

    def CancelJob(self, job_id):
        """Request cancellation of the given job."""
        return self.CallMethod(REQ_CANCEL_JOB, job_id)

    def ArchiveJob(self, job_id):
        """Request archival of the given job."""
        return self.CallMethod(REQ_ARCHIVE_JOB, job_id)

    def AutoArchiveJobs(self, age):
        """Request archival of jobs according to the given age."""
        # Server-side time limit, a bit under half the default read/write
        # timeout (presumably so the reply arrives before Recv gives up).
        # Floor division keeps it an integer, as on Python 2.
        timeout = (DEF_RWTO - 1) // 2
        return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))

    def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
        """Wait until the given job changes.

        Repeats the request for as long as the master answers
        constants.JOB_NOTCHANGED, then returns the first other result.
        """
        # Same bounded server-side wait as in AutoArchiveJobs
        timeout = (DEF_RWTO - 1) // 2
        while True:
            result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                                     (job_id, fields, prev_job_info,
                                      prev_log_serial, timeout))
            if result != constants.JOB_NOTCHANGED:
                return result

    def QueryJobs(self, job_ids, fields):
        """Query the given fields of the given jobs."""
        return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

    def QueryInstances(self, names, fields, use_locking):
        """Query the given fields of the given instances."""
        return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

    def QueryNodes(self, names, fields, use_locking):
        """Query the given fields of the given nodes."""
        return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

    def QueryExports(self, nodes, use_locking):
        """Query the exports present on the given nodes."""
        return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

    def QueryClusterInfo(self):
        """Query general cluster information."""
        return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

    def QueryConfigValues(self, fields):
        """Query the given configuration values."""
        return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)

    def QueryTags(self, kind, name):
        """Query the tags of the object identified by kind and name."""
        return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
392 # TODO: class Server(object)