4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import utils
# Key in the response dict indicating whether the request succeeded.
# NOTE(review): KEY_RESULT (used by Client.CallMethod below) and the default
# timeouts DEF_CTMO/DEF_RWTO (used by Transport) are referenced later in this
# file but their definitions are not visible in this chunk.
KEY_SUCCESS = "success"

# Request names, as sent to (and dispatched by) the master daemon.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
class ProtocolError(Exception):
  """Base class for errors in the communication with the master daemon.

  The more specific transport and protocol failures defined in this
  module all derive from this class.

  """
class ConnectionClosedError(ProtocolError):
  """Error raised when the connection is closed while reading data.

  """
class TimeoutError(ProtocolError):
  """Error raised when an operation exceeds its allotted time.

  """
class EncodingError(ProtocolError):
  """Error raised when a message cannot be encoded on the sending side.

  """
class DecodingError(ProtocolError):
  """Error raised when a response cannot be decoded on the receiving side.

  """
class RequestError(ProtocolError):
  """Error in the request.

  This signifies an error in the request format or request handling,
  but not (e.g.) an error in starting up an instance.

  Some common conditions that can trigger this exception:
    - job submission failed because the job data was wrong
    - query failed because required fields were missing

  """
class NoMasterError(ProtocolError):
  """The master cannot be reached.

  This means that the master daemon is not running or the socket has
  been removed (NOTE(review): the tail of this docstring was truncated
  in this chunk; wording restored from context — confirm).

  """
111 """Low-level transport class.
113 This is used on the client side.
115 This could be replace by any other class that provides the same
116 semantics to the Client. This means:
117 - can send messages and receive messages
118 - safe for multithreading
  def __init__(self, address, timeouts=None, eom=None):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address to the used transport class
      - timeout: a list of timeouts, to be used on connect and read/write
      - eom: an identifier to be used as end-of-message which the
        upper-layer will guarantee that this identifier will not appear
        in any message

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use a default of 10 and respectively 60 seconds.

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than timeout value passed (we make a hard limit at twice the read
    timeout).

    """
    self.address = address
    # Connect and read/write timeouts; default values below, presumably
    # guarded by an "if timeouts is None: ... else: ..." — those two lines
    # are missing from this chunk.
    self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    self._ctimeout, self._rwtimeout = timeouts
    # Queue of complete, not-yet-consumed messages received so far.
    # NOTE(review): initialisation of self.eom and self._buffer (both used
    # by Send/Recv below) appears to be missing from this chunk.
    self._msgs = collections.deque()
    self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # Retry connecting until the overall connect timeout expires; _Connect
    # signals retryable conditions via utils.RetryAgain.
    # NOTE(review): a "try:" statement around the following calls appears to
    # be missing from this chunk.
    utils.Retry(self._Connect, 1.0, self._ctimeout,
                args=(self.socket, address, self._ctimeout))
    except utils.RetryTimeout:
      raise TimeoutError("Connect timed out")
    self.socket.settimeout(self._rwtimeout)
    except (socket.error, NoMasterError):
      if self.socket is not None:
        # NOTE(review): the cleanup body (presumably closing the socket and
        # re-raising) is missing from this chunk.
176 def _Connect(sock, address, timeout):
177 sock.settimeout(timeout)
179 sock.connect(address)
180 except socket.timeout, err:
181 raise TimeoutError("Connect timed out: %s" % str(err))
182 except socket.error, err:
183 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
184 raise NoMasterError(address)
185 if err.args[0] == errno.EAGAIN:
186 # Server's socket backlog is full at the moment
187 raise utils.RetryAgain()
  def _CheckSocket(self):
    """Make sure we are connected.

    Raises ProtocolError if the socket has already been closed.

    """
    if self.socket is None:
      raise ProtocolError("Connection is closed")
    # NOTE(review): the enclosing "def Send(self, msg):" line, the opening of
    # this docstring, and the guard condition ("if self.eom in msg:" plus the
    # _CheckSocket/try preamble) appear to be missing from this chunk.
    This just sends a message and doesn't wait for the response.

    """
      raise EncodingError("Message terminator found in payload")
    # TODO: sendall is not guaranteed to send everything
    self.socket.sendall(msg + self.eom)
    except socket.timeout, err:
      raise TimeoutError("Sending timeout: %s" % str(err))
213 """Try to receive a message from the socket.
215 In case we already have messages queued, we just return from the
216 queue. Otherwise, we try to read data with a _rwtimeout network
217 timeout, and making sure we don't go over 2x_rwtimeout as a global
222 etime = time.time() + self._rwtimeout
223 while not self._msgs:
224 if time.time() > etime:
225 raise TimeoutError("Extended receive timeout")
228 data = self.socket.recv(4096)
229 except socket.error, err:
230 if err.args and err.args[0] == errno.EAGAIN:
233 except socket.timeout, err:
234 raise TimeoutError("Receive timeout: %s" % str(err))
237 raise ConnectionClosedError("Connection closed while reading")
238 new_msgs = (self._buffer + data).split(self.eom)
239 self._buffer = new_msgs.pop()
240 self._msgs.extend(new_msgs)
241 return self._msgs.popleft()
244 """Send a message and wait for the response.
246 This is just a wrapper over Send and Recv.
253 """Close the socket"""
254 if self.socket is not None:
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
266 def __init__(self, address=None, timeouts=None, transport=Transport):
267 """Constructor for the Client class.
270 - address: a valid address the the used transport class
271 - timeout: a list of timeouts, to be used on connect and read/write
272 - transport: a Transport-like class
275 If timeout is not passed, the default timeouts of the transport
280 address = constants.MASTER_SOCKET
281 self.address = address
282 self.timeouts = timeouts
283 self.transport_class = transport
284 self.transport = None
285 self._InitTransport()
  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    The transport is lazily (re)created: when it has been closed (set to
    None), a new one is built from the stored address and timeouts.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)
