Improve error message for when adding inotify watcher fails
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43 from ganeti import pathutils
44
45
46 KEY_METHOD = "method"
47 KEY_ARGS = "args"
48 KEY_SUCCESS = "success"
49 KEY_RESULT = "result"
50 KEY_VERSION = "version"
51
52 REQ_SUBMIT_JOB = "SubmitJob"
53 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55 REQ_CANCEL_JOB = "CancelJob"
56 REQ_ARCHIVE_JOB = "ArchiveJob"
57 REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
58 REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
59 REQ_QUERY = "Query"
60 REQ_QUERY_FIELDS = "QueryFields"
61 REQ_QUERY_JOBS = "QueryJobs"
62 REQ_QUERY_INSTANCES = "QueryInstances"
63 REQ_QUERY_NODES = "QueryNodes"
64 REQ_QUERY_GROUPS = "QueryGroups"
65 REQ_QUERY_NETWORKS = "QueryNetworks"
66 REQ_QUERY_EXPORTS = "QueryExports"
67 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
68 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
69 REQ_QUERY_TAGS = "QueryTags"
70 REQ_SET_DRAIN_FLAG = "SetDrainFlag"
71 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
72
73 #: List of all LUXI requests
74 REQ_ALL = frozenset([
75   REQ_ARCHIVE_JOB,
76   REQ_AUTO_ARCHIVE_JOBS,
77   REQ_CANCEL_JOB,
78   REQ_CHANGE_JOB_PRIORITY,
79   REQ_QUERY,
80   REQ_QUERY_CLUSTER_INFO,
81   REQ_QUERY_CONFIG_VALUES,
82   REQ_QUERY_EXPORTS,
83   REQ_QUERY_FIELDS,
84   REQ_QUERY_GROUPS,
85   REQ_QUERY_INSTANCES,
86   REQ_QUERY_JOBS,
87   REQ_QUERY_NODES,
88   REQ_QUERY_TAGS,
89   REQ_SET_DRAIN_FLAG,
90   REQ_SET_WATCHER_PAUSE,
91   REQ_SUBMIT_JOB,
92   REQ_SUBMIT_MANY_JOBS,
93   REQ_WAIT_FOR_JOB_CHANGE,
94   ])
95
96 DEF_CTMO = 10
97 DEF_RWTO = 60
98
99 # WaitForJobChange timeout
100 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
101
102
103 class ProtocolError(errors.LuxiError):
104   """Denotes an error in the LUXI protocol."""
105
106
107 class ConnectionClosedError(ProtocolError):
108   """Connection closed error."""
109
110
111 class TimeoutError(ProtocolError):
112   """Operation timeout error."""
113
114
115 class RequestError(ProtocolError):
116   """Error on request.
117
118   This signifies an error in the request format or request handling,
119   but not (e.g.) an error in starting up an instance.
120
121   Some common conditions that can trigger this exception:
122     - job submission failed because the job data was wrong
123     - query failed because required fields were missing
124
125   """
126
127
128 class NoMasterError(ProtocolError):
129   """The master cannot be reached.
130
131   This means that the master daemon is not running or the socket has
132   been removed.
133
134   """
135
136
137 class PermissionError(ProtocolError):
138   """Permission denied while connecting to the master socket.
139
140   This means the user doesn't have the proper rights.
141
142   """
143
144
145 class Transport:
146   """Low-level transport class.
147
148   This is used on the client side.
149
150   This could be replace by any other class that provides the same
151   semantics to the Client. This means:
152     - can send messages and receive messages
153     - safe for multithreading
154
155   """
156
157   def __init__(self, address, timeouts=None):
158     """Constructor for the Client class.
159
160     Arguments:
161       - address: a valid address the the used transport class
162       - timeout: a list of timeouts, to be used on connect and read/write
163
164     There are two timeouts used since we might want to wait for a long
165     time for a response, but the connect timeout should be lower.
166
167     If not passed, we use a default of 10 and respectively 60 seconds.
168
169     Note that on reading data, since the timeout applies to an
170     invidual receive, it might be that the total duration is longer
171     than timeout value passed (we make a hard limit at twice the read
172     timeout).
173
174     """
175     self.address = address
176     if timeouts is None:
177       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
178     else:
179       self._ctimeout, self._rwtimeout = timeouts
180
181     self.socket = None
182     self._buffer = ""
183     self._msgs = collections.deque()
184
185     try:
186       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
187
188       # Try to connect
189       try:
190         utils.Retry(self._Connect, 1.0, self._ctimeout,
191                     args=(self.socket, address, self._ctimeout))
192       except utils.RetryTimeout:
193         raise TimeoutError("Connect timed out")
194
195       self.socket.settimeout(self._rwtimeout)
196     except (socket.error, NoMasterError):
197       if self.socket is not None:
198         self.socket.close()
199       self.socket = None
200       raise
201
202   @staticmethod
203   def _Connect(sock, address, timeout):
204     sock.settimeout(timeout)
205     try:
206       sock.connect(address)
207     except socket.timeout, err:
208       raise TimeoutError("Connect timed out: %s" % str(err))
209     except socket.error, err:
210       error_code = err.args[0]
211       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
212         raise NoMasterError(address)
213       elif error_code in (errno.EPERM, errno.EACCES):
214         raise PermissionError(address)
215       elif error_code == errno.EAGAIN:
216         # Server's socket backlog is full at the moment
217         raise utils.RetryAgain()
218       raise
219
220   def _CheckSocket(self):
221     """Make sure we are connected.
222
223     """
224     if self.socket is None:
225       raise ProtocolError("Connection is closed")
226
227   def Send(self, msg):
228     """Send a message.
229
230     This just sends a message and doesn't wait for the response.
231
232     """
233     if constants.LUXI_EOM in msg:
234       raise ProtocolError("Message terminator found in payload")
235
236     self._CheckSocket()
237     try:
238       # TODO: sendall is not guaranteed to send everything
239       self.socket.sendall(msg + constants.LUXI_EOM)
240     except socket.timeout, err:
241       raise TimeoutError("Sending timeout: %s" % str(err))
242
243   def Recv(self):
244     """Try to receive a message from the socket.
245
246     In case we already have messages queued, we just return from the
247     queue. Otherwise, we try to read data with a _rwtimeout network
248     timeout, and making sure we don't go over 2x_rwtimeout as a global
249     limit.
250
251     """
252     self._CheckSocket()
253     etime = time.time() + self._rwtimeout
254     while not self._msgs:
255       if time.time() > etime:
256         raise TimeoutError("Extended receive timeout")
257       while True:
258         try:
259           data = self.socket.recv(4096)
260         except socket.timeout, err:
261           raise TimeoutError("Receive timeout: %s" % str(err))
262         except socket.error, err:
263           if err.args and err.args[0] == errno.EAGAIN:
264             continue
265           raise
266         break
267       if not data:
268         raise ConnectionClosedError("Connection closed while reading")
269       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
270       self._buffer = new_msgs.pop()
271       self._msgs.extend(new_msgs)
272     return self._msgs.popleft()
273
274   def Call(self, msg):
275     """Send a message and wait for the response.
276
277     This is just a wrapper over Send and Recv.
278
279     """
280     self.Send(msg)
281     return self.Recv()
282
283   def Close(self):
284     """Close the socket"""
285     if self.socket is not None:
286       self.socket.close()
287       self.socket = None
288
289
290 def ParseRequest(msg):
291   """Parses a LUXI request message.
292
293   """
294   try:
295     request = serializer.LoadJson(msg)
296   except ValueError, err:
297     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
298
299   logging.debug("LUXI request: %s", request)
300
301   if not isinstance(request, dict):
302     logging.error("LUXI request not a dict: %r", msg)
303     raise ProtocolError("Invalid LUXI request (not a dict)")
304
305   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
306   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
307   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
308
309   if method is None or args is None:
310     logging.error("LUXI request missing method or arguments: %r", msg)
311     raise ProtocolError(("Invalid LUXI request (no method or arguments"
312                          " in request): %r") % msg)
313
314   return (method, args, version)
315
316
317 def ParseResponse(msg):
318   """Parses a LUXI response message.
319
320   """
321   # Parse the result
322   try:
323     data = serializer.LoadJson(msg)
324   except KeyboardInterrupt:
325     raise
326   except Exception, err:
327     raise ProtocolError("Error while deserializing response: %s" % str(err))
328
329   # Validate response
330   if not (isinstance(data, dict) and
331           KEY_SUCCESS in data and
332           KEY_RESULT in data):
333     raise ProtocolError("Invalid response from server: %r" % data)
334
335   return (data[KEY_SUCCESS], data[KEY_RESULT],
336           data.get(KEY_VERSION, None)) # pylint: disable=E1103
337
338
339 def FormatResponse(success, result, version=None):
340   """Formats a LUXI response message.
341
342   """
343   response = {
344     KEY_SUCCESS: success,
345     KEY_RESULT: result,
346     }
347
348   if version is not None:
349     response[KEY_VERSION] = version
350
351   logging.debug("LUXI response: %s", response)
352
353   return serializer.DumpJson(response)
354
355
356 def FormatRequest(method, args, version=None):
357   """Formats a LUXI request message.
358
359   """
360   # Build request
361   request = {
362     KEY_METHOD: method,
363     KEY_ARGS: args,
364     }
365
366   if version is not None:
367     request[KEY_VERSION] = version
368
369   # Serialize the request
370   return serializer.DumpJson(request)
371
372
373 def CallLuxiMethod(transport_cb, method, args, version=None):
374   """Send a LUXI request via a transport and return the response.
375
376   """
377   assert callable(transport_cb)
378
379   request_msg = FormatRequest(method, args, version=version)
380
381   # Send request and wait for response
382   response_msg = transport_cb(request_msg)
383
384   (success, result, resp_version) = ParseResponse(response_msg)
385
386   # Verify version if there was one in the response
387   if resp_version is not None and resp_version != version:
388     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
389                            (version, resp_version))
390
391   if success:
392     return result
393
394   errors.MaybeRaise(result)
395   raise RequestError(result)
396
397
398 class Client(object):
399   """High-level client implementation.
400
401   This uses a backing Transport-like class on top of which it
402   implements data serialization/deserialization.
403
404   """
405   def __init__(self, address=None, timeouts=None, transport=Transport):
406     """Constructor for the Client class.
407
408     Arguments:
409       - address: a valid address the the used transport class
410       - timeout: a list of timeouts, to be used on connect and read/write
411       - transport: a Transport-like class
412
413
414     If timeout is not passed, the default timeouts of the transport
415     class are used.
416
417     """
418     if address is None:
419       address = pathutils.MASTER_SOCKET
420     self.address = address
421     self.timeouts = timeouts
422     self.transport_class = transport
423     self.transport = None
424     self._InitTransport()
425
426   def _InitTransport(self):
427     """(Re)initialize the transport if needed.
428
429     """
430     if self.transport is None:
431       self.transport = self.transport_class(self.address,
432                                             timeouts=self.timeouts)
433
434   def _CloseTransport(self):
435     """Close the transport, ignoring errors.
436
437     """
438     if self.transport is None:
439       return
440     try:
441       old_transp = self.transport
442       self.transport = None
443       old_transp.Close()
444     except Exception: # pylint: disable=W0703
445       pass
446
447   def _SendMethodCall(self, data):
448     # Send request and wait for response
449     try:
450       self._InitTransport()
451       return self.transport.Call(data)
452     except Exception:
453       self._CloseTransport()
454       raise
455
456   def Close(self):
457     """Close the underlying connection.
458
459     """
460     self._CloseTransport()
461
462   def CallMethod(self, method, args):
463     """Send a generic request and return the response.
464
465     """
466     if not isinstance(args, (list, tuple)):
467       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
468                                    " expected list, got %s" % type(args))
469     return CallLuxiMethod(self._SendMethodCall, method, args,
470                           version=constants.LUXI_VERSION)
471
472   def SetQueueDrainFlag(self, drain_flag):
473     return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
474
475   def SetWatcherPause(self, until):
476     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
477
478   def SubmitJob(self, ops):
479     ops_state = map(lambda op: op.__getstate__(), ops)
480     return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
481
482   def SubmitManyJobs(self, jobs):
483     jobs_state = []
484     for ops in jobs:
485       jobs_state.append([op.__getstate__() for op in ops])
486     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
487
488   def CancelJob(self, job_id):
489     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
490
491   def ArchiveJob(self, job_id):
492     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
493
494   def ChangeJobPriority(self, job_id, priority):
495     return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
496
497   def AutoArchiveJobs(self, age):
498     timeout = (DEF_RWTO - 1) / 2
499     return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
500
501   def WaitForJobChangeOnce(self, job_id, fields,
502                            prev_job_info, prev_log_serial,
503                            timeout=WFJC_TIMEOUT):
504     """Waits for changes on a job.
505
506     @param job_id: Job ID
507     @type fields: list
508     @param fields: List of field names to be observed
509     @type prev_job_info: None or list
510     @param prev_job_info: Previously received job information
511     @type prev_log_serial: None or int/long
512     @param prev_log_serial: Highest log serial number previously received
513     @type timeout: int/float
514     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
515                     be capped to that value)
516
517     """
518     assert timeout >= 0, "Timeout can not be negative"
519     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
520                            (job_id, fields, prev_job_info,
521                             prev_log_serial,
522                             min(WFJC_TIMEOUT, timeout)))
523
524   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
525     while True:
526       result = self.WaitForJobChangeOnce(job_id, fields,
527                                          prev_job_info, prev_log_serial)
528       if result != constants.JOB_NOTCHANGED:
529         break
530     return result
531
532   def Query(self, what, fields, qfilter):
533     """Query for resources/items.
534
535     @param what: One of L{constants.QR_VIA_LUXI}
536     @type fields: List of strings
537     @param fields: List of requested fields
538     @type qfilter: None or list
539     @param qfilter: Query filter
540     @rtype: L{objects.QueryResponse}
541
542     """
543     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
544     return objects.QueryResponse.FromDict(result)
545
546   def QueryFields(self, what, fields):
547     """Query for available fields.
548
549     @param what: One of L{constants.QR_VIA_LUXI}
550     @type fields: None or list of strings
551     @param fields: List of requested fields
552     @rtype: L{objects.QueryFieldsResponse}
553
554     """
555     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
556     return objects.QueryFieldsResponse.FromDict(result)
557
558   def QueryJobs(self, job_ids, fields):
559     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
560
561   def QueryInstances(self, names, fields, use_locking):
562     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
563
564   def QueryNodes(self, names, fields, use_locking):
565     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
566
567   def QueryGroups(self, names, fields, use_locking):
568     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
569
570   def QueryNetworks(self, names, fields, use_locking):
571     return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
572
573   def QueryExports(self, nodes, use_locking):
574     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
575
576   def QueryClusterInfo(self):
577     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
578
579   def QueryConfigValues(self, fields):
580     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
581
582   def QueryTags(self, kind, name):
583     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))