(2.11) Make disk.name and disk.uuid available in bdev
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012, 2014 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43 from ganeti import pathutils
44
45
46 KEY_METHOD = constants.LUXI_KEY_METHOD
47 KEY_ARGS = constants.LUXI_KEY_ARGS
48 KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
49 KEY_RESULT = constants.LUXI_KEY_RESULT
50 KEY_VERSION = constants.LUXI_KEY_VERSION
51
52 REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
53 REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
54 REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
55 REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
56 REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
57 REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
58 REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
59 REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
60 REQ_QUERY = constants.LUXI_REQ_QUERY
61 REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
62 REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
63 REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
64 REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
65 REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
66 REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
67 REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
68 REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
69 REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
70 REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
71 REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
72 REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
73 REQ_ALL = constants.LUXI_REQ_ALL
74
75 DEF_CTMO = constants.LUXI_DEF_CTMO
76 DEF_RWTO = constants.LUXI_DEF_RWTO
77 WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
78
79
80 class ProtocolError(errors.LuxiError):
81   """Denotes an error in the LUXI protocol."""
82
83
84 class ConnectionClosedError(ProtocolError):
85   """Connection closed error."""
86
87
88 class TimeoutError(ProtocolError):
89   """Operation timeout error."""
90
91
92 class RequestError(ProtocolError):
93   """Error on request.
94
95   This signifies an error in the request format or request handling,
96   but not (e.g.) an error in starting up an instance.
97
98   Some common conditions that can trigger this exception:
99     - job submission failed because the job data was wrong
100     - query failed because required fields were missing
101
102   """
103
104
105 class NoMasterError(ProtocolError):
106   """The master cannot be reached.
107
108   This means that the master daemon is not running or the socket has
109   been removed.
110
111   """
112
113
114 class PermissionError(ProtocolError):
115   """Permission denied while connecting to the master socket.
116
117   This means the user doesn't have the proper rights.
118
119   """
120
121
122 class Transport:
123   """Low-level transport class.
124
125   This is used on the client side.
126
127   This could be replace by any other class that provides the same
128   semantics to the Client. This means:
129     - can send messages and receive messages
130     - safe for multithreading
131
132   """
133
134   def __init__(self, address, timeouts=None):
135     """Constructor for the Client class.
136
137     Arguments:
138       - address: a valid address the the used transport class
139       - timeout: a list of timeouts, to be used on connect and read/write
140
141     There are two timeouts used since we might want to wait for a long
142     time for a response, but the connect timeout should be lower.
143
144     If not passed, we use a default of 10 and respectively 60 seconds.
145
146     Note that on reading data, since the timeout applies to an
147     invidual receive, it might be that the total duration is longer
148     than timeout value passed (we make a hard limit at twice the read
149     timeout).
150
151     """
152     self.address = address
153     if timeouts is None:
154       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
155     else:
156       self._ctimeout, self._rwtimeout = timeouts
157
158     self.socket = None
159     self._buffer = ""
160     self._msgs = collections.deque()
161
162     try:
163       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
164
165       # Try to connect
166       try:
167         utils.Retry(self._Connect, 1.0, self._ctimeout,
168                     args=(self.socket, address, self._ctimeout))
169       except utils.RetryTimeout:
170         raise TimeoutError("Connect timed out")
171
172       self.socket.settimeout(self._rwtimeout)
173     except (socket.error, NoMasterError):
174       if self.socket is not None:
175         self.socket.close()
176       self.socket = None
177       raise
178
179   @staticmethod
180   def _Connect(sock, address, timeout):
181     sock.settimeout(timeout)
182     try:
183       sock.connect(address)
184     except socket.timeout, err:
185       raise TimeoutError("Connect timed out: %s" % str(err))
186     except socket.error, err:
187       error_code = err.args[0]
188       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
189         raise NoMasterError(address)
190       elif error_code in (errno.EPERM, errno.EACCES):
191         raise PermissionError(address)
192       elif error_code == errno.EAGAIN:
193         # Server's socket backlog is full at the moment
194         raise utils.RetryAgain()
195       raise
196
197   def _CheckSocket(self):
198     """Make sure we are connected.
199
200     """
201     if self.socket is None:
202       raise ProtocolError("Connection is closed")
203
204   def Send(self, msg):
205     """Send a message.
206
207     This just sends a message and doesn't wait for the response.
208
209     """
210     if constants.LUXI_EOM in msg:
211       raise ProtocolError("Message terminator found in payload")
212
213     self._CheckSocket()
214     try:
215       # TODO: sendall is not guaranteed to send everything
216       self.socket.sendall(msg + constants.LUXI_EOM)
217     except socket.timeout, err:
218       raise TimeoutError("Sending timeout: %s" % str(err))
219
220   def Recv(self):
221     """Try to receive a message from the socket.
222
223     In case we already have messages queued, we just return from the
224     queue. Otherwise, we try to read data with a _rwtimeout network
225     timeout, and making sure we don't go over 2x_rwtimeout as a global
226     limit.
227
228     """
229     self._CheckSocket()
230     etime = time.time() + self._rwtimeout
231     while not self._msgs:
232       if time.time() > etime:
233         raise TimeoutError("Extended receive timeout")
234       while True:
235         try:
236           data = self.socket.recv(4096)
237         except socket.timeout, err:
238           raise TimeoutError("Receive timeout: %s" % str(err))
239         except socket.error, err:
240           if err.args and err.args[0] == errno.EAGAIN:
241             continue
242           raise
243         break
244       if not data:
245         raise ConnectionClosedError("Connection closed while reading")
246       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
247       self._buffer = new_msgs.pop()
248       self._msgs.extend(new_msgs)
249     return self._msgs.popleft()
250
251   def Call(self, msg):
252     """Send a message and wait for the response.
253
254     This is just a wrapper over Send and Recv.
255
256     """
257     self.Send(msg)
258     return self.Recv()
259
260   def Close(self):
261     """Close the socket"""
262     if self.socket is not None:
263       self.socket.close()
264       self.socket = None
265
266
267 def ParseRequest(msg):
268   """Parses a LUXI request message.
269
270   """
271   try:
272     request = serializer.LoadJson(msg)
273   except ValueError, err:
274     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
275
276   logging.debug("LUXI request: %s", request)
277
278   if not isinstance(request, dict):
279     logging.error("LUXI request not a dict: %r", msg)
280     raise ProtocolError("Invalid LUXI request (not a dict)")
281
282   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
283   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
284   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
285
286   if method is None or args is None:
287     logging.error("LUXI request missing method or arguments: %r", msg)
288     raise ProtocolError(("Invalid LUXI request (no method or arguments"
289                          " in request): %r") % msg)
290
291   return (method, args, version)
292
293
294 def ParseResponse(msg):
295   """Parses a LUXI response message.
296
297   """
298   # Parse the result
299   try:
300     data = serializer.LoadJson(msg)
301   except KeyboardInterrupt:
302     raise
303   except Exception, err:
304     raise ProtocolError("Error while deserializing response: %s" % str(err))
305
306   # Validate response
307   if not (isinstance(data, dict) and
308           KEY_SUCCESS in data and
309           KEY_RESULT in data):
310     raise ProtocolError("Invalid response from server: %r" % data)
311
312   return (data[KEY_SUCCESS], data[KEY_RESULT],
313           data.get(KEY_VERSION, None)) # pylint: disable=E1103
314
315
316 def FormatResponse(success, result, version=None):
317   """Formats a LUXI response message.
318
319   """
320   response = {
321     KEY_SUCCESS: success,
322     KEY_RESULT: result,
323     }
324
325   if version is not None:
326     response[KEY_VERSION] = version
327
328   logging.debug("LUXI response: %s", response)
329
330   return serializer.DumpJson(response)
331
332
333 def FormatRequest(method, args, version=None):
334   """Formats a LUXI request message.
335
336   """
337   # Build request
338   request = {
339     KEY_METHOD: method,
340     KEY_ARGS: args,
341     }
342
343   if version is not None:
344     request[KEY_VERSION] = version
345
346   # Serialize the request
347   return serializer.DumpJson(request)
348
349
350 def CallLuxiMethod(transport_cb, method, args, version=None):
351   """Send a LUXI request via a transport and return the response.
352
353   """
354   assert callable(transport_cb)
355
356   request_msg = FormatRequest(method, args, version=version)
357
358   # Send request and wait for response
359   response_msg = transport_cb(request_msg)
360
361   (success, result, resp_version) = ParseResponse(response_msg)
362
363   # Verify version if there was one in the response
364   if resp_version is not None and resp_version != version:
365     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
366                            (version, resp_version))
367
368   if success:
369     return result
370
371   errors.MaybeRaise(result)
372   raise RequestError(result)
373
374
375 class Client(object):
376   """High-level client implementation.
377
378   This uses a backing Transport-like class on top of which it
379   implements data serialization/deserialization.
380
381   """
382   def __init__(self, address=None, timeouts=None, transport=Transport):
383     """Constructor for the Client class.
384
385     Arguments:
386       - address: a valid address the the used transport class
387       - timeout: a list of timeouts, to be used on connect and read/write
388       - transport: a Transport-like class
389
390
391     If timeout is not passed, the default timeouts of the transport
392     class are used.
393
394     """
395     if address is None:
396       address = pathutils.MASTER_SOCKET
397     self.address = address
398     self.timeouts = timeouts
399     self.transport_class = transport
400     self.transport = None
401     self._InitTransport()
402
403   def _InitTransport(self):
404     """(Re)initialize the transport if needed.
405
406     """
407     if self.transport is None:
408       self.transport = self.transport_class(self.address,
409                                             timeouts=self.timeouts)
410
411   def _CloseTransport(self):
412     """Close the transport, ignoring errors.
413
414     """
415     if self.transport is None:
416       return
417     try:
418       old_transp = self.transport
419       self.transport = None
420       old_transp.Close()
421     except Exception: # pylint: disable=W0703
422       pass
423
424   def _SendMethodCall(self, data):
425     # Send request and wait for response
426     try:
427       self._InitTransport()
428       return self.transport.Call(data)
429     except Exception:
430       self._CloseTransport()
431       raise
432
433   def Close(self):
434     """Close the underlying connection.
435
436     """
437     self._CloseTransport()
438
439   def CallMethod(self, method, args):
440     """Send a generic request and return the response.
441
442     """
443     if not isinstance(args, (list, tuple)):
444       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
445                                    " expected list, got %s" % type(args))
446     return CallLuxiMethod(self._SendMethodCall, method, args,
447                           version=constants.LUXI_VERSION)
448
449   def SetQueueDrainFlag(self, drain_flag):
450     return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
451
452   def SetWatcherPause(self, until):
453     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
454
455   def SubmitJob(self, ops):
456     ops_state = map(lambda op: op.__getstate__(), ops)
457     return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
458
459   def SubmitJobToDrainedQueue(self, ops):
460     ops_state = map(lambda op: op.__getstate__(), ops)
461     return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
462
463   def SubmitManyJobs(self, jobs):
464     jobs_state = []
465     for ops in jobs:
466       jobs_state.append([op.__getstate__() for op in ops])
467     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
468
469   @staticmethod
470   def _PrepareJobId(request_name, job_id):
471     try:
472       return int(job_id)
473     except ValueError:
474       raise RequestError("Invalid parameter passed to %s as job id: "
475                          " expected integer, got value %s" %
476                          (request_name, job_id))
477
478   def CancelJob(self, job_id):
479     job_id = Client._PrepareJobId(REQ_CANCEL_JOB, job_id)
480     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
481
482   def ArchiveJob(self, job_id):
483     job_id = Client._PrepareJobId(REQ_ARCHIVE_JOB, job_id)
484     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
485
486   def ChangeJobPriority(self, job_id, priority):
487     job_id = Client._PrepareJobId(REQ_CHANGE_JOB_PRIORITY, job_id)
488     return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
489
490   def AutoArchiveJobs(self, age):
491     timeout = (DEF_RWTO - 1) / 2
492     return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
493
494   def WaitForJobChangeOnce(self, job_id, fields,
495                            prev_job_info, prev_log_serial,
496                            timeout=WFJC_TIMEOUT):
497     """Waits for changes on a job.
498
499     @param job_id: Job ID
500     @type fields: list
501     @param fields: List of field names to be observed
502     @type prev_job_info: None or list
503     @param prev_job_info: Previously received job information
504     @type prev_log_serial: None or int/long
505     @param prev_log_serial: Highest log serial number previously received
506     @type timeout: int/float
507     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
508                     be capped to that value)
509
510     """
511     assert timeout >= 0, "Timeout can not be negative"
512     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
513                            (job_id, fields, prev_job_info,
514                             prev_log_serial,
515                             min(WFJC_TIMEOUT, timeout)))
516
517   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
518     job_id = Client._PrepareJobId(REQ_WAIT_FOR_JOB_CHANGE, job_id)
519     while True:
520       result = self.WaitForJobChangeOnce(job_id, fields,
521                                          prev_job_info, prev_log_serial)
522       if result != constants.JOB_NOTCHANGED:
523         break
524     return result
525
526   def Query(self, what, fields, qfilter):
527     """Query for resources/items.
528
529     @param what: One of L{constants.QR_VIA_LUXI}
530     @type fields: List of strings
531     @param fields: List of requested fields
532     @type qfilter: None or list
533     @param qfilter: Query filter
534     @rtype: L{objects.QueryResponse}
535
536     """
537     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
538     return objects.QueryResponse.FromDict(result)
539
540   def QueryFields(self, what, fields):
541     """Query for available fields.
542
543     @param what: One of L{constants.QR_VIA_LUXI}
544     @type fields: None or list of strings
545     @param fields: List of requested fields
546     @rtype: L{objects.QueryFieldsResponse}
547
548     """
549     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
550     return objects.QueryFieldsResponse.FromDict(result)
551
552   def QueryJobs(self, job_ids, fields):
553     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
554
555   def QueryInstances(self, names, fields, use_locking):
556     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
557
558   def QueryNodes(self, names, fields, use_locking):
559     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
560
561   def QueryGroups(self, names, fields, use_locking):
562     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
563
564   def QueryNetworks(self, names, fields, use_locking):
565     return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
566
567   def QueryExports(self, nodes, use_locking):
568     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
569
570   def QueryClusterInfo(self):
571     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
572
573   def QueryConfigValues(self, fields):
574     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
575
576   def QueryTags(self, kind, name):
577     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))