Factorize LUXI parsing and handling code
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41
42
43 KEY_METHOD = "method"
44 KEY_ARGS = "args"
45 KEY_SUCCESS = "success"
46 KEY_RESULT = "result"
47
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51 REQ_CANCEL_JOB = "CancelJob"
52 REQ_ARCHIVE_JOB = "ArchiveJob"
53 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54 REQ_QUERY_JOBS = "QueryJobs"
55 REQ_QUERY_INSTANCES = "QueryInstances"
56 REQ_QUERY_NODES = "QueryNodes"
57 REQ_QUERY_EXPORTS = "QueryExports"
58 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
61 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
62
63 DEF_CTMO = 10
64 DEF_RWTO = 60
65
66
67 class ProtocolError(errors.GenericError):
68   """Denotes an error in the LUXI protocol"""
69
70
71 class ConnectionClosedError(ProtocolError):
72   """Connection closed error"""
73
74
75 class TimeoutError(ProtocolError):
76   """Operation timeout error"""
77
78
79 class RequestError(ProtocolError):
80   """Error on request
81
82   This signifies an error in the request format or request handling,
83   but not (e.g.) an error in starting up an instance.
84
85   Some common conditions that can trigger this exception:
86     - job submission failed because the job data was wrong
87     - query failed because required fields were missing
88
89   """
90
91
92 class NoMasterError(ProtocolError):
93   """The master cannot be reached
94
95   This means that the master daemon is not running or the socket has
96   been removed.
97
98   """
99
100
101 class Transport:
102   """Low-level transport class.
103
104   This is used on the client side.
105
106   This could be replace by any other class that provides the same
107   semantics to the Client. This means:
108     - can send messages and receive messages
109     - safe for multithreading
110
111   """
112
113   def __init__(self, address, timeouts=None, eom=None):
114     """Constructor for the Client class.
115
116     Arguments:
117       - address: a valid address the the used transport class
118       - timeout: a list of timeouts, to be used on connect and read/write
119       - eom: an identifier to be used as end-of-message which the
120         upper-layer will guarantee that this identifier will not appear
121         in any message
122
123     There are two timeouts used since we might want to wait for a long
124     time for a response, but the connect timeout should be lower.
125
126     If not passed, we use a default of 10 and respectively 60 seconds.
127
128     Note that on reading data, since the timeout applies to an
129     invidual receive, it might be that the total duration is longer
130     than timeout value passed (we make a hard limit at twice the read
131     timeout).
132
133     """
134     self.address = address
135     if timeouts is None:
136       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
137     else:
138       self._ctimeout, self._rwtimeout = timeouts
139
140     self.socket = None
141     self._buffer = ""
142     self._msgs = collections.deque()
143
144     if eom is None:
145       self.eom = '\3'
146     else:
147       self.eom = eom
148
149     try:
150       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
151       self.socket.settimeout(self._ctimeout)
152       try:
153         self.socket.connect(address)
154       except socket.timeout, err:
155         raise TimeoutError("Connect timed out: %s" % str(err))
156       except socket.error, err:
157         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
158           raise NoMasterError(address)
159         raise
160       self.socket.settimeout(self._rwtimeout)
161     except (socket.error, NoMasterError):
162       if self.socket is not None:
163         self.socket.close()
164       self.socket = None
165       raise
166
167   def _CheckSocket(self):
168     """Make sure we are connected.
169
170     """
171     if self.socket is None:
172       raise ProtocolError("Connection is closed")
173
174   def Send(self, msg):
175     """Send a message.
176
177     This just sends a message and doesn't wait for the response.
178
179     """
180     if self.eom in msg:
181       raise ProtocolError("Message terminator found in payload")
182
183     self._CheckSocket()
184     try:
185       # TODO: sendall is not guaranteed to send everything
186       self.socket.sendall(msg + self.eom)
187     except socket.timeout, err:
188       raise TimeoutError("Sending timeout: %s" % str(err))
189
190   def Recv(self):
191     """Try to receive a message from the socket.
192
193     In case we already have messages queued, we just return from the
194     queue. Otherwise, we try to read data with a _rwtimeout network
195     timeout, and making sure we don't go over 2x_rwtimeout as a global
196     limit.
197
198     """
199     self._CheckSocket()
200     etime = time.time() + self._rwtimeout
201     while not self._msgs:
202       if time.time() > etime:
203         raise TimeoutError("Extended receive timeout")
204       while True:
205         try:
206           data = self.socket.recv(4096)
207         except socket.error, err:
208           if err.args and err.args[0] == errno.EAGAIN:
209             continue
210           raise
211         except socket.timeout, err:
212           raise TimeoutError("Receive timeout: %s" % str(err))
213         break
214       if not data:
215         raise ConnectionClosedError("Connection closed while reading")
216       new_msgs = (self._buffer + data).split(self.eom)
217       self._buffer = new_msgs.pop()
218       self._msgs.extend(new_msgs)
219     return self._msgs.popleft()
220
221   def Call(self, msg):
222     """Send a message and wait for the response.
223
224     This is just a wrapper over Send and Recv.
225
226     """
227     self.Send(msg)
228     return self.Recv()
229
230   def Close(self):
231     """Close the socket"""
232     if self.socket is not None:
233       self.socket.close()
234       self.socket = None
235
236
237 def ParseRequest(msg):
238   """Parses a LUXI request message.
239
240   """
241   try:
242     request = serializer.LoadJson(msg)
243   except ValueError, err:
244     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
245
246   logging.debug("LUXI request: %s", request)
247
248   if not isinstance(request, dict):
249     logging.error("LUXI request not a dict: %r", msg)
250     raise ProtocolError("Invalid LUXI request (not a dict)")
251
252   method = request.get(KEY_METHOD, None)
253   args = request.get(KEY_ARGS, None)
254   if method is None or args is None:
255     logging.error("LUXI request missing method or arguments: %r", msg)
256     raise ProtocolError(("Invalid LUXI request (no method or arguments"
257                          " in request): %r") % msg)
258
259   return (method, args)
260
261
262 def ParseResponse(msg):
263   """Parses a LUXI response message.
264
265   """
266   # Parse the result
267   try:
268     data = serializer.LoadJson(msg)
269   except Exception, err:
270     raise ProtocolError("Error while deserializing response: %s" % str(err))
271
272   # Validate response
273   if not (isinstance(data, dict) and
274           KEY_SUCCESS in data and
275           KEY_RESULT in data):
276     raise ProtocolError("Invalid response from server: %r" % data)
277
278   return (data[KEY_SUCCESS], data[KEY_RESULT])
279
280
281 def FormatResponse(success, result):
282   """Formats a LUXI response message.
283
284   """
285   response = {
286     KEY_SUCCESS: success,
287     KEY_RESULT: result,
288     }
289
290   logging.debug("LUXI response: %s", response)
291
292   return serializer.DumpJson(response)
293
294
295 def FormatRequest(method, args):
296   """Formats a LUXI request message.
297
298   """
299   # Build request
300   request = {
301     KEY_METHOD: method,
302     KEY_ARGS: args,
303     }
304
305   # Serialize the request
306   return serializer.DumpJson(request, indent=False)
307
308
309 def CallLuxiMethod(transport_cb, method, args):
310   """Send a LUXI request via a transport and return the response.
311
312   """
313   assert callable(transport_cb)
314
315   request_msg = FormatRequest(method, args)
316
317   # Send request and wait for response
318   response_msg = transport_cb(request_msg)
319
320   (success, result) = ParseResponse(response_msg)
321
322   if success:
323     return result
324
325   errors.MaybeRaise(result)
326   raise RequestError(result)
327
328
329 class Client(object):
330   """High-level client implementation.
331
332   This uses a backing Transport-like class on top of which it
333   implements data serialization/deserialization.
334
335   """
336   def __init__(self, address=None, timeouts=None, transport=Transport):
337     """Constructor for the Client class.
338
339     Arguments:
340       - address: a valid address the the used transport class
341       - timeout: a list of timeouts, to be used on connect and read/write
342       - transport: a Transport-like class
343
344
345     If timeout is not passed, the default timeouts of the transport
346     class are used.
347
348     """
349     if address is None:
350       address = constants.MASTER_SOCKET
351     self.address = address
352     self.timeouts = timeouts
353     self.transport_class = transport
354     self.transport = None
355     self._InitTransport()
356
357   def _InitTransport(self):
358     """(Re)initialize the transport if needed.
359
360     """
361     if self.transport is None:
362       self.transport = self.transport_class(self.address,
363                                             timeouts=self.timeouts)
364
365   def _CloseTransport(self):
366     """Close the transport, ignoring errors.
367
368     """
369     if self.transport is None:
370       return
371     try:
372       old_transp = self.transport
373       self.transport = None
374       old_transp.Close()
375     except Exception: # pylint: disable-msg=W0703
376       pass
377
378   def _SendMethodCall(self, data):
379     # Send request and wait for response
380     try:
381       self._InitTransport()
382       return self.transport.Call(data)
383     except Exception:
384       self._CloseTransport()
385       raise
386
387   def CallMethod(self, method, args):
388     """Send a generic request and return the response.
389
390     """
391     return CallLuxiMethod(self._SendMethodCall, method, args)
392
393   def SetQueueDrainFlag(self, drain_flag):
394     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
395
396   def SetWatcherPause(self, until):
397     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
398
399   def SubmitJob(self, ops):
400     ops_state = map(lambda op: op.__getstate__(), ops)
401     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
402
403   def SubmitManyJobs(self, jobs):
404     jobs_state = []
405     for ops in jobs:
406       jobs_state.append([op.__getstate__() for op in ops])
407     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
408
409   def CancelJob(self, job_id):
410     return self.CallMethod(REQ_CANCEL_JOB, job_id)
411
412   def ArchiveJob(self, job_id):
413     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
414
415   def AutoArchiveJobs(self, age):
416     timeout = (DEF_RWTO - 1) / 2
417     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
418
419   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
420     timeout = (DEF_RWTO - 1) / 2
421     while True:
422       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
423                                (job_id, fields, prev_job_info,
424                                 prev_log_serial, timeout))
425       if result != constants.JOB_NOTCHANGED:
426         break
427     return result
428
429   def QueryJobs(self, job_ids, fields):
430     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
431
432   def QueryInstances(self, names, fields, use_locking):
433     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
434
435   def QueryNodes(self, names, fields, use_locking):
436     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
437
438   def QueryExports(self, nodes, use_locking):
439     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
440
441   def QueryClusterInfo(self):
442     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
443
444   def QueryConfigValues(self, fields):
445     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
446
447
448 # TODO: class Server(object)