Merge branch 'stable-2.6-hotplug' into stable-2.6-ippool-hotplug-esi
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43
44
45 KEY_METHOD = "method"
46 KEY_ARGS = "args"
47 KEY_SUCCESS = "success"
48 KEY_RESULT = "result"
49 KEY_VERSION = "version"
50
51 REQ_SUBMIT_JOB = "SubmitJob"
52 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54 REQ_CANCEL_JOB = "CancelJob"
55 REQ_ARCHIVE_JOB = "ArchiveJob"
56 REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
57 REQ_QUERY = "Query"
58 REQ_QUERY_FIELDS = "QueryFields"
59 REQ_QUERY_JOBS = "QueryJobs"
60 REQ_QUERY_INSTANCES = "QueryInstances"
61 REQ_QUERY_NODES = "QueryNodes"
62 REQ_QUERY_GROUPS = "QueryGroups"
63 REQ_QUERY_NETWORKS = "QueryNetworks"
64 REQ_QUERY_EXPORTS = "QueryExports"
65 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
66 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
67 REQ_QUERY_TAGS = "QueryTags"
68 REQ_SET_DRAIN_FLAG = "SetDrainFlag"
69 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
70
71 #: List of all LUXI requests
72 REQ_ALL = frozenset([
73   REQ_ARCHIVE_JOB,
74   REQ_AUTO_ARCHIVE_JOBS,
75   REQ_CANCEL_JOB,
76   REQ_QUERY,
77   REQ_QUERY_CLUSTER_INFO,
78   REQ_QUERY_CONFIG_VALUES,
79   REQ_QUERY_EXPORTS,
80   REQ_QUERY_FIELDS,
81   REQ_QUERY_GROUPS,
82   REQ_QUERY_INSTANCES,
83   REQ_QUERY_JOBS,
84   REQ_QUERY_NODES,
85   REQ_QUERY_TAGS,
86   REQ_SET_DRAIN_FLAG,
87   REQ_SET_WATCHER_PAUSE,
88   REQ_SUBMIT_JOB,
89   REQ_SUBMIT_MANY_JOBS,
90   REQ_WAIT_FOR_JOB_CHANGE,
91   ])
92
93 DEF_CTMO = 10
94 DEF_RWTO = 60
95
96 # WaitForJobChange timeout
97 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
98
99
100 class ProtocolError(errors.LuxiError):
101   """Denotes an error in the LUXI protocol."""
102
103
104 class ConnectionClosedError(ProtocolError):
105   """Connection closed error."""
106
107
108 class TimeoutError(ProtocolError):
109   """Operation timeout error."""
110
111
112 class RequestError(ProtocolError):
113   """Error on request.
114
115   This signifies an error in the request format or request handling,
116   but not (e.g.) an error in starting up an instance.
117
118   Some common conditions that can trigger this exception:
119     - job submission failed because the job data was wrong
120     - query failed because required fields were missing
121
122   """
123
124
125 class NoMasterError(ProtocolError):
126   """The master cannot be reached.
127
128   This means that the master daemon is not running or the socket has
129   been removed.
130
131   """
132
133
134 class PermissionError(ProtocolError):
135   """Permission denied while connecting to the master socket.
136
137   This means the user doesn't have the proper rights.
138
139   """
140
141
142 class Transport:
143   """Low-level transport class.
144
145   This is used on the client side.
146
147   This could be replace by any other class that provides the same
148   semantics to the Client. This means:
149     - can send messages and receive messages
150     - safe for multithreading
151
152   """
153
154   def __init__(self, address, timeouts=None):
155     """Constructor for the Client class.
156
157     Arguments:
158       - address: a valid address the the used transport class
159       - timeout: a list of timeouts, to be used on connect and read/write
160
161     There are two timeouts used since we might want to wait for a long
162     time for a response, but the connect timeout should be lower.
163
164     If not passed, we use a default of 10 and respectively 60 seconds.
165
166     Note that on reading data, since the timeout applies to an
167     invidual receive, it might be that the total duration is longer
168     than timeout value passed (we make a hard limit at twice the read
169     timeout).
170
171     """
172     self.address = address
173     if timeouts is None:
174       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
175     else:
176       self._ctimeout, self._rwtimeout = timeouts
177
178     self.socket = None
179     self._buffer = ""
180     self._msgs = collections.deque()
181
182     try:
183       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
184
185       # Try to connect
186       try:
187         utils.Retry(self._Connect, 1.0, self._ctimeout,
188                     args=(self.socket, address, self._ctimeout))
189       except utils.RetryTimeout:
190         raise TimeoutError("Connect timed out")
191
192       self.socket.settimeout(self._rwtimeout)
193     except (socket.error, NoMasterError):
194       if self.socket is not None:
195         self.socket.close()
196       self.socket = None
197       raise
198
199   @staticmethod
200   def _Connect(sock, address, timeout):
201     sock.settimeout(timeout)
202     try:
203       sock.connect(address)
204     except socket.timeout, err:
205       raise TimeoutError("Connect timed out: %s" % str(err))
206     except socket.error, err:
207       error_code = err.args[0]
208       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
209         raise NoMasterError(address)
210       elif error_code in (errno.EPERM, errno.EACCES):
211         raise PermissionError(address)
212       elif error_code == errno.EAGAIN:
213         # Server's socket backlog is full at the moment
214         raise utils.RetryAgain()
215       raise
216
217   def _CheckSocket(self):
218     """Make sure we are connected.
219
220     """
221     if self.socket is None:
222       raise ProtocolError("Connection is closed")
223
224   def Send(self, msg):
225     """Send a message.
226
227     This just sends a message and doesn't wait for the response.
228
229     """
230     if constants.LUXI_EOM in msg:
231       raise ProtocolError("Message terminator found in payload")
232
233     self._CheckSocket()
234     try:
235       # TODO: sendall is not guaranteed to send everything
236       self.socket.sendall(msg + constants.LUXI_EOM)
237     except socket.timeout, err:
238       raise TimeoutError("Sending timeout: %s" % str(err))
239
240   def Recv(self):
241     """Try to receive a message from the socket.
242
243     In case we already have messages queued, we just return from the
244     queue. Otherwise, we try to read data with a _rwtimeout network
245     timeout, and making sure we don't go over 2x_rwtimeout as a global
246     limit.
247
248     """
249     self._CheckSocket()
250     etime = time.time() + self._rwtimeout
251     while not self._msgs:
252       if time.time() > etime:
253         raise TimeoutError("Extended receive timeout")
254       while True:
255         try:
256           data = self.socket.recv(4096)
257         except socket.timeout, err:
258           raise TimeoutError("Receive timeout: %s" % str(err))
259         except socket.error, err:
260           if err.args and err.args[0] == errno.EAGAIN:
261             continue
262           raise
263         break
264       if not data:
265         raise ConnectionClosedError("Connection closed while reading")
266       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
267       self._buffer = new_msgs.pop()
268       self._msgs.extend(new_msgs)
269     return self._msgs.popleft()
270
271   def Call(self, msg):
272     """Send a message and wait for the response.
273
274     This is just a wrapper over Send and Recv.
275
276     """
277     self.Send(msg)
278     return self.Recv()
279
280   def Close(self):
281     """Close the socket"""
282     if self.socket is not None:
283       self.socket.close()
284       self.socket = None
285
286
287 def ParseRequest(msg):
288   """Parses a LUXI request message.
289
290   """
291   try:
292     request = serializer.LoadJson(msg)
293   except ValueError, err:
294     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
295
296   logging.debug("LUXI request: %s", request)
297
298   if not isinstance(request, dict):
299     logging.error("LUXI request not a dict: %r", msg)
300     raise ProtocolError("Invalid LUXI request (not a dict)")
301
302   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
303   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
304   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
305
306   if method is None or args is None:
307     logging.error("LUXI request missing method or arguments: %r", msg)
308     raise ProtocolError(("Invalid LUXI request (no method or arguments"
309                          " in request): %r") % msg)
310
311   return (method, args, version)
312
313
314 def ParseResponse(msg):
315   """Parses a LUXI response message.
316
317   """
318   # Parse the result
319   try:
320     data = serializer.LoadJson(msg)
321   except KeyboardInterrupt:
322     raise
323   except Exception, err:
324     raise ProtocolError("Error while deserializing response: %s" % str(err))
325
326   # Validate response
327   if not (isinstance(data, dict) and
328           KEY_SUCCESS in data and
329           KEY_RESULT in data):
330     raise ProtocolError("Invalid response from server: %r" % data)
331
332   return (data[KEY_SUCCESS], data[KEY_RESULT],
333           data.get(KEY_VERSION, None)) # pylint: disable=E1103
334
335
336 def FormatResponse(success, result, version=None):
337   """Formats a LUXI response message.
338
339   """
340   response = {
341     KEY_SUCCESS: success,
342     KEY_RESULT: result,
343     }
344
345   if version is not None:
346     response[KEY_VERSION] = version
347
348   logging.debug("LUXI response: %s", response)
349
350   return serializer.DumpJson(response)
351
352
353 def FormatRequest(method, args, version=None):
354   """Formats a LUXI request message.
355
356   """
357   # Build request
358   request = {
359     KEY_METHOD: method,
360     KEY_ARGS: args,
361     }
362
363   if version is not None:
364     request[KEY_VERSION] = version
365
366   # Serialize the request
367   return serializer.DumpJson(request)
368
369
370 def CallLuxiMethod(transport_cb, method, args, version=None):
371   """Send a LUXI request via a transport and return the response.
372
373   """
374   assert callable(transport_cb)
375
376   request_msg = FormatRequest(method, args, version=version)
377
378   # Send request and wait for response
379   response_msg = transport_cb(request_msg)
380
381   (success, result, resp_version) = ParseResponse(response_msg)
382
383   # Verify version if there was one in the response
384   if resp_version is not None and resp_version != version:
385     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
386                            (version, resp_version))
387
388   if success:
389     return result
390
391   errors.MaybeRaise(result)
392   raise RequestError(result)
393
394
395 class Client(object):
396   """High-level client implementation.
397
398   This uses a backing Transport-like class on top of which it
399   implements data serialization/deserialization.
400
401   """
402   def __init__(self, address=None, timeouts=None, transport=Transport):
403     """Constructor for the Client class.
404
405     Arguments:
406       - address: a valid address the the used transport class
407       - timeout: a list of timeouts, to be used on connect and read/write
408       - transport: a Transport-like class
409
410
411     If timeout is not passed, the default timeouts of the transport
412     class are used.
413
414     """
415     if address is None:
416       address = constants.MASTER_SOCKET
417     self.address = address
418     self.timeouts = timeouts
419     self.transport_class = transport
420     self.transport = None
421     self._InitTransport()
422
423   def _InitTransport(self):
424     """(Re)initialize the transport if needed.
425
426     """
427     if self.transport is None:
428       self.transport = self.transport_class(self.address,
429                                             timeouts=self.timeouts)
430
431   def _CloseTransport(self):
432     """Close the transport, ignoring errors.
433
434     """
435     if self.transport is None:
436       return
437     try:
438       old_transp = self.transport
439       self.transport = None
440       old_transp.Close()
441     except Exception: # pylint: disable=W0703
442       pass
443
444   def _SendMethodCall(self, data):
445     # Send request and wait for response
446     try:
447       self._InitTransport()
448       return self.transport.Call(data)
449     except Exception:
450       self._CloseTransport()
451       raise
452
453   def Close(self):
454     """Close the underlying connection.
455
456     """
457     self._CloseTransport()
458
459   def CallMethod(self, method, args):
460     """Send a generic request and return the response.
461
462     """
463     if not isinstance(args, (list, tuple)):
464       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
465                                    " expected list, got %s" % type(args))
466     return CallLuxiMethod(self._SendMethodCall, method, args,
467                           version=constants.LUXI_VERSION)
468
469   def SetQueueDrainFlag(self, drain_flag):
470     return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
471
472   def SetWatcherPause(self, until):
473     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
474
475   def SubmitJob(self, ops):
476     ops_state = map(lambda op: op.__getstate__(), ops)
477     return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
478
479   def SubmitManyJobs(self, jobs):
480     jobs_state = []
481     for ops in jobs:
482       jobs_state.append([op.__getstate__() for op in ops])
483     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
484
485   def CancelJob(self, job_id):
486     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
487
488   def ArchiveJob(self, job_id):
489     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
490
491   def AutoArchiveJobs(self, age):
492     timeout = (DEF_RWTO - 1) / 2
493     return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
494
495   def WaitForJobChangeOnce(self, job_id, fields,
496                            prev_job_info, prev_log_serial,
497                            timeout=WFJC_TIMEOUT):
498     """Waits for changes on a job.
499
500     @param job_id: Job ID
501     @type fields: list
502     @param fields: List of field names to be observed
503     @type prev_job_info: None or list
504     @param prev_job_info: Previously received job information
505     @type prev_log_serial: None or int/long
506     @param prev_log_serial: Highest log serial number previously received
507     @type timeout: int/float
508     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
509                     be capped to that value)
510
511     """
512     assert timeout >= 0, "Timeout can not be negative"
513     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
514                            (job_id, fields, prev_job_info,
515                             prev_log_serial,
516                             min(WFJC_TIMEOUT, timeout)))
517
518   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
519     while True:
520       result = self.WaitForJobChangeOnce(job_id, fields,
521                                          prev_job_info, prev_log_serial)
522       if result != constants.JOB_NOTCHANGED:
523         break
524     return result
525
526   def Query(self, what, fields, qfilter):
527     """Query for resources/items.
528
529     @param what: One of L{constants.QR_VIA_LUXI}
530     @type fields: List of strings
531     @param fields: List of requested fields
532     @type qfilter: None or list
533     @param qfilter: Query filter
534     @rtype: L{objects.QueryResponse}
535
536     """
537     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
538     return objects.QueryResponse.FromDict(result)
539
540   def QueryFields(self, what, fields):
541     """Query for available fields.
542
543     @param what: One of L{constants.QR_VIA_LUXI}
544     @type fields: None or list of strings
545     @param fields: List of requested fields
546     @rtype: L{objects.QueryFieldsResponse}
547
548     """
549     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
550     return objects.QueryFieldsResponse.FromDict(result)
551
552   def QueryJobs(self, job_ids, fields):
553     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
554
555   def QueryInstances(self, names, fields, use_locking):
556     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
557
558   def QueryNodes(self, names, fields, use_locking):
559     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
560
561   def QueryGroups(self, names, fields, use_locking):
562     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
563
564   def QueryNetworks(self, names, fields, use_locking):
565     return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
566
567   def QueryExports(self, nodes, use_locking):
568     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
569
570   def QueryClusterInfo(self):
571     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
572
573   def QueryConfigValues(self, fields):
574     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
575
576   def QueryTags(self, kind, name):
577     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))