verify-disks: Explicitely state nothing has to be done
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43
44
45 KEY_METHOD = "method"
46 KEY_ARGS = "args"
47 KEY_SUCCESS = "success"
48 KEY_RESULT = "result"
49 KEY_VERSION = "version"
50
51 REQ_SUBMIT_JOB = "SubmitJob"
52 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54 REQ_CANCEL_JOB = "CancelJob"
55 REQ_ARCHIVE_JOB = "ArchiveJob"
56 REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
57 REQ_QUERY = "Query"
58 REQ_QUERY_FIELDS = "QueryFields"
59 REQ_QUERY_JOBS = "QueryJobs"
60 REQ_QUERY_INSTANCES = "QueryInstances"
61 REQ_QUERY_NODES = "QueryNodes"
62 REQ_QUERY_GROUPS = "QueryGroups"
63 REQ_QUERY_EXPORTS = "QueryExports"
64 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
65 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
66 REQ_QUERY_TAGS = "QueryTags"
67 REQ_SET_DRAIN_FLAG = "SetDrainFlag"
68 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
69
70 #: List of all LUXI requests
71 REQ_ALL = frozenset([
72   REQ_ARCHIVE_JOB,
73   REQ_AUTO_ARCHIVE_JOBS,
74   REQ_CANCEL_JOB,
75   REQ_QUERY,
76   REQ_QUERY_CLUSTER_INFO,
77   REQ_QUERY_CONFIG_VALUES,
78   REQ_QUERY_EXPORTS,
79   REQ_QUERY_FIELDS,
80   REQ_QUERY_GROUPS,
81   REQ_QUERY_INSTANCES,
82   REQ_QUERY_JOBS,
83   REQ_QUERY_NODES,
84   REQ_QUERY_TAGS,
85   REQ_SET_DRAIN_FLAG,
86   REQ_SET_WATCHER_PAUSE,
87   REQ_SUBMIT_JOB,
88   REQ_SUBMIT_MANY_JOBS,
89   REQ_WAIT_FOR_JOB_CHANGE,
90   ])
91
92 DEF_CTMO = 10
93 DEF_RWTO = 60
94
95 # WaitForJobChange timeout
96 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
97
98
99 class ProtocolError(errors.LuxiError):
100   """Denotes an error in the LUXI protocol."""
101
102
103 class ConnectionClosedError(ProtocolError):
104   """Connection closed error."""
105
106
107 class TimeoutError(ProtocolError):
108   """Operation timeout error."""
109
110
111 class RequestError(ProtocolError):
112   """Error on request.
113
114   This signifies an error in the request format or request handling,
115   but not (e.g.) an error in starting up an instance.
116
117   Some common conditions that can trigger this exception:
118     - job submission failed because the job data was wrong
119     - query failed because required fields were missing
120
121   """
122
123
124 class NoMasterError(ProtocolError):
125   """The master cannot be reached.
126
127   This means that the master daemon is not running or the socket has
128   been removed.
129
130   """
131
132
133 class PermissionError(ProtocolError):
134   """Permission denied while connecting to the master socket.
135
136   This means the user doesn't have the proper rights.
137
138   """
139
140
141 class Transport:
142   """Low-level transport class.
143
144   This is used on the client side.
145
146   This could be replace by any other class that provides the same
147   semantics to the Client. This means:
148     - can send messages and receive messages
149     - safe for multithreading
150
151   """
152
153   def __init__(self, address, timeouts=None):
154     """Constructor for the Client class.
155
156     Arguments:
157       - address: a valid address the the used transport class
158       - timeout: a list of timeouts, to be used on connect and read/write
159
160     There are two timeouts used since we might want to wait for a long
161     time for a response, but the connect timeout should be lower.
162
163     If not passed, we use a default of 10 and respectively 60 seconds.
164
165     Note that on reading data, since the timeout applies to an
166     invidual receive, it might be that the total duration is longer
167     than timeout value passed (we make a hard limit at twice the read
168     timeout).
169
170     """
171     self.address = address
172     if timeouts is None:
173       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
174     else:
175       self._ctimeout, self._rwtimeout = timeouts
176
177     self.socket = None
178     self._buffer = ""
179     self._msgs = collections.deque()
180
181     try:
182       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
183
184       # Try to connect
185       try:
186         utils.Retry(self._Connect, 1.0, self._ctimeout,
187                     args=(self.socket, address, self._ctimeout))
188       except utils.RetryTimeout:
189         raise TimeoutError("Connect timed out")
190
191       self.socket.settimeout(self._rwtimeout)
192     except (socket.error, NoMasterError):
193       if self.socket is not None:
194         self.socket.close()
195       self.socket = None
196       raise
197
198   @staticmethod
199   def _Connect(sock, address, timeout):
200     sock.settimeout(timeout)
201     try:
202       sock.connect(address)
203     except socket.timeout, err:
204       raise TimeoutError("Connect timed out: %s" % str(err))
205     except socket.error, err:
206       error_code = err.args[0]
207       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
208         raise NoMasterError(address)
209       elif error_code in (errno.EPERM, errno.EACCES):
210         raise PermissionError(address)
211       elif error_code == errno.EAGAIN:
212         # Server's socket backlog is full at the moment
213         raise utils.RetryAgain()
214       raise
215
216   def _CheckSocket(self):
217     """Make sure we are connected.
218
219     """
220     if self.socket is None:
221       raise ProtocolError("Connection is closed")
222
223   def Send(self, msg):
224     """Send a message.
225
226     This just sends a message and doesn't wait for the response.
227
228     """
229     if constants.LUXI_EOM in msg:
230       raise ProtocolError("Message terminator found in payload")
231
232     self._CheckSocket()
233     try:
234       # TODO: sendall is not guaranteed to send everything
235       self.socket.sendall(msg + constants.LUXI_EOM)
236     except socket.timeout, err:
237       raise TimeoutError("Sending timeout: %s" % str(err))
238
239   def Recv(self):
240     """Try to receive a message from the socket.
241
242     In case we already have messages queued, we just return from the
243     queue. Otherwise, we try to read data with a _rwtimeout network
244     timeout, and making sure we don't go over 2x_rwtimeout as a global
245     limit.
246
247     """
248     self._CheckSocket()
249     etime = time.time() + self._rwtimeout
250     while not self._msgs:
251       if time.time() > etime:
252         raise TimeoutError("Extended receive timeout")
253       while True:
254         try:
255           data = self.socket.recv(4096)
256         except socket.timeout, err:
257           raise TimeoutError("Receive timeout: %s" % str(err))
258         except socket.error, err:
259           if err.args and err.args[0] == errno.EAGAIN:
260             continue
261           raise
262         break
263       if not data:
264         raise ConnectionClosedError("Connection closed while reading")
265       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
266       self._buffer = new_msgs.pop()
267       self._msgs.extend(new_msgs)
268     return self._msgs.popleft()
269
270   def Call(self, msg):
271     """Send a message and wait for the response.
272
273     This is just a wrapper over Send and Recv.
274
275     """
276     self.Send(msg)
277     return self.Recv()
278
279   def Close(self):
280     """Close the socket"""
281     if self.socket is not None:
282       self.socket.close()
283       self.socket = None
284
285
286 def ParseRequest(msg):
287   """Parses a LUXI request message.
288
289   """
290   try:
291     request = serializer.LoadJson(msg)
292   except ValueError, err:
293     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
294
295   logging.debug("LUXI request: %s", request)
296
297   if not isinstance(request, dict):
298     logging.error("LUXI request not a dict: %r", msg)
299     raise ProtocolError("Invalid LUXI request (not a dict)")
300
301   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
302   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
303   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
304
305   if method is None or args is None:
306     logging.error("LUXI request missing method or arguments: %r", msg)
307     raise ProtocolError(("Invalid LUXI request (no method or arguments"
308                          " in request): %r") % msg)
309
310   return (method, args, version)
311
312
313 def ParseResponse(msg):
314   """Parses a LUXI response message.
315
316   """
317   # Parse the result
318   try:
319     data = serializer.LoadJson(msg)
320   except KeyboardInterrupt:
321     raise
322   except Exception, err:
323     raise ProtocolError("Error while deserializing response: %s" % str(err))
324
325   # Validate response
326   if not (isinstance(data, dict) and
327           KEY_SUCCESS in data and
328           KEY_RESULT in data):
329     raise ProtocolError("Invalid response from server: %r" % data)
330
331   return (data[KEY_SUCCESS], data[KEY_RESULT],
332           data.get(KEY_VERSION, None)) # pylint: disable=E1103
333
334
335 def FormatResponse(success, result, version=None):
336   """Formats a LUXI response message.
337
338   """
339   response = {
340     KEY_SUCCESS: success,
341     KEY_RESULT: result,
342     }
343
344   if version is not None:
345     response[KEY_VERSION] = version
346
347   logging.debug("LUXI response: %s", response)
348
349   return serializer.DumpJson(response)
350
351
352 def FormatRequest(method, args, version=None):
353   """Formats a LUXI request message.
354
355   """
356   # Build request
357   request = {
358     KEY_METHOD: method,
359     KEY_ARGS: args,
360     }
361
362   if version is not None:
363     request[KEY_VERSION] = version
364
365   # Serialize the request
366   return serializer.DumpJson(request)
367
368
369 def CallLuxiMethod(transport_cb, method, args, version=None):
370   """Send a LUXI request via a transport and return the response.
371
372   """
373   assert callable(transport_cb)
374
375   request_msg = FormatRequest(method, args, version=version)
376
377   # Send request and wait for response
378   response_msg = transport_cb(request_msg)
379
380   (success, result, resp_version) = ParseResponse(response_msg)
381
382   # Verify version if there was one in the response
383   if resp_version is not None and resp_version != version:
384     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
385                            (version, resp_version))
386
387   if success:
388     return result
389
390   errors.MaybeRaise(result)
391   raise RequestError(result)
392
393
394 class Client(object):
395   """High-level client implementation.
396
397   This uses a backing Transport-like class on top of which it
398   implements data serialization/deserialization.
399
400   """
401   def __init__(self, address=None, timeouts=None, transport=Transport):
402     """Constructor for the Client class.
403
404     Arguments:
405       - address: a valid address the the used transport class
406       - timeout: a list of timeouts, to be used on connect and read/write
407       - transport: a Transport-like class
408
409
410     If timeout is not passed, the default timeouts of the transport
411     class are used.
412
413     """
414     if address is None:
415       address = constants.MASTER_SOCKET
416     self.address = address
417     self.timeouts = timeouts
418     self.transport_class = transport
419     self.transport = None
420     self._InitTransport()
421
422   def _InitTransport(self):
423     """(Re)initialize the transport if needed.
424
425     """
426     if self.transport is None:
427       self.transport = self.transport_class(self.address,
428                                             timeouts=self.timeouts)
429
430   def _CloseTransport(self):
431     """Close the transport, ignoring errors.
432
433     """
434     if self.transport is None:
435       return
436     try:
437       old_transp = self.transport
438       self.transport = None
439       old_transp.Close()
440     except Exception: # pylint: disable=W0703
441       pass
442
443   def _SendMethodCall(self, data):
444     # Send request and wait for response
445     try:
446       self._InitTransport()
447       return self.transport.Call(data)
448     except Exception:
449       self._CloseTransport()
450       raise
451
452   def Close(self):
453     """Close the underlying connection.
454
455     """
456     self._CloseTransport()
457
458   def CallMethod(self, method, args):
459     """Send a generic request and return the response.
460
461     """
462     if not isinstance(args, (list, tuple)):
463       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
464                                    " expected list, got %s" % type(args))
465     return CallLuxiMethod(self._SendMethodCall, method, args,
466                           version=constants.LUXI_VERSION)
467
468   def SetQueueDrainFlag(self, drain_flag):
469     return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
470
471   def SetWatcherPause(self, until):
472     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
473
474   def SubmitJob(self, ops):
475     ops_state = map(lambda op: op.__getstate__(), ops)
476     return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
477
478   def SubmitManyJobs(self, jobs):
479     jobs_state = []
480     for ops in jobs:
481       jobs_state.append([op.__getstate__() for op in ops])
482     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
483
484   def CancelJob(self, job_id):
485     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
486
487   def ArchiveJob(self, job_id):
488     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
489
490   def AutoArchiveJobs(self, age):
491     timeout = (DEF_RWTO - 1) / 2
492     return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
493
494   def WaitForJobChangeOnce(self, job_id, fields,
495                            prev_job_info, prev_log_serial,
496                            timeout=WFJC_TIMEOUT):
497     """Waits for changes on a job.
498
499     @param job_id: Job ID
500     @type fields: list
501     @param fields: List of field names to be observed
502     @type prev_job_info: None or list
503     @param prev_job_info: Previously received job information
504     @type prev_log_serial: None or int/long
505     @param prev_log_serial: Highest log serial number previously received
506     @type timeout: int/float
507     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
508                     be capped to that value)
509
510     """
511     assert timeout >= 0, "Timeout can not be negative"
512     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
513                            (job_id, fields, prev_job_info,
514                             prev_log_serial,
515                             min(WFJC_TIMEOUT, timeout)))
516
517   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
518     while True:
519       result = self.WaitForJobChangeOnce(job_id, fields,
520                                          prev_job_info, prev_log_serial)
521       if result != constants.JOB_NOTCHANGED:
522         break
523     return result
524
525   def Query(self, what, fields, qfilter):
526     """Query for resources/items.
527
528     @param what: One of L{constants.QR_VIA_LUXI}
529     @type fields: List of strings
530     @param fields: List of requested fields
531     @type qfilter: None or list
532     @param qfilter: Query filter
533     @rtype: L{objects.QueryResponse}
534
535     """
536     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
537     return objects.QueryResponse.FromDict(result)
538
539   def QueryFields(self, what, fields):
540     """Query for available fields.
541
542     @param what: One of L{constants.QR_VIA_LUXI}
543     @type fields: None or list of strings
544     @param fields: List of requested fields
545     @rtype: L{objects.QueryFieldsResponse}
546
547     """
548     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
549     return objects.QueryFieldsResponse.FromDict(result)
550
551   def QueryJobs(self, job_ids, fields):
552     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
553
554   def QueryInstances(self, names, fields, use_locking):
555     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
556
557   def QueryNodes(self, names, fields, use_locking):
558     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
559
560   def QueryGroups(self, names, fields, use_locking):
561     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
562
563   def QueryExports(self, nodes, use_locking):
564     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
565
566   def QueryClusterInfo(self):
567     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
568
569   def QueryConfigValues(self, fields):
570     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
571
572   def QueryTags(self, kind, name):
573     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))