4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import utils
# Response key: whether the server handled the request successfully
KEY_SUCCESS = "success"

# Request (method) names passed to Client.CallMethod -- job handling
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"

# Request names -- queries
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"

# Request names -- cluster-wide flags
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
# WaitForJobChange timeout: the default, and also the upper bound, for the
# server-side wait requested by Client.WaitForJobChangeOnce. It is derived
# from the default read/write timeout DEF_RWTO; presumably it is kept well
# below it so the server replies before the client's receive times out.
# NOTE: "/" is integer division under Python 2 when DEF_RWTO is an int.
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(Exception):
    """Base class for all errors in the communication with the server."""


class ConnectionClosedError(ProtocolError):
    """Raised when the connection turns out to be closed."""


class TimeoutError(ProtocolError):
    """Raised when a connect, send or receive operation times out."""


class EncodingError(ProtocolError):
    """Raised when a message cannot be encoded for sending."""


class DecodingError(ProtocolError):
    """Raised when data received from the server cannot be decoded."""


class RequestError(ProtocolError):
    """The server could not handle a request.

    Signals a problem with the request itself or with its handling, as
    opposed to a failure of the requested operation (for example an
    instance failing to start up).

    Typical causes:
      - a job submission whose job data was wrong
      - a query that lacked required fields

    """


class NoMasterError(ProtocolError):
    """The master daemon cannot be reached.

    Raised when the master socket does not exist (ENOENT) or refuses the
    connection (ECONNREFUSED), typically because the master daemon is not
    running.

    """
"""Low-level transport class.
This is used on the client side.
This could be replaced by any other class that provides the same
semantics to the Client, namely one that:
- can send and receive messages
- is safe for multithreading
def __init__(self, address, timeouts=None):
    """Constructor for the Transport class.

    Arguments:
      - address: path of the Unix socket to connect to
      - timeouts: a (connect, read/write) pair of timeouts in seconds

    There are two timeouts used since we might want to wait for a long
    time for a response, but the connect timeout should be lower.

    If not passed, we use DEF_CTMO and DEF_RWTO (a default of 10 and
    respectively 60 seconds).

    Note that on reading data, since the timeout applies to an
    individual receive, it might be that the total duration is longer
    than the timeout value passed (we make a hard limit at twice the
    read timeout).

    @raise NoMasterError: the socket does not exist or refuses connections
    @raise TimeoutError: connecting did not succeed within the connect
        timeout

    """
    self.address = address
    if timeouts is None:
        self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
    else:
        self._ctimeout, self._rwtimeout = timeouts

    self.socket = None
    # Holds a trailing, not yet terminated message fragment between reads
    self._buffer = ""
    # Complete messages received but not yet returned by Recv
    self._msgs = collections.deque()

    try:
        self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

        # Connect; _Connect raises utils.RetryAgain while the server's
        # listen backlog is full, and running out of time ends up as
        # utils.RetryTimeout
        try:
            utils.Retry(self._Connect, 1.0, self._ctimeout,
                        args=(self.socket, address, self._ctimeout))
        except utils.RetryTimeout:
            raise TimeoutError("Connect timed out")

        self.socket.settimeout(self._rwtimeout)
    except (socket.error, ProtocolError):
        # ProtocolError covers NoMasterError *and* TimeoutError; catching
        # only NoMasterError leaked the socket when connecting timed out
        if self.socket is not None:
            self.socket.close()
        self.socket = None
        raise
171 def _Connect(sock, address, timeout):
172 sock.settimeout(timeout)
174 sock.connect(address)
175 except socket.timeout, err:
176 raise TimeoutError("Connect timed out: %s" % str(err))
177 except socket.error, err:
178 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
179 raise NoMasterError(address)
180 if err.args[0] == errno.EAGAIN:
181 # Server's socket backlog is full at the moment
182 raise utils.RetryAgain()
def _CheckSocket(self):
    """Ensure the transport still has a socket.

    @raise ProtocolError: the socket has been cleared (is None)

    """
    if self.socket is not None:
        return
    raise ProtocolError("Connection is closed")
195 This just sends a message and doesn't wait for the response.
198 if constants.LUXI_EOM in msg:
199 raise EncodingError("Message terminator found in payload")
202 # TODO: sendall is not guaranteed to send everything
203 self.socket.sendall(msg + constants.LUXI_EOM)
204 except socket.timeout, err:
205 raise TimeoutError("Sending timeout: %s" % str(err))
208 """Try to receive a message from the socket.
210 In case we already have messages queued, we just return from the
211 queue. Otherwise, we try to read data with a _rwtimeout network
212 timeout, and making sure we don't go over 2x_rwtimeout as a global
217 etime = time.time() + self._rwtimeout
218 while not self._msgs:
219 if time.time() > etime:
220 raise TimeoutError("Extended receive timeout")
223 data = self.socket.recv(4096)
224 except socket.error, err:
225 if err.args and err.args[0] == errno.EAGAIN:
228 except socket.timeout, err:
229 raise TimeoutError("Receive timeout: %s" % str(err))
232 raise ConnectionClosedError("Connection closed while reading")
233 new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
234 self._buffer = new_msgs.pop()
235 self._msgs.extend(new_msgs)
236 return self._msgs.popleft()
def Call(self, msg):
    """Send a message and wait for the response.

    This is just a wrapper over Send and Recv.

    @return: the next message received after sending msg

    """
    self.Send(msg)
    return self.Recv()
