Standardise LUXI call argument types
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37 import warnings
38
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44
45
46 KEY_METHOD = "method"
47 KEY_ARGS = "args"
48 KEY_SUCCESS = "success"
49 KEY_RESULT = "result"
50 KEY_VERSION = "version"
51
52 REQ_SUBMIT_JOB = "SubmitJob"
53 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55 REQ_CANCEL_JOB = "CancelJob"
56 REQ_ARCHIVE_JOB = "ArchiveJob"
57 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
58 REQ_QUERY = "Query"
59 REQ_QUERY_FIELDS = "QueryFields"
60 REQ_QUERY_JOBS = "QueryJobs"
61 REQ_QUERY_INSTANCES = "QueryInstances"
62 REQ_QUERY_NODES = "QueryNodes"
63 REQ_QUERY_GROUPS = "QueryGroups"
64 REQ_QUERY_EXPORTS = "QueryExports"
65 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
66 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
67 REQ_QUERY_TAGS = "QueryTags"
68 REQ_QUERY_LOCKS = "QueryLocks"
69 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
70 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
71
72 DEF_CTMO = 10
73 DEF_RWTO = 60
74
75 # WaitForJobChange timeout
76 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
77
78
79 class ProtocolError(errors.LuxiError):
80   """Denotes an error in the LUXI protocol."""
81
82
83 class ConnectionClosedError(ProtocolError):
84   """Connection closed error."""
85
86
87 class TimeoutError(ProtocolError):
88   """Operation timeout error."""
89
90
91 class RequestError(ProtocolError):
92   """Error on request.
93
94   This signifies an error in the request format or request handling,
95   but not (e.g.) an error in starting up an instance.
96
97   Some common conditions that can trigger this exception:
98     - job submission failed because the job data was wrong
99     - query failed because required fields were missing
100
101   """
102
103
104 class NoMasterError(ProtocolError):
105   """The master cannot be reached.
106
107   This means that the master daemon is not running or the socket has
108   been removed.
109
110   """
111
112
113 class PermissionError(ProtocolError):
114   """Permission denied while connecting to the master socket.
115
116   This means the user doesn't have the proper rights.
117
118   """
119
120
121 class Transport:
122   """Low-level transport class.
123
124   This is used on the client side.
125
126   This could be replace by any other class that provides the same
127   semantics to the Client. This means:
128     - can send messages and receive messages
129     - safe for multithreading
130
131   """
132
133   def __init__(self, address, timeouts=None):
134     """Constructor for the Client class.
135
136     Arguments:
137       - address: a valid address the the used transport class
138       - timeout: a list of timeouts, to be used on connect and read/write
139
140     There are two timeouts used since we might want to wait for a long
141     time for a response, but the connect timeout should be lower.
142
143     If not passed, we use a default of 10 and respectively 60 seconds.
144
145     Note that on reading data, since the timeout applies to an
146     invidual receive, it might be that the total duration is longer
147     than timeout value passed (we make a hard limit at twice the read
148     timeout).
149
150     """
151     self.address = address
152     if timeouts is None:
153       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
154     else:
155       self._ctimeout, self._rwtimeout = timeouts
156
157     self.socket = None
158     self._buffer = ""
159     self._msgs = collections.deque()
160
161     try:
162       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
163
164       # Try to connect
165       try:
166         utils.Retry(self._Connect, 1.0, self._ctimeout,
167                     args=(self.socket, address, self._ctimeout))
168       except utils.RetryTimeout:
169         raise TimeoutError("Connect timed out")
170
171       self.socket.settimeout(self._rwtimeout)
172     except (socket.error, NoMasterError):
173       if self.socket is not None:
174         self.socket.close()
175       self.socket = None
176       raise
177
178   @staticmethod
179   def _Connect(sock, address, timeout):
180     sock.settimeout(timeout)
181     try:
182       sock.connect(address)
183     except socket.timeout, err:
184       raise TimeoutError("Connect timed out: %s" % str(err))
185     except socket.error, err:
186       error_code = err.args[0]
187       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
188         raise NoMasterError(address)
189       elif error_code in (errno.EPERM, errno.EACCES):
190         raise PermissionError(address)
191       elif error_code == errno.EAGAIN:
192         # Server's socket backlog is full at the moment
193         raise utils.RetryAgain()
194       raise
195
196   def _CheckSocket(self):
197     """Make sure we are connected.
198
199     """
200     if self.socket is None:
201       raise ProtocolError("Connection is closed")
202
203   def Send(self, msg):
204     """Send a message.
205
206     This just sends a message and doesn't wait for the response.
207
208     """
209     if constants.LUXI_EOM in msg:
210       raise ProtocolError("Message terminator found in payload")
211
212     self._CheckSocket()
213     try:
214       # TODO: sendall is not guaranteed to send everything
215       self.socket.sendall(msg + constants.LUXI_EOM)
216     except socket.timeout, err:
217       raise TimeoutError("Sending timeout: %s" % str(err))
218
219   def Recv(self):
220     """Try to receive a message from the socket.
221
222     In case we already have messages queued, we just return from the
223     queue. Otherwise, we try to read data with a _rwtimeout network
224     timeout, and making sure we don't go over 2x_rwtimeout as a global
225     limit.
226
227     """
228     self._CheckSocket()
229     etime = time.time() + self._rwtimeout
230     while not self._msgs:
231       if time.time() > etime:
232         raise TimeoutError("Extended receive timeout")
233       while True:
234         try:
235           data = self.socket.recv(4096)
236         except socket.timeout, err:
237           raise TimeoutError("Receive timeout: %s" % str(err))
238         except socket.error, err:
239           if err.args and err.args[0] == errno.EAGAIN:
240             continue
241           raise
242         break
243       if not data:
244         raise ConnectionClosedError("Connection closed while reading")
245       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
246       self._buffer = new_msgs.pop()
247       self._msgs.extend(new_msgs)
248     return self._msgs.popleft()
249
250   def Call(self, msg):
251     """Send a message and wait for the response.
252
253     This is just a wrapper over Send and Recv.
254
255     """
256     self.Send(msg)
257     return self.Recv()
258
259   def Close(self):
260     """Close the socket"""
261     if self.socket is not None:
262       self.socket.close()
263       self.socket = None
264
265
266 def ParseRequest(msg):
267   """Parses a LUXI request message.
268
269   """
270   try:
271     request = serializer.LoadJson(msg)
272   except ValueError, err:
273     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
274
275   logging.debug("LUXI request: %s", request)
276
277   if not isinstance(request, dict):
278     logging.error("LUXI request not a dict: %r", msg)
279     raise ProtocolError("Invalid LUXI request (not a dict)")
280
281   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
282   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
283   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
284
285   if method is None or args is None:
286     logging.error("LUXI request missing method or arguments: %r", msg)
287     raise ProtocolError(("Invalid LUXI request (no method or arguments"
288                          " in request): %r") % msg)
289
290   return (method, args, version)
291
292
293 def ParseResponse(msg):
294   """Parses a LUXI response message.
295
296   """
297   # Parse the result
298   try:
299     data = serializer.LoadJson(msg)
300   except KeyboardInterrupt:
301     raise
302   except Exception, err:
303     raise ProtocolError("Error while deserializing response: %s" % str(err))
304
305   # Validate response
306   if not (isinstance(data, dict) and
307           KEY_SUCCESS in data and
308           KEY_RESULT in data):
309     raise ProtocolError("Invalid response from server: %r" % data)
310
311   return (data[KEY_SUCCESS], data[KEY_RESULT],
312           data.get(KEY_VERSION, None)) # pylint: disable=E1103
313
314
315 def FormatResponse(success, result, version=None):
316   """Formats a LUXI response message.
317
318   """
319   response = {
320     KEY_SUCCESS: success,
321     KEY_RESULT: result,
322     }
323
324   if version is not None:
325     response[KEY_VERSION] = version
326
327   logging.debug("LUXI response: %s", response)
328
329   return serializer.DumpJson(response)
330
331
332 def FormatRequest(method, args, version=None):
333   """Formats a LUXI request message.
334
335   """
336   # Build request
337   request = {
338     KEY_METHOD: method,
339     KEY_ARGS: args,
340     }
341
342   if version is not None:
343     request[KEY_VERSION] = version
344
345   # Serialize the request
346   return serializer.DumpJson(request, indent=False)
347
348
349 def CallLuxiMethod(transport_cb, method, args, version=None):
350   """Send a LUXI request via a transport and return the response.
351
352   """
353   assert callable(transport_cb)
354
355   request_msg = FormatRequest(method, args, version=version)
356
357   # Send request and wait for response
358   response_msg = transport_cb(request_msg)
359
360   (success, result, resp_version) = ParseResponse(response_msg)
361
362   # Verify version if there was one in the response
363   if resp_version is not None and resp_version != version:
364     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
365                            (version, resp_version))
366
367   if success:
368     return result
369
370   errors.MaybeRaise(result)
371   raise RequestError(result)
372
373
374 class Client(object):
375   """High-level client implementation.
376
377   This uses a backing Transport-like class on top of which it
378   implements data serialization/deserialization.
379
380   """
381   def __init__(self, address=None, timeouts=None, transport=Transport):
382     """Constructor for the Client class.
383
384     Arguments:
385       - address: a valid address the the used transport class
386       - timeout: a list of timeouts, to be used on connect and read/write
387       - transport: a Transport-like class
388
389
390     If timeout is not passed, the default timeouts of the transport
391     class are used.
392
393     """
394     if address is None:
395       address = constants.MASTER_SOCKET
396     self.address = address
397     self.timeouts = timeouts
398     self.transport_class = transport
399     self.transport = None
400     self._InitTransport()
401
402   def _InitTransport(self):
403     """(Re)initialize the transport if needed.
404
405     """
406     if self.transport is None:
407       self.transport = self.transport_class(self.address,
408                                             timeouts=self.timeouts)
409
410   def _CloseTransport(self):
411     """Close the transport, ignoring errors.
412
413     """
414     if self.transport is None:
415       return
416     try:
417       old_transp = self.transport
418       self.transport = None
419       old_transp.Close()
420     except Exception: # pylint: disable=W0703
421       pass
422
423   def _SendMethodCall(self, data):
424     # Send request and wait for response
425     try:
426       self._InitTransport()
427       return self.transport.Call(data)
428     except Exception:
429       self._CloseTransport()
430       raise
431
432   def Close(self):
433     """Close the underlying connection.
434
435     """
436     self._CloseTransport()
437
438   def CallMethod(self, method, args):
439     """Send a generic request and return the response.
440
441     """
442     if not isinstance(args, (list, tuple)):
443       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
444                                    " expected list, got %s" % type(args))
445     return CallLuxiMethod(self._SendMethodCall, method, args,
446                           version=constants.LUXI_VERSION)
447
448   def SetQueueDrainFlag(self, drain_flag):
449     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, (drain_flag, ))
450
451   def SetWatcherPause(self, until):
452     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
453
454   def SubmitJob(self, ops):
455     ops_state = map(lambda op: op.__getstate__(), ops)
456     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
457
458   def SubmitManyJobs(self, jobs):
459     jobs_state = []
460     for ops in jobs:
461       jobs_state.append([op.__getstate__() for op in ops])
462     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
463
464   def CancelJob(self, job_id):
465     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
466
467   def ArchiveJob(self, job_id):
468     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
469
470   def AutoArchiveJobs(self, age):
471     timeout = (DEF_RWTO - 1) / 2
472     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
473
474   def WaitForJobChangeOnce(self, job_id, fields,
475                            prev_job_info, prev_log_serial,
476                            timeout=WFJC_TIMEOUT):
477     """Waits for changes on a job.
478
479     @param job_id: Job ID
480     @type fields: list
481     @param fields: List of field names to be observed
482     @type prev_job_info: None or list
483     @param prev_job_info: Previously received job information
484     @type prev_log_serial: None or int/long
485     @param prev_log_serial: Highest log serial number previously received
486     @type timeout: int/float
487     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
488                     be capped to that value)
489
490     """
491     assert timeout >= 0, "Timeout can not be negative"
492     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
493                            (job_id, fields, prev_job_info,
494                             prev_log_serial,
495                             min(WFJC_TIMEOUT, timeout)))
496
497   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
498     while True:
499       result = self.WaitForJobChangeOnce(job_id, fields,
500                                          prev_job_info, prev_log_serial)
501       if result != constants.JOB_NOTCHANGED:
502         break
503     return result
504
505   def Query(self, what, fields, qfilter):
506     """Query for resources/items.
507
508     @param what: One of L{constants.QR_VIA_LUXI}
509     @type fields: List of strings
510     @param fields: List of requested fields
511     @type qfilter: None or list
512     @param qfilter: Query filter
513     @rtype: L{objects.QueryResponse}
514
515     """
516     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
517     return objects.QueryResponse.FromDict(result)
518
519   def QueryFields(self, what, fields):
520     """Query for available fields.
521
522     @param what: One of L{constants.QR_VIA_LUXI}
523     @type fields: None or list of strings
524     @param fields: List of requested fields
525     @rtype: L{objects.QueryFieldsResponse}
526
527     """
528     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
529     return objects.QueryFieldsResponse.FromDict(result)
530
531   def QueryJobs(self, job_ids, fields):
532     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
533
534   def QueryInstances(self, names, fields, use_locking):
535     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
536
537   def QueryNodes(self, names, fields, use_locking):
538     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
539
540   def QueryGroups(self, names, fields, use_locking):
541     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
542
543   def QueryExports(self, nodes, use_locking):
544     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
545
546   def QueryClusterInfo(self):
547     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
548
549   def QueryConfigValues(self, fields):
550     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
551
552   def QueryTags(self, kind, name):
553     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
554
555   def QueryLocks(self, fields, sync):
556     warnings.warn("This LUXI call is deprecated and will be removed, use"
557                   " Query(\"%s\", ...) instead" % constants.QR_LOCK)
558     return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))