Properly add the UUID to all the disks
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import compat
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import pathutils
45
46
47 KEY_METHOD = "method"
48 KEY_ARGS = "args"
49 KEY_SUCCESS = "success"
50 KEY_RESULT = "result"
51 KEY_VERSION = "version"
52
53 REQ_SUBMIT_JOB = "SubmitJob"
54 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
55 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
56 REQ_CANCEL_JOB = "CancelJob"
57 REQ_ARCHIVE_JOB = "ArchiveJob"
58 REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
59 REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
60 REQ_QUERY = "Query"
61 REQ_QUERY_FIELDS = "QueryFields"
62 REQ_QUERY_JOBS = "QueryJobs"
63 REQ_QUERY_INSTANCES = "QueryInstances"
64 REQ_QUERY_NODES = "QueryNodes"
65 REQ_QUERY_GROUPS = "QueryGroups"
66 REQ_QUERY_NETWORKS = "QueryNetworks"
67 REQ_QUERY_EXPORTS = "QueryExports"
68 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
69 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
70 REQ_QUERY_TAGS = "QueryTags"
71 REQ_SET_DRAIN_FLAG = "SetDrainFlag"
72 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
73
74 #: List of all LUXI requests
75 REQ_ALL = compat.UniqueFrozenset([
76   REQ_ARCHIVE_JOB,
77   REQ_AUTO_ARCHIVE_JOBS,
78   REQ_CANCEL_JOB,
79   REQ_CHANGE_JOB_PRIORITY,
80   REQ_QUERY,
81   REQ_QUERY_CLUSTER_INFO,
82   REQ_QUERY_CONFIG_VALUES,
83   REQ_QUERY_EXPORTS,
84   REQ_QUERY_FIELDS,
85   REQ_QUERY_GROUPS,
86   REQ_QUERY_INSTANCES,
87   REQ_QUERY_JOBS,
88   REQ_QUERY_NODES,
89   REQ_QUERY_NETWORKS,
90   REQ_QUERY_TAGS,
91   REQ_SET_DRAIN_FLAG,
92   REQ_SET_WATCHER_PAUSE,
93   REQ_SUBMIT_JOB,
94   REQ_SUBMIT_MANY_JOBS,
95   REQ_WAIT_FOR_JOB_CHANGE,
96   ])
97
98 DEF_CTMO = 10
99 DEF_RWTO = 60
100
101 # WaitForJobChange timeout
102 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
103
104
105 class ProtocolError(errors.LuxiError):
106   """Denotes an error in the LUXI protocol."""
107
108
109 class ConnectionClosedError(ProtocolError):
110   """Connection closed error."""
111
112
113 class TimeoutError(ProtocolError):
114   """Operation timeout error."""
115
116
117 class RequestError(ProtocolError):
118   """Error on request.
119
120   This signifies an error in the request format or request handling,
121   but not (e.g.) an error in starting up an instance.
122
123   Some common conditions that can trigger this exception:
124     - job submission failed because the job data was wrong
125     - query failed because required fields were missing
126
127   """
128
129
130 class NoMasterError(ProtocolError):
131   """The master cannot be reached.
132
133   This means that the master daemon is not running or the socket has
134   been removed.
135
136   """
137
138
139 class PermissionError(ProtocolError):
140   """Permission denied while connecting to the master socket.
141
142   This means the user doesn't have the proper rights.
143
144   """
145
146
147 class Transport:
148   """Low-level transport class.
149
150   This is used on the client side.
151
152   This could be replace by any other class that provides the same
153   semantics to the Client. This means:
154     - can send messages and receive messages
155     - safe for multithreading
156
157   """
158
159   def __init__(self, address, timeouts=None):
160     """Constructor for the Client class.
161
162     Arguments:
163       - address: a valid address the the used transport class
164       - timeout: a list of timeouts, to be used on connect and read/write
165
166     There are two timeouts used since we might want to wait for a long
167     time for a response, but the connect timeout should be lower.
168
169     If not passed, we use a default of 10 and respectively 60 seconds.
170
171     Note that on reading data, since the timeout applies to an
172     invidual receive, it might be that the total duration is longer
173     than timeout value passed (we make a hard limit at twice the read
174     timeout).
175
176     """
177     self.address = address
178     if timeouts is None:
179       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
180     else:
181       self._ctimeout, self._rwtimeout = timeouts
182
183     self.socket = None
184     self._buffer = ""
185     self._msgs = collections.deque()
186
187     try:
188       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
189
190       # Try to connect
191       try:
192         utils.Retry(self._Connect, 1.0, self._ctimeout,
193                     args=(self.socket, address, self._ctimeout))
194       except utils.RetryTimeout:
195         raise TimeoutError("Connect timed out")
196
197       self.socket.settimeout(self._rwtimeout)
198     except (socket.error, NoMasterError):
199       if self.socket is not None:
200         self.socket.close()
201       self.socket = None
202       raise
203
204   @staticmethod
205   def _Connect(sock, address, timeout):
206     sock.settimeout(timeout)
207     try:
208       sock.connect(address)
209     except socket.timeout, err:
210       raise TimeoutError("Connect timed out: %s" % str(err))
211     except socket.error, err:
212       error_code = err.args[0]
213       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
214         raise NoMasterError(address)
215       elif error_code in (errno.EPERM, errno.EACCES):
216         raise PermissionError(address)
217       elif error_code == errno.EAGAIN:
218         # Server's socket backlog is full at the moment
219         raise utils.RetryAgain()
220       raise
221
222   def _CheckSocket(self):
223     """Make sure we are connected.
224
225     """
226     if self.socket is None:
227       raise ProtocolError("Connection is closed")
228
229   def Send(self, msg):
230     """Send a message.
231
232     This just sends a message and doesn't wait for the response.
233
234     """
235     if constants.LUXI_EOM in msg:
236       raise ProtocolError("Message terminator found in payload")
237
238     self._CheckSocket()
239     try:
240       # TODO: sendall is not guaranteed to send everything
241       self.socket.sendall(msg + constants.LUXI_EOM)
242     except socket.timeout, err:
243       raise TimeoutError("Sending timeout: %s" % str(err))
244
245   def Recv(self):
246     """Try to receive a message from the socket.
247
248     In case we already have messages queued, we just return from the
249     queue. Otherwise, we try to read data with a _rwtimeout network
250     timeout, and making sure we don't go over 2x_rwtimeout as a global
251     limit.
252
253     """
254     self._CheckSocket()
255     etime = time.time() + self._rwtimeout
256     while not self._msgs:
257       if time.time() > etime:
258         raise TimeoutError("Extended receive timeout")
259       while True:
260         try:
261           data = self.socket.recv(4096)
262         except socket.timeout, err:
263           raise TimeoutError("Receive timeout: %s" % str(err))
264         except socket.error, err:
265           if err.args and err.args[0] == errno.EAGAIN:
266             continue
267           raise
268         break
269       if not data:
270         raise ConnectionClosedError("Connection closed while reading")
271       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
272       self._buffer = new_msgs.pop()
273       self._msgs.extend(new_msgs)
274     return self._msgs.popleft()
275
276   def Call(self, msg):
277     """Send a message and wait for the response.
278
279     This is just a wrapper over Send and Recv.
280
281     """
282     self.Send(msg)
283     return self.Recv()
284
285   def Close(self):
286     """Close the socket"""
287     if self.socket is not None:
288       self.socket.close()
289       self.socket = None
290
291
292 def ParseRequest(msg):
293   """Parses a LUXI request message.
294
295   """
296   try:
297     request = serializer.LoadJson(msg)
298   except ValueError, err:
299     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
300
301   logging.debug("LUXI request: %s", request)
302
303   if not isinstance(request, dict):
304     logging.error("LUXI request not a dict: %r", msg)
305     raise ProtocolError("Invalid LUXI request (not a dict)")
306
307   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
308   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
309   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
310
311   if method is None or args is None:
312     logging.error("LUXI request missing method or arguments: %r", msg)
313     raise ProtocolError(("Invalid LUXI request (no method or arguments"
314                          " in request): %r") % msg)
315
316   return (method, args, version)
317
318
319 def ParseResponse(msg):
320   """Parses a LUXI response message.
321
322   """
323   # Parse the result
324   try:
325     data = serializer.LoadJson(msg)
326   except KeyboardInterrupt:
327     raise
328   except Exception, err:
329     raise ProtocolError("Error while deserializing response: %s" % str(err))
330
331   # Validate response
332   if not (isinstance(data, dict) and
333           KEY_SUCCESS in data and
334           KEY_RESULT in data):
335     raise ProtocolError("Invalid response from server: %r" % data)
336
337   return (data[KEY_SUCCESS], data[KEY_RESULT],
338           data.get(KEY_VERSION, None)) # pylint: disable=E1103
339
340
341 def FormatResponse(success, result, version=None):
342   """Formats a LUXI response message.
343
344   """
345   response = {
346     KEY_SUCCESS: success,
347     KEY_RESULT: result,
348     }
349
350   if version is not None:
351     response[KEY_VERSION] = version
352
353   logging.debug("LUXI response: %s", response)
354
355   return serializer.DumpJson(response)
356
357
358 def FormatRequest(method, args, version=None):
359   """Formats a LUXI request message.
360
361   """
362   # Build request
363   request = {
364     KEY_METHOD: method,
365     KEY_ARGS: args,
366     }
367
368   if version is not None:
369     request[KEY_VERSION] = version
370
371   # Serialize the request
372   return serializer.DumpJson(request)
373
374
375 def CallLuxiMethod(transport_cb, method, args, version=None):
376   """Send a LUXI request via a transport and return the response.
377
378   """
379   assert callable(transport_cb)
380
381   request_msg = FormatRequest(method, args, version=version)
382
383   # Send request and wait for response
384   response_msg = transport_cb(request_msg)
385
386   (success, result, resp_version) = ParseResponse(response_msg)
387
388   # Verify version if there was one in the response
389   if resp_version is not None and resp_version != version:
390     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
391                            (version, resp_version))
392
393   if success:
394     return result
395
396   errors.MaybeRaise(result)
397   raise RequestError(result)
398
399
400 class Client(object):
401   """High-level client implementation.
402
403   This uses a backing Transport-like class on top of which it
404   implements data serialization/deserialization.
405
406   """
407   def __init__(self, address=None, timeouts=None, transport=Transport):
408     """Constructor for the Client class.
409
410     Arguments:
411       - address: a valid address the the used transport class
412       - timeout: a list of timeouts, to be used on connect and read/write
413       - transport: a Transport-like class
414
415
416     If timeout is not passed, the default timeouts of the transport
417     class are used.
418
419     """
420     if address is None:
421       address = pathutils.MASTER_SOCKET
422     self.address = address
423     self.timeouts = timeouts
424     self.transport_class = transport
425     self.transport = None
426     self._InitTransport()
427
428   def _InitTransport(self):
429     """(Re)initialize the transport if needed.
430
431     """
432     if self.transport is None:
433       self.transport = self.transport_class(self.address,
434                                             timeouts=self.timeouts)
435
436   def _CloseTransport(self):
437     """Close the transport, ignoring errors.
438
439     """
440     if self.transport is None:
441       return
442     try:
443       old_transp = self.transport
444       self.transport = None
445       old_transp.Close()
446     except Exception: # pylint: disable=W0703
447       pass
448
449   def _SendMethodCall(self, data):
450     # Send request and wait for response
451     try:
452       self._InitTransport()
453       return self.transport.Call(data)
454     except Exception:
455       self._CloseTransport()
456       raise
457
458   def Close(self):
459     """Close the underlying connection.
460
461     """
462     self._CloseTransport()
463
464   def CallMethod(self, method, args):
465     """Send a generic request and return the response.
466
467     """
468     if not isinstance(args, (list, tuple)):
469       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
470                                    " expected list, got %s" % type(args))
471     return CallLuxiMethod(self._SendMethodCall, method, args,
472                           version=constants.LUXI_VERSION)
473
474   def SetQueueDrainFlag(self, drain_flag):
475     return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
476
477   def SetWatcherPause(self, until):
478     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
479
480   def SubmitJob(self, ops):
481     ops_state = map(lambda op: op.__getstate__(), ops)
482     return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
483
484   def SubmitManyJobs(self, jobs):
485     jobs_state = []
486     for ops in jobs:
487       jobs_state.append([op.__getstate__() for op in ops])
488     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
489
490   def CancelJob(self, job_id):
491     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
492
493   def ArchiveJob(self, job_id):
494     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
495
496   def ChangeJobPriority(self, job_id, priority):
497     return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
498
499   def AutoArchiveJobs(self, age):
500     timeout = (DEF_RWTO - 1) / 2
501     return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
502
503   def WaitForJobChangeOnce(self, job_id, fields,
504                            prev_job_info, prev_log_serial,
505                            timeout=WFJC_TIMEOUT):
506     """Waits for changes on a job.
507
508     @param job_id: Job ID
509     @type fields: list
510     @param fields: List of field names to be observed
511     @type prev_job_info: None or list
512     @param prev_job_info: Previously received job information
513     @type prev_log_serial: None or int/long
514     @param prev_log_serial: Highest log serial number previously received
515     @type timeout: int/float
516     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
517                     be capped to that value)
518
519     """
520     assert timeout >= 0, "Timeout can not be negative"
521     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
522                            (job_id, fields, prev_job_info,
523                             prev_log_serial,
524                             min(WFJC_TIMEOUT, timeout)))
525
526   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
527     while True:
528       result = self.WaitForJobChangeOnce(job_id, fields,
529                                          prev_job_info, prev_log_serial)
530       if result != constants.JOB_NOTCHANGED:
531         break
532     return result
533
534   def Query(self, what, fields, qfilter):
535     """Query for resources/items.
536
537     @param what: One of L{constants.QR_VIA_LUXI}
538     @type fields: List of strings
539     @param fields: List of requested fields
540     @type qfilter: None or list
541     @param qfilter: Query filter
542     @rtype: L{objects.QueryResponse}
543
544     """
545     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
546     return objects.QueryResponse.FromDict(result)
547
548   def QueryFields(self, what, fields):
549     """Query for available fields.
550
551     @param what: One of L{constants.QR_VIA_LUXI}
552     @type fields: None or list of strings
553     @param fields: List of requested fields
554     @rtype: L{objects.QueryFieldsResponse}
555
556     """
557     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
558     return objects.QueryFieldsResponse.FromDict(result)
559
560   def QueryJobs(self, job_ids, fields):
561     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
562
563   def QueryInstances(self, names, fields, use_locking):
564     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
565
566   def QueryNodes(self, names, fields, use_locking):
567     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
568
569   def QueryGroups(self, names, fields, use_locking):
570     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
571
572   def QueryNetworks(self, names, fields, use_locking):
573     return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
574
575   def QueryExports(self, nodes, use_locking):
576     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
577
578   def QueryClusterInfo(self):
579     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
580
581   def QueryConfigValues(self, fields):
582     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
583
584   def QueryTags(self, kind, name):
585     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))