def Close(self):
    """Close the socket; calling it again is a no-op."""
    if self.socket is not None:
        self.socket.close()
        # Cleared so later operations fail via _CheckSocket
        self.socket = None
254 class Client(object):
255 """High-level client implementation.
257 This uses a backing Transport-like class on top of which it
258 implements data serialization/deserialization.
261 def __init__(self, address=None, timeouts=None, transport=Transport):
262 """Constructor for the Client class.
265 - address: a valid address the the used transport class
266 - timeout: a list of timeouts, to be used on connect and read/write
267 - transport: a Transport-like class
270 If timeout is not passed, the default timeouts of the transport
275 address = constants.MASTER_SOCKET
276 self.address = address
277 self.timeouts = timeouts
278 self.transport_class = transport
279 self.transport = None
280 self._InitTransport()
282 def _InitTransport(self):
283 """(Re)initialize the transport if needed.
286 if self.transport is None:
287 self.transport = self.transport_class(self.address,
288 timeouts=self.timeouts)
290 def _CloseTransport(self):
291 """Close the transport, ignoring errors.
294 if self.transport is None:
297 old_transp = self.transport
298 self.transport = None
300 except Exception: # pylint: disable-msg=W0703
303 def CallMethod(self, method, args):
304 """Send a generic request and return the response.
313 # Serialize the request
314 send_data = serializer.DumpJson(request, indent=False)
316 # Send request and wait for response
318 self._InitTransport()
319 result = self.transport.Call(send_data)
321 self._CloseTransport()
326 data = serializer.LoadJson(result)
327 except Exception, err:
328 raise ProtocolError("Error while deserializing response: %s" % str(err))
331 if (not isinstance(data, dict) or
332 KEY_SUCCESS not in data or
333 KEY_RESULT not in data):
334 raise DecodingError("Invalid response from server: %s" % str(data))
336 result = data[KEY_RESULT]
338 if not data[KEY_SUCCESS]:
339 errors.MaybeRaise(result)
340 raise RequestError(result)
344 def SetQueueDrainFlag(self, drain_flag):
345 return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
347 def SetWatcherPause(self, until):
348 return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
350 def SubmitJob(self, ops):
351 ops_state = map(lambda op: op.__getstate__(), ops)
352 return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
354 def SubmitManyJobs(self, jobs):
357 jobs_state.append([op.__getstate__() for op in ops])
358 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
360 def CancelJob(self, job_id):
361 return self.CallMethod(REQ_CANCEL_JOB, job_id)
363 def ArchiveJob(self, job_id):
364 return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
366 def AutoArchiveJobs(self, age):
367 timeout = (DEF_RWTO - 1) / 2
368 return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
370 def WaitForJobChangeOnce(self, job_id, fields,
371 prev_job_info, prev_log_serial,
372 timeout=WFJC_TIMEOUT):
373 """Waits for changes on a job.
375 @param job_id: Job ID
377 @param fields: List of field names to be observed
378 @type prev_job_info: None or list
379 @param prev_job_info: Previously received job information
380 @type prev_log_serial: None or int/long
381 @param prev_log_serial: Highest log serial number previously received
382 @type timeout: int/float
383 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
384 be capped to that value)
387 assert timeout >= 0, "Timeout can not be negative"
388 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
389 (job_id, fields, prev_job_info,
391 min(WFJC_TIMEOUT, timeout)))
393 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
395 result = self.WaitForJobChangeOnce(job_id, fields,
396 prev_job_info, prev_log_serial)
397 if result != constants.JOB_NOTCHANGED:
401 def QueryJobs(self, job_ids, fields):
402 return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
404 def QueryInstances(self, names, fields, use_locking):
405 return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
407 def QueryNodes(self, names, fields, use_locking):
408 return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
410 def QueryExports(self, nodes, use_locking):
411 return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
413 def QueryClusterInfo(self):
414 return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
416 def QueryConfigValues(self, fields):
417 return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
419 def QueryTags(self, kind, name):
420 return self.CallMethod(REQ_QUERY_TAGS, (kind, name))