Add opcode parameter descriptions
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43
44
45 KEY_METHOD = "method"
46 KEY_ARGS = "args"
47 KEY_SUCCESS = "success"
48 KEY_RESULT = "result"
49 KEY_VERSION = "version"
50
51 REQ_SUBMIT_JOB = "SubmitJob"
52 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54 REQ_CANCEL_JOB = "CancelJob"
55 REQ_ARCHIVE_JOB = "ArchiveJob"
56 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
57 REQ_QUERY = "Query"
58 REQ_QUERY_FIELDS = "QueryFields"
59 REQ_QUERY_JOBS = "QueryJobs"
60 REQ_QUERY_INSTANCES = "QueryInstances"
61 REQ_QUERY_NODES = "QueryNodes"
62 REQ_QUERY_GROUPS = "QueryGroups"
63 REQ_QUERY_EXPORTS = "QueryExports"
64 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
65 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
66 REQ_QUERY_TAGS = "QueryTags"
67 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
68 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
69
70 DEF_CTMO = 10
71 DEF_RWTO = 60
72
73 # WaitForJobChange timeout
74 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
75
76
77 class ProtocolError(errors.LuxiError):
78   """Denotes an error in the LUXI protocol."""
79
80
81 class ConnectionClosedError(ProtocolError):
82   """Connection closed error."""
83
84
85 class TimeoutError(ProtocolError):
86   """Operation timeout error."""
87
88
89 class RequestError(ProtocolError):
90   """Error on request.
91
92   This signifies an error in the request format or request handling,
93   but not (e.g.) an error in starting up an instance.
94
95   Some common conditions that can trigger this exception:
96     - job submission failed because the job data was wrong
97     - query failed because required fields were missing
98
99   """
100
101
102 class NoMasterError(ProtocolError):
103   """The master cannot be reached.
104
105   This means that the master daemon is not running or the socket has
106   been removed.
107
108   """
109
110
111 class PermissionError(ProtocolError):
112   """Permission denied while connecting to the master socket.
113
114   This means the user doesn't have the proper rights.
115
116   """
117
118
119 class Transport:
120   """Low-level transport class.
121
122   This is used on the client side.
123
124   This could be replace by any other class that provides the same
125   semantics to the Client. This means:
126     - can send messages and receive messages
127     - safe for multithreading
128
129   """
130
131   def __init__(self, address, timeouts=None):
132     """Constructor for the Client class.
133
134     Arguments:
135       - address: a valid address the the used transport class
136       - timeout: a list of timeouts, to be used on connect and read/write
137
138     There are two timeouts used since we might want to wait for a long
139     time for a response, but the connect timeout should be lower.
140
141     If not passed, we use a default of 10 and respectively 60 seconds.
142
143     Note that on reading data, since the timeout applies to an
144     invidual receive, it might be that the total duration is longer
145     than timeout value passed (we make a hard limit at twice the read
146     timeout).
147
148     """
149     self.address = address
150     if timeouts is None:
151       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
152     else:
153       self._ctimeout, self._rwtimeout = timeouts
154
155     self.socket = None
156     self._buffer = ""
157     self._msgs = collections.deque()
158
159     try:
160       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
161
162       # Try to connect
163       try:
164         utils.Retry(self._Connect, 1.0, self._ctimeout,
165                     args=(self.socket, address, self._ctimeout))
166       except utils.RetryTimeout:
167         raise TimeoutError("Connect timed out")
168
169       self.socket.settimeout(self._rwtimeout)
170     except (socket.error, NoMasterError):
171       if self.socket is not None:
172         self.socket.close()
173       self.socket = None
174       raise
175
176   @staticmethod
177   def _Connect(sock, address, timeout):
178     sock.settimeout(timeout)
179     try:
180       sock.connect(address)
181     except socket.timeout, err:
182       raise TimeoutError("Connect timed out: %s" % str(err))
183     except socket.error, err:
184       error_code = err.args[0]
185       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
186         raise NoMasterError(address)
187       elif error_code in (errno.EPERM, errno.EACCES):
188         raise PermissionError(address)
189       elif error_code == errno.EAGAIN:
190         # Server's socket backlog is full at the moment
191         raise utils.RetryAgain()
192       raise
193
194   def _CheckSocket(self):
195     """Make sure we are connected.
196
197     """
198     if self.socket is None:
199       raise ProtocolError("Connection is closed")
200
201   def Send(self, msg):
202     """Send a message.
203
204     This just sends a message and doesn't wait for the response.
205
206     """
207     if constants.LUXI_EOM in msg:
208       raise ProtocolError("Message terminator found in payload")
209
210     self._CheckSocket()
211     try:
212       # TODO: sendall is not guaranteed to send everything
213       self.socket.sendall(msg + constants.LUXI_EOM)
214     except socket.timeout, err:
215       raise TimeoutError("Sending timeout: %s" % str(err))
216
217   def Recv(self):
218     """Try to receive a message from the socket.
219
220     In case we already have messages queued, we just return from the
221     queue. Otherwise, we try to read data with a _rwtimeout network
222     timeout, and making sure we don't go over 2x_rwtimeout as a global
223     limit.
224
225     """
226     self._CheckSocket()
227     etime = time.time() + self._rwtimeout
228     while not self._msgs:
229       if time.time() > etime:
230         raise TimeoutError("Extended receive timeout")
231       while True:
232         try:
233           data = self.socket.recv(4096)
234         except socket.timeout, err:
235           raise TimeoutError("Receive timeout: %s" % str(err))
236         except socket.error, err:
237           if err.args and err.args[0] == errno.EAGAIN:
238             continue
239           raise
240         break
241       if not data:
242         raise ConnectionClosedError("Connection closed while reading")
243       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
244       self._buffer = new_msgs.pop()
245       self._msgs.extend(new_msgs)
246     return self._msgs.popleft()
247
248   def Call(self, msg):
249     """Send a message and wait for the response.
250
251     This is just a wrapper over Send and Recv.
252
253     """
254     self.Send(msg)
255     return self.Recv()
256
257   def Close(self):
258     """Close the socket"""
259     if self.socket is not None:
260       self.socket.close()
261       self.socket = None
262
263
264 def ParseRequest(msg):
265   """Parses a LUXI request message.
266
267   """
268   try:
269     request = serializer.LoadJson(msg)
270   except ValueError, err:
271     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
272
273   logging.debug("LUXI request: %s", request)
274
275   if not isinstance(request, dict):
276     logging.error("LUXI request not a dict: %r", msg)
277     raise ProtocolError("Invalid LUXI request (not a dict)")
278
279   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
280   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
281   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
282
283   if method is None or args is None:
284     logging.error("LUXI request missing method or arguments: %r", msg)
285     raise ProtocolError(("Invalid LUXI request (no method or arguments"
286                          " in request): %r") % msg)
287
288   return (method, args, version)
289
290
291 def ParseResponse(msg):
292   """Parses a LUXI response message.
293
294   """
295   # Parse the result
296   try:
297     data = serializer.LoadJson(msg)
298   except KeyboardInterrupt:
299     raise
300   except Exception, err:
301     raise ProtocolError("Error while deserializing response: %s" % str(err))
302
303   # Validate response
304   if not (isinstance(data, dict) and
305           KEY_SUCCESS in data and
306           KEY_RESULT in data):
307     raise ProtocolError("Invalid response from server: %r" % data)
308
309   return (data[KEY_SUCCESS], data[KEY_RESULT],
310           data.get(KEY_VERSION, None)) # pylint: disable=E1103
311
312
313 def FormatResponse(success, result, version=None):
314   """Formats a LUXI response message.
315
316   """
317   response = {
318     KEY_SUCCESS: success,
319     KEY_RESULT: result,
320     }
321
322   if version is not None:
323     response[KEY_VERSION] = version
324
325   logging.debug("LUXI response: %s", response)
326
327   return serializer.DumpJson(response)
328
329
330 def FormatRequest(method, args, version=None):
331   """Formats a LUXI request message.
332
333   """
334   # Build request
335   request = {
336     KEY_METHOD: method,
337     KEY_ARGS: args,
338     }
339
340   if version is not None:
341     request[KEY_VERSION] = version
342
343   # Serialize the request
344   return serializer.DumpJson(request)
345
346
347 def CallLuxiMethod(transport_cb, method, args, version=None):
348   """Send a LUXI request via a transport and return the response.
349
350   """
351   assert callable(transport_cb)
352
353   request_msg = FormatRequest(method, args, version=version)
354
355   # Send request and wait for response
356   response_msg = transport_cb(request_msg)
357
358   (success, result, resp_version) = ParseResponse(response_msg)
359
360   # Verify version if there was one in the response
361   if resp_version is not None and resp_version != version:
362     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
363                            (version, resp_version))
364
365   if success:
366     return result
367
368   errors.MaybeRaise(result)
369   raise RequestError(result)
370
371
372 class Client(object):
373   """High-level client implementation.
374
375   This uses a backing Transport-like class on top of which it
376   implements data serialization/deserialization.
377
378   """
379   def __init__(self, address=None, timeouts=None, transport=Transport):
380     """Constructor for the Client class.
381
382     Arguments:
383       - address: a valid address the the used transport class
384       - timeout: a list of timeouts, to be used on connect and read/write
385       - transport: a Transport-like class
386
387
388     If timeout is not passed, the default timeouts of the transport
389     class are used.
390
391     """
392     if address is None:
393       address = constants.MASTER_SOCKET
394     self.address = address
395     self.timeouts = timeouts
396     self.transport_class = transport
397     self.transport = None
398     self._InitTransport()
399
400   def _InitTransport(self):
401     """(Re)initialize the transport if needed.
402
403     """
404     if self.transport is None:
405       self.transport = self.transport_class(self.address,
406                                             timeouts=self.timeouts)
407
408   def _CloseTransport(self):
409     """Close the transport, ignoring errors.
410
411     """
412     if self.transport is None:
413       return
414     try:
415       old_transp = self.transport
416       self.transport = None
417       old_transp.Close()
418     except Exception: # pylint: disable=W0703
419       pass
420
421   def _SendMethodCall(self, data):
422     # Send request and wait for response
423     try:
424       self._InitTransport()
425       return self.transport.Call(data)
426     except Exception:
427       self._CloseTransport()
428       raise
429
430   def Close(self):
431     """Close the underlying connection.
432
433     """
434     self._CloseTransport()
435
436   def CallMethod(self, method, args):
437     """Send a generic request and return the response.
438
439     """
440     if not isinstance(args, (list, tuple)):
441       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
442                                    " expected list, got %s" % type(args))
443     return CallLuxiMethod(self._SendMethodCall, method, args,
444                           version=constants.LUXI_VERSION)
445
446   def SetQueueDrainFlag(self, drain_flag):
447     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, (drain_flag, ))
448
449   def SetWatcherPause(self, until):
450     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
451
452   def SubmitJob(self, ops):
453     ops_state = map(lambda op: op.__getstate__(), ops)
454     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
455
456   def SubmitManyJobs(self, jobs):
457     jobs_state = []
458     for ops in jobs:
459       jobs_state.append([op.__getstate__() for op in ops])
460     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
461
462   def CancelJob(self, job_id):
463     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
464
465   def ArchiveJob(self, job_id):
466     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
467
468   def AutoArchiveJobs(self, age):
469     timeout = (DEF_RWTO - 1) / 2
470     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
471
472   def WaitForJobChangeOnce(self, job_id, fields,
473                            prev_job_info, prev_log_serial,
474                            timeout=WFJC_TIMEOUT):
475     """Waits for changes on a job.
476
477     @param job_id: Job ID
478     @type fields: list
479     @param fields: List of field names to be observed
480     @type prev_job_info: None or list
481     @param prev_job_info: Previously received job information
482     @type prev_log_serial: None or int/long
483     @param prev_log_serial: Highest log serial number previously received
484     @type timeout: int/float
485     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
486                     be capped to that value)
487
488     """
489     assert timeout >= 0, "Timeout can not be negative"
490     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
491                            (job_id, fields, prev_job_info,
492                             prev_log_serial,
493                             min(WFJC_TIMEOUT, timeout)))
494
495   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
496     while True:
497       result = self.WaitForJobChangeOnce(job_id, fields,
498                                          prev_job_info, prev_log_serial)
499       if result != constants.JOB_NOTCHANGED:
500         break
501     return result
502
503   def Query(self, what, fields, qfilter):
504     """Query for resources/items.
505
506     @param what: One of L{constants.QR_VIA_LUXI}
507     @type fields: List of strings
508     @param fields: List of requested fields
509     @type qfilter: None or list
510     @param qfilter: Query filter
511     @rtype: L{objects.QueryResponse}
512
513     """
514     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
515     return objects.QueryResponse.FromDict(result)
516
517   def QueryFields(self, what, fields):
518     """Query for available fields.
519
520     @param what: One of L{constants.QR_VIA_LUXI}
521     @type fields: None or list of strings
522     @param fields: List of requested fields
523     @rtype: L{objects.QueryFieldsResponse}
524
525     """
526     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
527     return objects.QueryFieldsResponse.FromDict(result)
528
529   def QueryJobs(self, job_ids, fields):
530     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
531
532   def QueryInstances(self, names, fields, use_locking):
533     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
534
535   def QueryNodes(self, names, fields, use_locking):
536     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
537
538   def QueryGroups(self, names, fields, use_locking):
539     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
540
541   def QueryExports(self, nodes, use_locking):
542     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
543
544   def QueryClusterInfo(self):
545     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
546
547   def QueryConfigValues(self, fields):
548     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
549
550   def QueryTags(self, kind, name):
551     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))