4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Module for the unix socket protocol
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
28 The module is also used by the master daemon.
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import utils
# Key that CallMethod requires in every response dict; its value tells
# whether the request succeeded.
KEY_SUCCESS = "success"

# Request names, passed as the ``method`` argument of Client.CallMethod.

# Job submission and job lifecycle.
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"

# Queries.
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_TAGS = "QueryTags"

# Setting the queue drain flag and the watcher pause.
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
# Timeout, in seconds, for a single WaitForJobChange request;
# WaitForJobChangeOnce caps any caller-supplied timeout to this value.
# Derived from DEF_RWTO, the default read/write timeout that the Transport
# class also falls back to (DEF_RWTO itself is defined outside this view).
# NOTE(review): presumably kept well below DEF_RWTO so the master answers
# before the client-side socket read times out -- confirm.
# NOTE(review): "/" is integer division under Python 2 when DEF_RWTO is an
# int, but true division under Python 3 / "from __future__ import division".
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
class ProtocolError(Exception):
    """Root of the errors raised while talking to the master daemon."""


class ConnectionClosedError(ProtocolError):
    """The connection was found closed (e.g. while reading a reply)."""


class TimeoutError(ProtocolError):
    """A connect, send or receive operation ran out of time."""


class EncodingError(ProtocolError):
    """An outgoing message could not be encoded for transmission."""


class DecodingError(ProtocolError):
    """An incoming message could not be decoded into a valid response."""