295 def _CloseTransport(self):
296 """Close the transport, ignoring errors.
299 if self.transport is None:
302 old_transp = self.transport
303 self.transport = None
305 except Exception: # pylint: disable-msg=W0703
  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    The request is JSON-serialized, sent over the transport, and the
    JSON response is validated and unwrapped.

    """
    # Serialize the request
    # NOTE(review): the construction of `request` (presumably a dict holding
    # the method name and arguments) is missing from this chunk; as shown,
    # `request` is undefined at this point.
    send_data = serializer.DumpJson(request, indent=False)

    # Send request and wait for response
    # NOTE(review): a "try:" statement around the transport calls appears to
    # be missing; _CloseTransport() below presumably belongs to an error
    # handler rather than the success path — confirm against the original.
    self._InitTransport()
    result = self.transport.Call(send_data)
    self._CloseTransport()

    # Parse the result
    # NOTE(review): a "try:" statement appears to be missing here, leaving
    # the except clause dangling.
    data = serializer.LoadJson(result)
    except Exception, err:
      raise ProtocolError("Error while deserializing response: %s" % str(err))

    # Validate response shape before trusting its contents.
    if (not isinstance(data, dict) or
        KEY_SUCCESS not in data or
        KEY_RESULT not in data):
      raise DecodingError("Invalid response from server: %s" % str(data))

    result = data[KEY_RESULT]

    if not data[KEY_SUCCESS]:
      # Presumably re-raises remote errors encoded by ganeti.errors when
      # possible, otherwise falls through to RequestError — confirm against
      # ganeti.errors.MaybeRaise.
      errors.MaybeRaise(result)
      raise RequestError(result)
    # NOTE(review): the final "return result" appears to be missing from
    # this chunk.
349 def SetQueueDrainFlag(self, drain_flag):
350 return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
352 def SetWatcherPause(self, until):
353 return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
355 def SubmitJob(self, ops):
356 ops_state = map(lambda op: op.__getstate__(), ops)
357 return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
359 def SubmitManyJobs(self, jobs):
362 jobs_state.append([op.__getstate__() for op in ops])
363 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
365 def CancelJob(self, job_id):
366 return self.CallMethod(REQ_CANCEL_JOB, job_id)
368 def ArchiveJob(self, job_id):
369 return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
371 def AutoArchiveJobs(self, age):
372 timeout = (DEF_RWTO - 1) / 2
373 return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
375 def WaitForJobChangeOnce(self, job_id, fields,
376 prev_job_info, prev_log_serial):
377 timeout = (DEF_RWTO - 1) / 2
378 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
379 (job_id, fields, prev_job_info,
380 prev_log_serial, timeout))
382 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
384 result = self.WaitForJobChangeOnce(job_id, fields,
385 prev_job_info, prev_log_serial)
386 if result != constants.JOB_NOTCHANGED:
390 def QueryJobs(self, job_ids, fields):
391 return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
393 def QueryInstances(self, names, fields, use_locking):
394 return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
396 def QueryNodes(self, names, fields, use_locking):
397 return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
399 def QueryExports(self, nodes, use_locking):
400 return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
402 def QueryClusterInfo(self):
403 return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
405 def QueryConfigValues(self, fields):
406 return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
408 def QueryTags(self, kind, name):
409 return self.CallMethod(REQ_QUERY_TAGS, (kind, name))