4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
44 KEY_SUCCESS = "success"
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
class ProtocolError(errors.GenericError):
    """Base class for every error raised by the LUXI protocol layer."""
class ConnectionClosedError(ProtocolError):
    """Signals that the connection was closed, e.g. while reading."""
class TimeoutError(ProtocolError):
    """Signals that a connect, send or receive operation timed out."""
class RequestError(ProtocolError):
    """Denotes an error in a request.

    This signifies an error in the request format or request handling,
    but not (e.g.) an error in starting up an instance.

    Some common conditions that can trigger this exception:
      - job submission failed because the job data was wrong
      - query failed because required fields were missing

    """
class NoMasterError(ProtocolError):
    """The master cannot be reached.

    Raised when connecting to the master socket fails with ENOENT or
    ECONNREFUSED, i.e. the master daemon is not running or its socket
    is not available. The exception argument is the address that was
    tried.

    """
101 """Low-level transport class.
103 This is used on the client side.
105 This could be replaced by any other class that provides the same
106 semantics to the Client. This means:
107 - can send messages and receive messages
108 - safe for multithreading
112 def __init__(self, address, timeouts=None, eom=None):
113 """Constructor for the Client class.
116 - address: a valid address for the used transport class
117 - timeout: a list of timeouts, to be used on connect and read/write
118 - eom: an identifier to be used as end-of-message which the
119 upper-layer will guarantee that this identifier will not appear
122 There are two timeouts used since we might want to wait for a long
123 time for a response, but the connect timeout should be lower.
125 If not passed, we use a default of 10 and respectively 60 seconds.
127 Note that on reading data, since the timeout applies to an
128 individual receive, it might be that the total duration is longer
129 than timeout value passed (we make a hard limit at twice the read
133 self.address = address
135 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
137 self._ctimeout, self._rwtimeout = timeouts
141 self._msgs = collections.deque()
149 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
150 self.socket.settimeout(self._ctimeout)
152 self.socket.connect(address)
153 except socket.timeout, err:
154 raise TimeoutError("Connect timed out: %s" % str(err))
155 except socket.error, err:
156 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
157 raise NoMasterError(address)
159 self.socket.settimeout(self._rwtimeout)
160 except (socket.error, NoMasterError):
161 if self.socket is not None:
def _CheckSocket(self):
    """Verify that this transport still holds a socket.

    Raises ProtocolError when self.socket is None; otherwise does
    nothing.

    """
    if self.socket is not None:
        return
    raise ProtocolError("Connection is closed")
176 This just sends a message and doesn't wait for the response.
180 raise ProtocolError("Message terminator found in payload")
184 # TODO: sendall is not guaranteed to send everything
185 self.socket.sendall(msg + self.eom)
186 except socket.timeout, err:
187 raise TimeoutError("Sending timeout: %s" % str(err))
190 """Try to receive a message from the socket.
192 In case we already have messages queued, we just return from the
193 queue. Otherwise, we try to read data with a _rwtimeout network
194 timeout, and making sure we don't go over 2x_rwtimeout as a global
199 etime = time.time() + self._rwtimeout
200 while not self._msgs:
201 if time.time() > etime:
202 raise TimeoutError("Extended receive timeout")
205 data = self.socket.recv(4096)
206 except socket.error, err:
207 if err.args and err.args[0] == errno.EAGAIN:
210 except socket.timeout, err:
211 raise TimeoutError("Receive timeout: %s" % str(err))
214 raise ConnectionClosedError("Connection closed while reading")
215 new_msgs = (self._buffer + data).split(self.eom)
216 self._buffer = new_msgs.pop()
217 self._msgs.extend(new_msgs)
218 return self._msgs.popleft()
221 """Send a message and wait for the response.
223 This is just a wrapper over Send and Recv.
230 """Close the socket"""
231 if self.socket is not None:
236 class Client(object):
237 """High-level client implementation.
239 This uses a backing Transport-like class on top of which it
240 implements data serialization/deserialization.
def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; when
        None, constants.MASTER_SOCKET is used
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    The transport is created immediately via _InitTransport().

    """
    # Only fall back to the default socket when the caller gave no
    # address; an explicitly passed address must be honoured.
    if address is None:
        address = constants.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()
264 def _InitTransport(self):
265 """(Re)initialize the transport if needed.
268 if self.transport is None:
269 self.transport = self.transport_class(self.address,
270 timeouts=self.timeouts)
272 def _CloseTransport(self):
273 """Close the transport, ignoring errors.
276 if self.transport is None:
279 old_transp = self.transport
280 self.transport = None
282 except Exception: # pylint: disable-msg=W0703
285 def CallMethod(self, method, args):
286 """Send a generic request and return the response.
295 # Serialize the request
296 send_data = serializer.DumpJson(request, indent=False)
298 # Send request and wait for response
300 self._InitTransport()
301 result = self.transport.Call(send_data)
303 self._CloseTransport()
308 data = serializer.LoadJson(result)
309 except Exception, err:
310 raise ProtocolError("Error while deserializing response: %s" % str(err))
313 if (not isinstance(data, dict) or
314 KEY_SUCCESS not in data or
315 KEY_RESULT not in data):
316 raise ProtocolError("Invalid response from server: %s" % str(data))
318 result = data[KEY_RESULT]
320 if not data[KEY_SUCCESS]:
321 errors.MaybeRaise(result)
322 raise RequestError(result)
def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request with drain_flag as its argument.

    Returns whatever CallMethod returns for the request.

    """
    response = self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
    return response
def SetWatcherPause(self, until):
    """Send a SetWatcherPause request.

    The until value is sent wrapped in a one-element list. Returns
    whatever CallMethod returns for the request.

    """
    request_args = [until]
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, request_args)
def SubmitJob(self, ops):
    """Submit a single job made of the given opcodes.

    Each element of ops is reduced to its __getstate__() value and the
    resulting list is sent as the argument of a SubmitJob request.
    Returns whatever CallMethod returns for the request.

    """
    # A list comprehension (rather than map + lambda) always produces a
    # concrete list for the serializer and matches SubmitManyJobs.
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
def SubmitManyJobs(self, jobs):
    """Submit several jobs in one request.

    Each element of jobs is an iterable of opcodes; every opcode is
    reduced to its __getstate__() value, giving one list per job. The
    list of lists is sent as the argument of a SubmitManyJobs request.
    Returns whatever CallMethod returns for the request.

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
    """Send a CancelJob request with job_id as its argument.

    Returns whatever CallMethod returns for the request.

    """
    response = self.CallMethod(REQ_CANCEL_JOB, job_id)
    return response
def ArchiveJob(self, job_id):
    """Send an ArchiveJob request with job_id as its argument.

    Returns whatever CallMethod returns for the request.

    """
    response = self.CallMethod(REQ_ARCHIVE_JOB, job_id)
    return response
def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request.

    The request carries the (age, timeout) pair, where the timeout is
    computed as (DEF_RWTO - 1) / 2. Returns whatever CallMethod returns
    for the request.

    """
    wait_limit = (DEF_RWTO - 1) / 2
    request_args = (age, wait_limit)
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
352 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
353 timeout = (DEF_RWTO - 1) / 2
355 result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
356 (job_id, fields, prev_job_info,
357 prev_log_serial, timeout))
358 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request carrying (job_ids, fields).

    Returns whatever CallMethod returns for the request.

    """
    request_args = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, request_args)
def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request carrying (names, fields, use_locking).

    Returns whatever CallMethod returns for the request.

    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, request_args)
def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request carrying (names, fields, use_locking).

    Returns whatever CallMethod returns for the request.

    """
    request_args = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, request_args)
def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request carrying (nodes, use_locking).

    Returns whatever CallMethod returns for the request.

    """
    request_args = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, request_args)
def QueryClusterInfo(self):
    """Send a QueryClusterInfo request with an empty argument tuple.

    Returns whatever CallMethod returns for the request.

    """
    no_args = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request with fields as its argument.

    Returns whatever CallMethod returns for the request.

    """
    response = self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
    return response
381 # TODO: class Server(object)