91 class RequestError(ProtocolError):
94 This signifies an error in the request format or request handling,
95 but not (e.g.) an error in starting up an instance.
97 Some common conditions that can trigger this exception:
98 - job submission failed because the job data was wrong
99 - query failed because required fields were missing
104 class NoMasterError(ProtocolError):
105 """The master cannot be reached
107 This means that the master daemon is not running or the socket has
114 """Low-level transport class.
116 This is used on the client side.
118 This could be replace by any other class that provides the same
119 semantics to the Client. This means:
120 - can send messages and receive messages
121 - safe for multithreading
125 def __init__(self, address, timeouts=None, eom=None):
126 """Constructor for the Client class.
129 - address: a valid address the the used transport class
130 - timeout: a list of timeouts, to be used on connect and read/write
131 - eom: an identifier to be used as end-of-message which the
132 upper-layer will guarantee that this identifier will not appear
135 There are two timeouts used since we might want to wait for a long
136 time for a response, but the connect timeout should be lower.
138 If not passed, we use a default of 10 and respectively 60 seconds.
140 Note that on reading data, since the timeout applies to an
141 invidual receive, it might be that the total duration is longer
142 than timeout value passed (we make a hard limit at twice the read
146 self.address = address
148 self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
150 self._ctimeout, self._rwtimeout = timeouts
154 self._msgs = collections.deque()
162 self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
166 utils.Retry(self._Connect, 1.0, self._ctimeout,
167 args=(self.socket, address, self._ctimeout))
168 except utils.RetryTimeout:
169 raise TimeoutError("Connect timed out")
171 self.socket.settimeout(self._rwtimeout)
172 except (socket.error, NoMasterError):
173 if self.socket is not None:
179 def _Connect(sock, address, timeout):
180 sock.settimeout(timeout)
182 sock.connect(address)
183 except socket.timeout, err:
184 raise TimeoutError("Connect timed out: %s" % str(err))
185 except socket.error, err:
186 if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
187 raise NoMasterError(address)
188 if err.args[0] == errno.EAGAIN:
189 # Server's socket backlog is full at the moment
190 raise utils.RetryAgain()
193 def _CheckSocket(self):
194 """Make sure we are connected.
197 if self.socket is None:
198 raise ProtocolError("Connection is closed")
203 This just sends a message and doesn't wait for the response.
207 raise EncodingError("Message terminator found in payload")
210 # TODO: sendall is not guaranteed to send everything
211 self.socket.sendall(msg + self.eom)
212 except socket.timeout, err:
213 raise TimeoutError("Sending timeout: %s" % str(err))
216 """Try to receive a message from the socket.
218 In case we already have messages queued, we just return from the
219 queue. Otherwise, we try to read data with a _rwtimeout network
220 timeout, and making sure we don't go over 2x_rwtimeout as a global
225 etime = time.time() + self._rwtimeout
226 while not self._msgs:
227 if time.time() > etime:
228 raise TimeoutError("Extended receive timeout")
231 data = self.socket.recv(4096)
232 except socket.error, err:
233 if err.args and err.args[0] == errno.EAGAIN:
236 except socket.timeout, err:
237 raise TimeoutError("Receive timeout: %s" % str(err))
240 raise ConnectionClosedError("Connection closed while reading")
241 new_msgs = (self._buffer + data).split(self.eom)
242 self._buffer = new_msgs.pop()
243 self._msgs.extend(new_msgs)
244 return self._msgs.popleft()
247 """Send a message and wait for the response.
249 This is just a wrapper over Send and Recv.
256 """Close the socket"""
257 if self.socket is not None:
262 class Client(object):
263 """High-level client implementation.
265 This uses a backing Transport-like class on top of which it
266 implements data serialization/deserialization.
269 def __init__(self, address=None, timeouts=None, transport=Transport):
270 """Constructor for the Client class.
273 - address: a valid address the the used transport class
274 - timeout: a list of timeouts, to be used on connect and read/write
275 - transport: a Transport-like class
278 If timeout is not passed, the default timeouts of the transport
283 address = constants.MASTER_SOCKET
284 self.address = address
285 self.timeouts = timeouts
286 self.transport_class = transport
287 self.transport = None
288 self._InitTransport()
290 def _InitTransport(self):
291 """(Re)initialize the transport if needed.
294 if self.transport is None:
295 self.transport = self.transport_class(self.address,
296 timeouts=self.timeouts)
298 def _CloseTransport(self):
299 """Close the transport, ignoring errors.
302 if self.transport is None:
305 old_transp = self.transport
306 self.transport = None
308 except Exception: # pylint: disable-msg=W0703
311 def CallMethod(self, method, args):
312 """Send a generic request and return the response.
321 # Serialize the request
322 send_data = serializer.DumpJson(request, indent=False)
324 # Send request and wait for response
326 self._InitTransport()
327 result = self.transport.Call(send_data)
329 self._CloseTransport()
334 data = serializer.LoadJson(result)
335 except Exception, err:
336 raise ProtocolError("Error while deserializing response: %s" % str(err))
339 if (not isinstance(data, dict) or
340 KEY_SUCCESS not in data or
341 KEY_RESULT not in data):
342 raise DecodingError("Invalid response from server: %s" % str(data))
344 result = data[KEY_RESULT]
346 if not data[KEY_SUCCESS]:
347 errors.MaybeRaise(result)
348 raise RequestError(result)
def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request to the master.

    @param drain_flag: used unchanged as the request arguments
    @return: whatever L{CallMethod} returns for the request

    """
    request_args = drain_flag
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, request_args)
def SetWatcherPause(self, until):
    """Send a SetWatcherPause request to the master.

    @param until: sent as the only element of the argument list
    @return: whatever L{CallMethod} returns for the request

    """
    request_args = [until]
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, request_args)
def SubmitJob(self, ops):
    """Send a SubmitJob request built from the given operations.

    Every element of C{ops} is converted with its C{__getstate__()} method
    and the resulting list is used as the request arguments.

    @param ops: iterable of objects providing C{__getstate__()}
        (presumably opcodes -- confirm against callers)
    @return: whatever L{CallMethod} returns for the request

    """
    # Build a real list rather than using map(): this matches SubmitManyJobs
    # and stays JSON-serializable even where map() returns a lazy iterator
    # (Python 3).
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
362 def SubmitManyJobs(self, jobs):
365 jobs_state.append([op.__getstate__() for op in ops])
366 return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
def CancelJob(self, job_id):
    """Send a CancelJob request to the master.

    @param job_id: used unchanged as the request arguments
    @return: whatever L{CallMethod} returns for the request

    """
    method = REQ_CANCEL_JOB
    return self.CallMethod(method, job_id)
