Merge branch 'devel-2.3'
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42 from ganeti import objects
43
44
45 KEY_METHOD = "method"
46 KEY_ARGS = "args"
47 KEY_SUCCESS = "success"
48 KEY_RESULT = "result"
49 KEY_VERSION = "version"
50
51 REQ_SUBMIT_JOB = "SubmitJob"
52 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
53 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
54 REQ_CANCEL_JOB = "CancelJob"
55 REQ_ARCHIVE_JOB = "ArchiveJob"
56 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
57 REQ_QUERY = "Query"
58 REQ_QUERY_FIELDS = "QueryFields"
59 REQ_QUERY_JOBS = "QueryJobs"
60 REQ_QUERY_INSTANCES = "QueryInstances"
61 REQ_QUERY_NODES = "QueryNodes"
62 REQ_QUERY_GROUPS = "QueryGroups"
63 REQ_QUERY_EXPORTS = "QueryExports"
64 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
65 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
66 REQ_QUERY_TAGS = "QueryTags"
67 REQ_QUERY_LOCKS = "QueryLocks"
68 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
69 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
70
71 DEF_CTMO = 10
72 DEF_RWTO = 60
73
74 # WaitForJobChange timeout
75 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
76
77
78 class ProtocolError(errors.LuxiError):
79   """Denotes an error in the LUXI protocol."""
80
81
82 class ConnectionClosedError(ProtocolError):
83   """Connection closed error."""
84
85
86 class TimeoutError(ProtocolError):
87   """Operation timeout error."""
88
89
90 class RequestError(ProtocolError):
91   """Error on request.
92
93   This signifies an error in the request format or request handling,
94   but not (e.g.) an error in starting up an instance.
95
96   Some common conditions that can trigger this exception:
97     - job submission failed because the job data was wrong
98     - query failed because required fields were missing
99
100   """
101
102
103 class NoMasterError(ProtocolError):
104   """The master cannot be reached.
105
106   This means that the master daemon is not running or the socket has
107   been removed.
108
109   """
110
111
112 class PermissionError(ProtocolError):
113   """Permission denied while connecting to the master socket.
114
115   This means the user doesn't have the proper rights.
116
117   """
118
119
120 class Transport:
121   """Low-level transport class.
122
123   This is used on the client side.
124
125   This could be replace by any other class that provides the same
126   semantics to the Client. This means:
127     - can send messages and receive messages
128     - safe for multithreading
129
130   """
131
132   def __init__(self, address, timeouts=None):
133     """Constructor for the Client class.
134
135     Arguments:
136       - address: a valid address the the used transport class
137       - timeout: a list of timeouts, to be used on connect and read/write
138
139     There are two timeouts used since we might want to wait for a long
140     time for a response, but the connect timeout should be lower.
141
142     If not passed, we use a default of 10 and respectively 60 seconds.
143
144     Note that on reading data, since the timeout applies to an
145     invidual receive, it might be that the total duration is longer
146     than timeout value passed (we make a hard limit at twice the read
147     timeout).
148
149     """
150     self.address = address
151     if timeouts is None:
152       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
153     else:
154       self._ctimeout, self._rwtimeout = timeouts
155
156     self.socket = None
157     self._buffer = ""
158     self._msgs = collections.deque()
159
160     try:
161       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
162
163       # Try to connect
164       try:
165         utils.Retry(self._Connect, 1.0, self._ctimeout,
166                     args=(self.socket, address, self._ctimeout))
167       except utils.RetryTimeout:
168         raise TimeoutError("Connect timed out")
169
170       self.socket.settimeout(self._rwtimeout)
171     except (socket.error, NoMasterError):
172       if self.socket is not None:
173         self.socket.close()
174       self.socket = None
175       raise
176
177   @staticmethod
178   def _Connect(sock, address, timeout):
179     sock.settimeout(timeout)
180     try:
181       sock.connect(address)
182     except socket.timeout, err:
183       raise TimeoutError("Connect timed out: %s" % str(err))
184     except socket.error, err:
185       error_code = err.args[0]
186       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
187         raise NoMasterError(address)
188       elif error_code in (errno.EPERM, errno.EACCES):
189         raise PermissionError(address)
190       elif error_code == errno.EAGAIN:
191         # Server's socket backlog is full at the moment
192         raise utils.RetryAgain()
193       raise
194
195   def _CheckSocket(self):
196     """Make sure we are connected.
197
198     """
199     if self.socket is None:
200       raise ProtocolError("Connection is closed")
201
202   def Send(self, msg):
203     """Send a message.
204
205     This just sends a message and doesn't wait for the response.
206
207     """
208     if constants.LUXI_EOM in msg:
209       raise ProtocolError("Message terminator found in payload")
210
211     self._CheckSocket()
212     try:
213       # TODO: sendall is not guaranteed to send everything
214       self.socket.sendall(msg + constants.LUXI_EOM)
215     except socket.timeout, err:
216       raise TimeoutError("Sending timeout: %s" % str(err))
217
218   def Recv(self):
219     """Try to receive a message from the socket.
220
221     In case we already have messages queued, we just return from the
222     queue. Otherwise, we try to read data with a _rwtimeout network
223     timeout, and making sure we don't go over 2x_rwtimeout as a global
224     limit.
225
226     """
227     self._CheckSocket()
228     etime = time.time() + self._rwtimeout
229     while not self._msgs:
230       if time.time() > etime:
231         raise TimeoutError("Extended receive timeout")
232       while True:
233         try:
234           data = self.socket.recv(4096)
235         except socket.timeout, err:
236           raise TimeoutError("Receive timeout: %s" % str(err))
237         except socket.error, err:
238           if err.args and err.args[0] == errno.EAGAIN:
239             continue
240           raise
241         break
242       if not data:
243         raise ConnectionClosedError("Connection closed while reading")
244       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
245       self._buffer = new_msgs.pop()
246       self._msgs.extend(new_msgs)
247     return self._msgs.popleft()
248
249   def Call(self, msg):
250     """Send a message and wait for the response.
251
252     This is just a wrapper over Send and Recv.
253
254     """
255     self.Send(msg)
256     return self.Recv()
257
258   def Close(self):
259     """Close the socket"""
260     if self.socket is not None:
261       self.socket.close()
262       self.socket = None
263
264
265 def ParseRequest(msg):
266   """Parses a LUXI request message.
267
268   """
269   try:
270     request = serializer.LoadJson(msg)
271   except ValueError, err:
272     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
273
274   logging.debug("LUXI request: %s", request)
275
276   if not isinstance(request, dict):
277     logging.error("LUXI request not a dict: %r", msg)
278     raise ProtocolError("Invalid LUXI request (not a dict)")
279
280   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
281   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
282   version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103
283
284   if method is None or args is None:
285     logging.error("LUXI request missing method or arguments: %r", msg)
286     raise ProtocolError(("Invalid LUXI request (no method or arguments"
287                          " in request): %r") % msg)
288
289   return (method, args, version)
290
291
292 def ParseResponse(msg):
293   """Parses a LUXI response message.
294
295   """
296   # Parse the result
297   try:
298     data = serializer.LoadJson(msg)
299   except Exception, err:
300     raise ProtocolError("Error while deserializing response: %s" % str(err))
301
302   # Validate response
303   if not (isinstance(data, dict) and
304           KEY_SUCCESS in data and
305           KEY_RESULT in data):
306     raise ProtocolError("Invalid response from server: %r" % data)
307
308   return (data[KEY_SUCCESS], data[KEY_RESULT],
309           data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
310
311
312 def FormatResponse(success, result, version=None):
313   """Formats a LUXI response message.
314
315   """
316   response = {
317     KEY_SUCCESS: success,
318     KEY_RESULT: result,
319     }
320
321   if version is not None:
322     response[KEY_VERSION] = version
323
324   logging.debug("LUXI response: %s", response)
325
326   return serializer.DumpJson(response)
327
328
329 def FormatRequest(method, args, version=None):
330   """Formats a LUXI request message.
331
332   """
333   # Build request
334   request = {
335     KEY_METHOD: method,
336     KEY_ARGS: args,
337     }
338
339   if version is not None:
340     request[KEY_VERSION] = version
341
342   # Serialize the request
343   return serializer.DumpJson(request, indent=False)
344
345
346 def CallLuxiMethod(transport_cb, method, args, version=None):
347   """Send a LUXI request via a transport and return the response.
348
349   """
350   assert callable(transport_cb)
351
352   request_msg = FormatRequest(method, args, version=version)
353
354   # Send request and wait for response
355   response_msg = transport_cb(request_msg)
356
357   (success, result, resp_version) = ParseResponse(response_msg)
358
359   # Verify version if there was one in the response
360   if resp_version is not None and resp_version != version:
361     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
362                            (version, resp_version))
363
364   if success:
365     return result
366
367   errors.MaybeRaise(result)
368   raise RequestError(result)
369
370
371 class Client(object):
372   """High-level client implementation.
373
374   This uses a backing Transport-like class on top of which it
375   implements data serialization/deserialization.
376
377   """
378   def __init__(self, address=None, timeouts=None, transport=Transport):
379     """Constructor for the Client class.
380
381     Arguments:
382       - address: a valid address the the used transport class
383       - timeout: a list of timeouts, to be used on connect and read/write
384       - transport: a Transport-like class
385
386
387     If timeout is not passed, the default timeouts of the transport
388     class are used.
389
390     """
391     if address is None:
392       address = constants.MASTER_SOCKET
393     self.address = address
394     self.timeouts = timeouts
395     self.transport_class = transport
396     self.transport = None
397     self._InitTransport()
398
399   def _InitTransport(self):
400     """(Re)initialize the transport if needed.
401
402     """
403     if self.transport is None:
404       self.transport = self.transport_class(self.address,
405                                             timeouts=self.timeouts)
406
407   def _CloseTransport(self):
408     """Close the transport, ignoring errors.
409
410     """
411     if self.transport is None:
412       return
413     try:
414       old_transp = self.transport
415       self.transport = None
416       old_transp.Close()
417     except Exception: # pylint: disable-msg=W0703
418       pass
419
420   def _SendMethodCall(self, data):
421     # Send request and wait for response
422     try:
423       self._InitTransport()
424       return self.transport.Call(data)
425     except Exception:
426       self._CloseTransport()
427       raise
428
429   def CallMethod(self, method, args):
430     """Send a generic request and return the response.
431
432     """
433     return CallLuxiMethod(self._SendMethodCall, method, args,
434                           version=constants.LUXI_VERSION)
435
436   def SetQueueDrainFlag(self, drain_flag):
437     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
438
439   def SetWatcherPause(self, until):
440     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
441
442   def SubmitJob(self, ops):
443     ops_state = map(lambda op: op.__getstate__(), ops)
444     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
445
446   def SubmitManyJobs(self, jobs):
447     jobs_state = []
448     for ops in jobs:
449       jobs_state.append([op.__getstate__() for op in ops])
450     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
451
452   def CancelJob(self, job_id):
453     return self.CallMethod(REQ_CANCEL_JOB, job_id)
454
455   def ArchiveJob(self, job_id):
456     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
457
458   def AutoArchiveJobs(self, age):
459     timeout = (DEF_RWTO - 1) / 2
460     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
461
462   def WaitForJobChangeOnce(self, job_id, fields,
463                            prev_job_info, prev_log_serial,
464                            timeout=WFJC_TIMEOUT):
465     """Waits for changes on a job.
466
467     @param job_id: Job ID
468     @type fields: list
469     @param fields: List of field names to be observed
470     @type prev_job_info: None or list
471     @param prev_job_info: Previously received job information
472     @type prev_log_serial: None or int/long
473     @param prev_log_serial: Highest log serial number previously received
474     @type timeout: int/float
475     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
476                     be capped to that value)
477
478     """
479     assert timeout >= 0, "Timeout can not be negative"
480     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
481                            (job_id, fields, prev_job_info,
482                             prev_log_serial,
483                             min(WFJC_TIMEOUT, timeout)))
484
485   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
486     while True:
487       result = self.WaitForJobChangeOnce(job_id, fields,
488                                          prev_job_info, prev_log_serial)
489       if result != constants.JOB_NOTCHANGED:
490         break
491     return result
492
493   def Query(self, what, fields, filter_):
494     """Query for resources/items.
495
496     @param what: One of L{constants.QR_OP_LUXI}
497     @type fields: List of strings
498     @param fields: List of requested fields
499     @type filter_: None or list
500     @param filter_: Query filter
501     @rtype: L{objects.QueryResponse}
502
503     """
504     req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
505     result = self.CallMethod(REQ_QUERY, req.ToDict())
506     return objects.QueryResponse.FromDict(result)
507
508   def QueryFields(self, what, fields):
509     """Query for available fields.
510
511     @param what: One of L{constants.QR_OP_LUXI}
512     @type fields: None or list of strings
513     @param fields: List of requested fields
514     @rtype: L{objects.QueryFieldsResponse}
515
516     """
517     req = objects.QueryFieldsRequest(what=what, fields=fields)
518     result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
519     return objects.QueryFieldsResponse.FromDict(result)
520
521   def QueryJobs(self, job_ids, fields):
522     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
523
524   def QueryInstances(self, names, fields, use_locking):
525     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
526
527   def QueryNodes(self, names, fields, use_locking):
528     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
529
530   def QueryGroups(self, names, fields, use_locking):
531     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
532
533   def QueryExports(self, nodes, use_locking):
534     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
535
536   def QueryClusterInfo(self):
537     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
538
539   def QueryConfigValues(self, fields):
540     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
541
542   def QueryTags(self, kind, name):
543     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
544
545   def QueryLocks(self, fields, sync):
546     return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))