Fix timeout handling in LUXI client
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
44 KEY_METHOD = "method"
45 KEY_ARGS = "args"
46 KEY_SUCCESS = "success"
47 KEY_RESULT = "result"
48 KEY_VERSION = "version"
49
50 REQ_SUBMIT_JOB = "SubmitJob"
51 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
52 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
53 REQ_CANCEL_JOB = "CancelJob"
54 REQ_ARCHIVE_JOB = "ArchiveJob"
55 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
56 REQ_QUERY_JOBS = "QueryJobs"
57 REQ_QUERY_INSTANCES = "QueryInstances"
58 REQ_QUERY_NODES = "QueryNodes"
59 REQ_QUERY_EXPORTS = "QueryExports"
60 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
61 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
62 REQ_QUERY_TAGS = "QueryTags"
63 REQ_QUERY_LOCKS = "QueryLocks"
64 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
65 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
66
67 DEF_CTMO = 10
68 DEF_RWTO = 60
69
70 # WaitForJobChange timeout
71 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
72
73
74 class ProtocolError(errors.LuxiError):
75   """Denotes an error in the LUXI protocol."""
76
77
78 class ConnectionClosedError(ProtocolError):
79   """Connection closed error."""
80
81
82 class TimeoutError(ProtocolError):
83   """Operation timeout error."""
84
85
86 class RequestError(ProtocolError):
87   """Error on request.
88
89   This signifies an error in the request format or request handling,
90   but not (e.g.) an error in starting up an instance.
91
92   Some common conditions that can trigger this exception:
93     - job submission failed because the job data was wrong
94     - query failed because required fields were missing
95
96   """
97
98
99 class NoMasterError(ProtocolError):
100   """The master cannot be reached.
101
102   This means that the master daemon is not running or the socket has
103   been removed.
104
105   """
106
107
108 class PermissionError(ProtocolError):
109   """Permission denied while connecting to the master socket.
110
111   This means the user doesn't have the proper rights.
112
113   """
114
115
116 class Transport:
117   """Low-level transport class.
118
119   This is used on the client side.
120
121   This could be replace by any other class that provides the same
122   semantics to the Client. This means:
123     - can send messages and receive messages
124     - safe for multithreading
125
126   """
127
128   def __init__(self, address, timeouts=None):
129     """Constructor for the Client class.
130
131     Arguments:
132       - address: a valid address the the used transport class
133       - timeout: a list of timeouts, to be used on connect and read/write
134
135     There are two timeouts used since we might want to wait for a long
136     time for a response, but the connect timeout should be lower.
137
138     If not passed, we use a default of 10 and respectively 60 seconds.
139
140     Note that on reading data, since the timeout applies to an
141     invidual receive, it might be that the total duration is longer
142     than timeout value passed (we make a hard limit at twice the read
143     timeout).
144
145     """
146     self.address = address
147     if timeouts is None:
148       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
149     else:
150       self._ctimeout, self._rwtimeout = timeouts
151
152     self.socket = None
153     self._buffer = ""
154     self._msgs = collections.deque()
155
156     try:
157       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
158
159       # Try to connect
160       try:
161         utils.Retry(self._Connect, 1.0, self._ctimeout,
162                     args=(self.socket, address, self._ctimeout))
163       except utils.RetryTimeout:
164         raise TimeoutError("Connect timed out")
165
166       self.socket.settimeout(self._rwtimeout)
167     except (socket.error, NoMasterError):
168       if self.socket is not None:
169         self.socket.close()
170       self.socket = None
171       raise
172
173   @staticmethod
174   def _Connect(sock, address, timeout):
175     sock.settimeout(timeout)
176     try:
177       sock.connect(address)
178     except socket.timeout, err:
179       raise TimeoutError("Connect timed out: %s" % str(err))
180     except socket.error, err:
181       error_code = err.args[0]
182       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
183         raise NoMasterError(address)
184       elif error_code in (errno.EPERM, errno.EACCES):
185         raise PermissionError(address)
186       elif error_code == errno.EAGAIN:
187         # Server's socket backlog is full at the moment
188         raise utils.RetryAgain()
189       raise
190
191   def _CheckSocket(self):
192     """Make sure we are connected.
193
194     """
195     if self.socket is None:
196       raise ProtocolError("Connection is closed")
197
198   def Send(self, msg):
199     """Send a message.
200
201     This just sends a message and doesn't wait for the response.
202
203     """
204     if constants.LUXI_EOM in msg:
205       raise ProtocolError("Message terminator found in payload")
206
207     self._CheckSocket()
208     try:
209       # TODO: sendall is not guaranteed to send everything
210       self.socket.sendall(msg + constants.LUXI_EOM)
211     except socket.timeout, err:
212       raise TimeoutError("Sending timeout: %s" % str(err))
213
214   def Recv(self):
215     """Try to receive a message from the socket.
216
217     In case we already have messages queued, we just return from the
218     queue. Otherwise, we try to read data with a _rwtimeout network
219     timeout, and making sure we don't go over 2x_rwtimeout as a global
220     limit.
221
222     """
223     self._CheckSocket()
224     etime = time.time() + self._rwtimeout
225     while not self._msgs:
226       if time.time() > etime:
227         raise TimeoutError("Extended receive timeout")
228       while True:
229         try:
230           data = self.socket.recv(4096)
231         except socket.timeout, err:
232           raise TimeoutError("Receive timeout: %s" % str(err))
233         except socket.error, err:
234           if err.args and err.args[0] == errno.EAGAIN:
235             continue
236           raise
237         break
238       if not data:
239         raise ConnectionClosedError("Connection closed while reading")
240       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
241       self._buffer = new_msgs.pop()
242       self._msgs.extend(new_msgs)
243     return self._msgs.popleft()
244
245   def Call(self, msg):
246     """Send a message and wait for the response.
247
248     This is just a wrapper over Send and Recv.
249
250     """
251     self.Send(msg)
252     return self.Recv()
253
254   def Close(self):
255     """Close the socket"""
256     if self.socket is not None:
257       self.socket.close()
258       self.socket = None
259
260
261 def ParseRequest(msg):
262   """Parses a LUXI request message.
263
264   """
265   try:
266     request = serializer.LoadJson(msg)
267   except ValueError, err:
268     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
269
270   logging.debug("LUXI request: %s", request)
271
272   if not isinstance(request, dict):
273     logging.error("LUXI request not a dict: %r", msg)
274     raise ProtocolError("Invalid LUXI request (not a dict)")
275
276   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
277   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
278   version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103
279
280   if method is None or args is None:
281     logging.error("LUXI request missing method or arguments: %r", msg)
282     raise ProtocolError(("Invalid LUXI request (no method or arguments"
283                          " in request): %r") % msg)
284
285   return (method, args, version)
286
287
288 def ParseResponse(msg):
289   """Parses a LUXI response message.
290
291   """
292   # Parse the result
293   try:
294     data = serializer.LoadJson(msg)
295   except Exception, err:
296     raise ProtocolError("Error while deserializing response: %s" % str(err))
297
298   # Validate response
299   if not (isinstance(data, dict) and
300           KEY_SUCCESS in data and
301           KEY_RESULT in data):
302     raise ProtocolError("Invalid response from server: %r" % data)
303
304   return (data[KEY_SUCCESS], data[KEY_RESULT],
305           data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
306
307
308 def FormatResponse(success, result, version=None):
309   """Formats a LUXI response message.
310
311   """
312   response = {
313     KEY_SUCCESS: success,
314     KEY_RESULT: result,
315     }
316
317   if version is not None:
318     response[KEY_VERSION] = version
319
320   logging.debug("LUXI response: %s", response)
321
322   return serializer.DumpJson(response)
323
324
325 def FormatRequest(method, args, version=None):
326   """Formats a LUXI request message.
327
328   """
329   # Build request
330   request = {
331     KEY_METHOD: method,
332     KEY_ARGS: args,
333     }
334
335   if version is not None:
336     request[KEY_VERSION] = version
337
338   # Serialize the request
339   return serializer.DumpJson(request, indent=False)
340
341
342 def CallLuxiMethod(transport_cb, method, args, version=None):
343   """Send a LUXI request via a transport and return the response.
344
345   """
346   assert callable(transport_cb)
347
348   request_msg = FormatRequest(method, args, version=version)
349
350   # Send request and wait for response
351   response_msg = transport_cb(request_msg)
352
353   (success, result, resp_version) = ParseResponse(response_msg)
354
355   # Verify version if there was one in the response
356   if resp_version is not None and resp_version != version:
357     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
358                            (version, resp_version))
359
360   if success:
361     return result
362
363   errors.MaybeRaise(result)
364   raise RequestError(result)
365
366
367 class Client(object):
368   """High-level client implementation.
369
370   This uses a backing Transport-like class on top of which it
371   implements data serialization/deserialization.
372
373   """
374   def __init__(self, address=None, timeouts=None, transport=Transport):
375     """Constructor for the Client class.
376
377     Arguments:
378       - address: a valid address the the used transport class
379       - timeout: a list of timeouts, to be used on connect and read/write
380       - transport: a Transport-like class
381
382
383     If timeout is not passed, the default timeouts of the transport
384     class are used.
385
386     """
387     if address is None:
388       address = constants.MASTER_SOCKET
389     self.address = address
390     self.timeouts = timeouts
391     self.transport_class = transport
392     self.transport = None
393     self._InitTransport()
394
395   def _InitTransport(self):
396     """(Re)initialize the transport if needed.
397
398     """
399     if self.transport is None:
400       self.transport = self.transport_class(self.address,
401                                             timeouts=self.timeouts)
402
403   def _CloseTransport(self):
404     """Close the transport, ignoring errors.
405
406     """
407     if self.transport is None:
408       return
409     try:
410       old_transp = self.transport
411       self.transport = None
412       old_transp.Close()
413     except Exception: # pylint: disable-msg=W0703
414       pass
415
416   def _SendMethodCall(self, data):
417     # Send request and wait for response
418     try:
419       self._InitTransport()
420       return self.transport.Call(data)
421     except Exception:
422       self._CloseTransport()
423       raise
424
425   def CallMethod(self, method, args):
426     """Send a generic request and return the response.
427
428     """
429     return CallLuxiMethod(self._SendMethodCall, method, args,
430                           version=constants.LUXI_VERSION)
431
432   def SetQueueDrainFlag(self, drain_flag):
433     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
434
435   def SetWatcherPause(self, until):
436     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
437
438   def SubmitJob(self, ops):
439     ops_state = map(lambda op: op.__getstate__(), ops)
440     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
441
442   def SubmitManyJobs(self, jobs):
443     jobs_state = []
444     for ops in jobs:
445       jobs_state.append([op.__getstate__() for op in ops])
446     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
447
448   def CancelJob(self, job_id):
449     return self.CallMethod(REQ_CANCEL_JOB, job_id)
450
451   def ArchiveJob(self, job_id):
452     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
453
454   def AutoArchiveJobs(self, age):
455     timeout = (DEF_RWTO - 1) / 2
456     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
457
458   def WaitForJobChangeOnce(self, job_id, fields,
459                            prev_job_info, prev_log_serial,
460                            timeout=WFJC_TIMEOUT):
461     """Waits for changes on a job.
462
463     @param job_id: Job ID
464     @type fields: list
465     @param fields: List of field names to be observed
466     @type prev_job_info: None or list
467     @param prev_job_info: Previously received job information
468     @type prev_log_serial: None or int/long
469     @param prev_log_serial: Highest log serial number previously received
470     @type timeout: int/float
471     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
472                     be capped to that value)
473
474     """
475     assert timeout >= 0, "Timeout can not be negative"
476     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
477                            (job_id, fields, prev_job_info,
478                             prev_log_serial,
479                             min(WFJC_TIMEOUT, timeout)))
480
481   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
482     while True:
483       result = self.WaitForJobChangeOnce(job_id, fields,
484                                          prev_job_info, prev_log_serial)
485       if result != constants.JOB_NOTCHANGED:
486         break
487     return result
488
489   def QueryJobs(self, job_ids, fields):
490     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
491
492   def QueryInstances(self, names, fields, use_locking):
493     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
494
495   def QueryNodes(self, names, fields, use_locking):
496     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
497
498   def QueryExports(self, nodes, use_locking):
499     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
500
501   def QueryClusterInfo(self):
502     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
503
504   def QueryConfigValues(self, fields):
505     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
506
507   def QueryTags(self, kind, name):
508     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
509
510   def QueryLocks(self, fields, sync):
511     return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))