def ArchiveJob(self, job_id):
    """Send an ArchiveJob request to the master.

    @param job_id: used unchanged as the request arguments
    @return: whatever L{CallMethod} returns for the request

    """
    method = REQ_ARCHIVE_JOB
    return self.CallMethod(method, job_id)
def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request to the master.

    The request arguments are the pair C{(age, timeout)}, where the timeout
    is computed from the module's default read/write timeout DEF_RWTO.

    @param age: first element of the argument pair
    @return: whatever L{CallMethod} returns for the request

    """
    # Same expression as the module-level WFJC_TIMEOUT, evaluated here.
    server_timeout = (DEF_RWTO - 1) / 2
    request_args = (age, server_timeout)
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, request_args)
378 def WaitForJobChangeOnce(self, job_id, fields,
379 prev_job_info, prev_log_serial,
380 timeout=WFJC_TIMEOUT):
381 """Waits for changes on a job.
383 @param job_id: Job ID
385 @param fields: List of field names to be observed
386 @type prev_job_info: None or list
387 @param prev_job_info: Previously received job information
388 @type prev_log_serial: None or int/long
389 @param prev_log_serial: Highest log serial number previously received
390 @type timeout: int/float
391 @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
392 be capped to that value)
395 assert timeout >= 0, "Timeout can not be negative"
396 return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
397 (job_id, fields, prev_job_info,
399 min(WFJC_TIMEOUT, timeout)))
401 def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
403 result = self.WaitForJobChangeOnce(job_id, fields,
404 prev_job_info, prev_log_serial)
405 if result != constants.JOB_NOTCHANGED:
def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request to the master.

    @param job_ids: first element of the argument tuple
    @param fields: second element of the argument tuple
    @return: whatever L{CallMethod} returns for the request

    """
    query = (job_ids, fields)
    return self.CallMethod(REQ_QUERY_JOBS, query)
def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request to the master.

    @param names: first element of the argument tuple
    @param fields: second element of the argument tuple
    @param use_locking: third element of the argument tuple
    @return: whatever L{CallMethod} returns for the request

    """
    query = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_INSTANCES, query)
def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request to the master.

    @param names: first element of the argument tuple
    @param fields: second element of the argument tuple
    @param use_locking: third element of the argument tuple
    @return: whatever L{CallMethod} returns for the request

    """
    query = (names, fields, use_locking)
    return self.CallMethod(REQ_QUERY_NODES, query)
def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request to the master.

    @param nodes: first element of the argument tuple
    @param use_locking: second element of the argument tuple
    @return: whatever L{CallMethod} returns for the request

    """
    query = (nodes, use_locking)
    return self.CallMethod(REQ_QUERY_EXPORTS, query)
def QueryClusterInfo(self):
    """Send a QueryClusterInfo request, with an empty argument tuple.

    @return: whatever L{CallMethod} returns for the request

    """
    no_args = ()
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, no_args)
def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request to the master.

    @param fields: used unchanged as the request arguments
    @return: whatever L{CallMethod} returns for the request

    """
    method = REQ_QUERY_CONFIG_VALUES
    return self.CallMethod(method, fields)
def QueryTags(self, kind, name):
    """Send a QueryTags request to the master.

    @param kind: first element of the argument tuple
    @param name: second element of the argument tuple
    @return: whatever L{CallMethod} returns for the request

    """
    query = (kind, name)
    return self.CallMethod(REQ_QUERY_TAGS, query)