Merge branch 'devel-2.1'
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
44 KEY_METHOD = "method"
45 KEY_ARGS = "args"
46 KEY_SUCCESS = "success"
47 KEY_RESULT = "result"
48
49 REQ_SUBMIT_JOB = "SubmitJob"
50 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52 REQ_CANCEL_JOB = "CancelJob"
53 REQ_ARCHIVE_JOB = "ArchiveJob"
54 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55 REQ_QUERY_JOBS = "QueryJobs"
56 REQ_QUERY_INSTANCES = "QueryInstances"
57 REQ_QUERY_NODES = "QueryNodes"
58 REQ_QUERY_EXPORTS = "QueryExports"
59 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61 REQ_QUERY_TAGS = "QueryTags"
62 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
63 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
64
65 DEF_CTMO = 10
66 DEF_RWTO = 60
67
68 # WaitForJobChange timeout
69 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
70
71
72 class ProtocolError(errors.GenericError):
73   """Denotes an error in the LUXI protocol"""
74
75
76 class ConnectionClosedError(ProtocolError):
77   """Connection closed error"""
78
79
80 class TimeoutError(ProtocolError):
81   """Operation timeout error"""
82
83
84 class RequestError(ProtocolError):
85   """Error on request
86
87   This signifies an error in the request format or request handling,
88   but not (e.g.) an error in starting up an instance.
89
90   Some common conditions that can trigger this exception:
91     - job submission failed because the job data was wrong
92     - query failed because required fields were missing
93
94   """
95
96
97 class NoMasterError(ProtocolError):
98   """The master cannot be reached
99
100   This means that the master daemon is not running or the socket has
101   been removed.
102
103   """
104
105
106 class Transport:
107   """Low-level transport class.
108
109   This is used on the client side.
110
111   This could be replace by any other class that provides the same
112   semantics to the Client. This means:
113     - can send messages and receive messages
114     - safe for multithreading
115
116   """
117
118   def __init__(self, address, timeouts=None):
119     """Constructor for the Client class.
120
121     Arguments:
122       - address: a valid address the the used transport class
123       - timeout: a list of timeouts, to be used on connect and read/write
124
125     There are two timeouts used since we might want to wait for a long
126     time for a response, but the connect timeout should be lower.
127
128     If not passed, we use a default of 10 and respectively 60 seconds.
129
130     Note that on reading data, since the timeout applies to an
131     invidual receive, it might be that the total duration is longer
132     than timeout value passed (we make a hard limit at twice the read
133     timeout).
134
135     """
136     self.address = address
137     if timeouts is None:
138       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139     else:
140       self._ctimeout, self._rwtimeout = timeouts
141
142     self.socket = None
143     self._buffer = ""
144     self._msgs = collections.deque()
145
146     try:
147       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
148
149       # Try to connect
150       try:
151         utils.Retry(self._Connect, 1.0, self._ctimeout,
152                     args=(self.socket, address, self._ctimeout))
153       except utils.RetryTimeout:
154         raise TimeoutError("Connect timed out")
155
156       self.socket.settimeout(self._rwtimeout)
157     except (socket.error, NoMasterError):
158       if self.socket is not None:
159         self.socket.close()
160       self.socket = None
161       raise
162
163   @staticmethod
164   def _Connect(sock, address, timeout):
165     sock.settimeout(timeout)
166     try:
167       sock.connect(address)
168     except socket.timeout, err:
169       raise TimeoutError("Connect timed out: %s" % str(err))
170     except socket.error, err:
171       if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
172         raise NoMasterError(address)
173       if err.args[0] == errno.EAGAIN:
174         # Server's socket backlog is full at the moment
175         raise utils.RetryAgain()
176       raise
177
178   def _CheckSocket(self):
179     """Make sure we are connected.
180
181     """
182     if self.socket is None:
183       raise ProtocolError("Connection is closed")
184
185   def Send(self, msg):
186     """Send a message.
187
188     This just sends a message and doesn't wait for the response.
189
190     """
191     if constants.LUXI_EOM in msg:
192       raise ProtocolError("Message terminator found in payload")
193
194     self._CheckSocket()
195     try:
196       # TODO: sendall is not guaranteed to send everything
197       self.socket.sendall(msg + constants.LUXI_EOM)
198     except socket.timeout, err:
199       raise TimeoutError("Sending timeout: %s" % str(err))
200
201   def Recv(self):
202     """Try to receive a message from the socket.
203
204     In case we already have messages queued, we just return from the
205     queue. Otherwise, we try to read data with a _rwtimeout network
206     timeout, and making sure we don't go over 2x_rwtimeout as a global
207     limit.
208
209     """
210     self._CheckSocket()
211     etime = time.time() + self._rwtimeout
212     while not self._msgs:
213       if time.time() > etime:
214         raise TimeoutError("Extended receive timeout")
215       while True:
216         try:
217           data = self.socket.recv(4096)
218         except socket.error, err:
219           if err.args and err.args[0] == errno.EAGAIN:
220             continue
221           raise
222         except socket.timeout, err:
223           raise TimeoutError("Receive timeout: %s" % str(err))
224         break
225       if not data:
226         raise ConnectionClosedError("Connection closed while reading")
227       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
228       self._buffer = new_msgs.pop()
229       self._msgs.extend(new_msgs)
230     return self._msgs.popleft()
231
232   def Call(self, msg):
233     """Send a message and wait for the response.
234
235     This is just a wrapper over Send and Recv.
236
237     """
238     self.Send(msg)
239     return self.Recv()
240
241   def Close(self):
242     """Close the socket"""
243     if self.socket is not None:
244       self.socket.close()
245       self.socket = None
246
247
248 def ParseRequest(msg):
249   """Parses a LUXI request message.
250
251   """
252   try:
253     request = serializer.LoadJson(msg)
254   except ValueError, err:
255     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
256
257   logging.debug("LUXI request: %s", request)
258
259   if not isinstance(request, dict):
260     logging.error("LUXI request not a dict: %r", msg)
261     raise ProtocolError("Invalid LUXI request (not a dict)")
262
263   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
264   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
265
266   if method is None or args is None:
267     logging.error("LUXI request missing method or arguments: %r", msg)
268     raise ProtocolError(("Invalid LUXI request (no method or arguments"
269                          " in request): %r") % msg)
270
271   return (method, args)
272
273
274 def ParseResponse(msg):
275   """Parses a LUXI response message.
276
277   """
278   # Parse the result
279   try:
280     data = serializer.LoadJson(msg)
281   except Exception, err:
282     raise ProtocolError("Error while deserializing response: %s" % str(err))
283
284   # Validate response
285   if not (isinstance(data, dict) and
286           KEY_SUCCESS in data and
287           KEY_RESULT in data):
288     raise ProtocolError("Invalid response from server: %r" % data)
289
290   return (data[KEY_SUCCESS], data[KEY_RESULT])
291
292
293 def FormatResponse(success, result):
294   """Formats a LUXI response message.
295
296   """
297   response = {
298     KEY_SUCCESS: success,
299     KEY_RESULT: result,
300     }
301
302   logging.debug("LUXI response: %s", response)
303
304   return serializer.DumpJson(response)
305
306
307 def FormatRequest(method, args):
308   """Formats a LUXI request message.
309
310   """
311   # Build request
312   request = {
313     KEY_METHOD: method,
314     KEY_ARGS: args,
315     }
316
317   # Serialize the request
318   return serializer.DumpJson(request, indent=False)
319
320
321 def CallLuxiMethod(transport_cb, method, args):
322   """Send a LUXI request via a transport and return the response.
323
324   """
325   assert callable(transport_cb)
326
327   request_msg = FormatRequest(method, args)
328
329   # Send request and wait for response
330   response_msg = transport_cb(request_msg)
331
332   (success, result) = ParseResponse(response_msg)
333
334   if success:
335     return result
336
337   errors.MaybeRaise(result)
338   raise RequestError(result)
339
340
341 class Client(object):
342   """High-level client implementation.
343
344   This uses a backing Transport-like class on top of which it
345   implements data serialization/deserialization.
346
347   """
348   def __init__(self, address=None, timeouts=None, transport=Transport):
349     """Constructor for the Client class.
350
351     Arguments:
352       - address: a valid address the the used transport class
353       - timeout: a list of timeouts, to be used on connect and read/write
354       - transport: a Transport-like class
355
356
357     If timeout is not passed, the default timeouts of the transport
358     class are used.
359
360     """
361     if address is None:
362       address = constants.MASTER_SOCKET
363     self.address = address
364     self.timeouts = timeouts
365     self.transport_class = transport
366     self.transport = None
367     self._InitTransport()
368
369   def _InitTransport(self):
370     """(Re)initialize the transport if needed.
371
372     """
373     if self.transport is None:
374       self.transport = self.transport_class(self.address,
375                                             timeouts=self.timeouts)
376
377   def _CloseTransport(self):
378     """Close the transport, ignoring errors.
379
380     """
381     if self.transport is None:
382       return
383     try:
384       old_transp = self.transport
385       self.transport = None
386       old_transp.Close()
387     except Exception: # pylint: disable-msg=W0703
388       pass
389
390   def _SendMethodCall(self, data):
391     # Send request and wait for response
392     try:
393       self._InitTransport()
394       return self.transport.Call(data)
395     except Exception:
396       self._CloseTransport()
397       raise
398
399   def CallMethod(self, method, args):
400     """Send a generic request and return the response.
401
402     """
403     return CallLuxiMethod(self._SendMethodCall, method, args)
404
405   def SetQueueDrainFlag(self, drain_flag):
406     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
407
408   def SetWatcherPause(self, until):
409     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
410
411   def SubmitJob(self, ops):
412     ops_state = map(lambda op: op.__getstate__(), ops)
413     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
414
415   def SubmitManyJobs(self, jobs):
416     jobs_state = []
417     for ops in jobs:
418       jobs_state.append([op.__getstate__() for op in ops])
419     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
420
421   def CancelJob(self, job_id):
422     return self.CallMethod(REQ_CANCEL_JOB, job_id)
423
424   def ArchiveJob(self, job_id):
425     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
426
427   def AutoArchiveJobs(self, age):
428     timeout = (DEF_RWTO - 1) / 2
429     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
430
431   def WaitForJobChangeOnce(self, job_id, fields,
432                            prev_job_info, prev_log_serial,
433                            timeout=WFJC_TIMEOUT):
434     """Waits for changes on a job.
435
436     @param job_id: Job ID
437     @type fields: list
438     @param fields: List of field names to be observed
439     @type prev_job_info: None or list
440     @param prev_job_info: Previously received job information
441     @type prev_log_serial: None or int/long
442     @param prev_log_serial: Highest log serial number previously received
443     @type timeout: int/float
444     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
445                     be capped to that value)
446
447     """
448     assert timeout >= 0, "Timeout can not be negative"
449     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
450                            (job_id, fields, prev_job_info,
451                             prev_log_serial,
452                             min(WFJC_TIMEOUT, timeout)))
453
454   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
455     while True:
456       result = self.WaitForJobChangeOnce(job_id, fields,
457                                          prev_job_info, prev_log_serial)
458       if result != constants.JOB_NOTCHANGED:
459         break
460     return result
461
462   def QueryJobs(self, job_ids, fields):
463     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
464
465   def QueryInstances(self, names, fields, use_locking):
466     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
467
468   def QueryNodes(self, names, fields, use_locking):
469     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
470
471   def QueryExports(self, nodes, use_locking):
472     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
473
474   def QueryClusterInfo(self):
475     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
476
477   def QueryConfigValues(self, fields):
478     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
479
480   def QueryTags(self, kind, name):
481     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))