Merge branch 'devel-2.1'
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41
42
43 KEY_METHOD = "method"
44 KEY_ARGS = "args"
45 KEY_SUCCESS = "success"
46 KEY_RESULT = "result"
47
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51 REQ_CANCEL_JOB = "CancelJob"
52 REQ_ARCHIVE_JOB = "ArchiveJob"
53 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54 REQ_QUERY_JOBS = "QueryJobs"
55 REQ_QUERY_INSTANCES = "QueryInstances"
56 REQ_QUERY_NODES = "QueryNodes"
57 REQ_QUERY_EXPORTS = "QueryExports"
58 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60 REQ_QUERY_TAGS = "QueryTags"
61 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63
64 DEF_CTMO = 10
65 DEF_RWTO = 60
66
67
68 class ProtocolError(errors.GenericError):
69   """Denotes an error in the LUXI protocol"""
70
71
72 class ConnectionClosedError(ProtocolError):
73   """Connection closed error"""
74
75
76 class TimeoutError(ProtocolError):
77   """Operation timeout error"""
78
79
80 class RequestError(ProtocolError):
81   """Error on request
82
83   This signifies an error in the request format or request handling,
84   but not (e.g.) an error in starting up an instance.
85
86   Some common conditions that can trigger this exception:
87     - job submission failed because the job data was wrong
88     - query failed because required fields were missing
89
90   """
91
92
93 class NoMasterError(ProtocolError):
94   """The master cannot be reached
95
96   This means that the master daemon is not running or the socket has
97   been removed.
98
99   """
100
101
102 class Transport:
103   """Low-level transport class.
104
105   This is used on the client side.
106
107   This could be replace by any other class that provides the same
108   semantics to the Client. This means:
109     - can send messages and receive messages
110     - safe for multithreading
111
112   """
113
114   def __init__(self, address, timeouts=None, eom=None):
115     """Constructor for the Client class.
116
117     Arguments:
118       - address: a valid address the the used transport class
119       - timeout: a list of timeouts, to be used on connect and read/write
120       - eom: an identifier to be used as end-of-message which the
121         upper-layer will guarantee that this identifier will not appear
122         in any message
123
124     There are two timeouts used since we might want to wait for a long
125     time for a response, but the connect timeout should be lower.
126
127     If not passed, we use a default of 10 and respectively 60 seconds.
128
129     Note that on reading data, since the timeout applies to an
130     invidual receive, it might be that the total duration is longer
131     than timeout value passed (we make a hard limit at twice the read
132     timeout).
133
134     """
135     self.address = address
136     if timeouts is None:
137       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
138     else:
139       self._ctimeout, self._rwtimeout = timeouts
140
141     self.socket = None
142     self._buffer = ""
143     self._msgs = collections.deque()
144
145     if eom is None:
146       self.eom = '\3'
147     else:
148       self.eom = eom
149
150     try:
151       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
152       self.socket.settimeout(self._ctimeout)
153       try:
154         self.socket.connect(address)
155       except socket.timeout, err:
156         raise TimeoutError("Connect timed out: %s" % str(err))
157       except socket.error, err:
158         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
159           raise NoMasterError(address)
160         raise
161       self.socket.settimeout(self._rwtimeout)
162     except (socket.error, NoMasterError):
163       if self.socket is not None:
164         self.socket.close()
165       self.socket = None
166       raise
167
168   def _CheckSocket(self):
169     """Make sure we are connected.
170
171     """
172     if self.socket is None:
173       raise ProtocolError("Connection is closed")
174
175   def Send(self, msg):
176     """Send a message.
177
178     This just sends a message and doesn't wait for the response.
179
180     """
181     if self.eom in msg:
182       raise ProtocolError("Message terminator found in payload")
183
184     self._CheckSocket()
185     try:
186       # TODO: sendall is not guaranteed to send everything
187       self.socket.sendall(msg + self.eom)
188     except socket.timeout, err:
189       raise TimeoutError("Sending timeout: %s" % str(err))
190
191   def Recv(self):
192     """Try to receive a message from the socket.
193
194     In case we already have messages queued, we just return from the
195     queue. Otherwise, we try to read data with a _rwtimeout network
196     timeout, and making sure we don't go over 2x_rwtimeout as a global
197     limit.
198
199     """
200     self._CheckSocket()
201     etime = time.time() + self._rwtimeout
202     while not self._msgs:
203       if time.time() > etime:
204         raise TimeoutError("Extended receive timeout")
205       while True:
206         try:
207           data = self.socket.recv(4096)
208         except socket.error, err:
209           if err.args and err.args[0] == errno.EAGAIN:
210             continue
211           raise
212         except socket.timeout, err:
213           raise TimeoutError("Receive timeout: %s" % str(err))
214         break
215       if not data:
216         raise ConnectionClosedError("Connection closed while reading")
217       new_msgs = (self._buffer + data).split(self.eom)
218       self._buffer = new_msgs.pop()
219       self._msgs.extend(new_msgs)
220     return self._msgs.popleft()
221
222   def Call(self, msg):
223     """Send a message and wait for the response.
224
225     This is just a wrapper over Send and Recv.
226
227     """
228     self.Send(msg)
229     return self.Recv()
230
231   def Close(self):
232     """Close the socket"""
233     if self.socket is not None:
234       self.socket.close()
235       self.socket = None
236
237
238 def ParseRequest(msg):
239   """Parses a LUXI request message.
240
241   """
242   try:
243     request = serializer.LoadJson(msg)
244   except ValueError, err:
245     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
246
247   logging.debug("LUXI request: %s", request)
248
249   if not isinstance(request, dict):
250     logging.error("LUXI request not a dict: %r", msg)
251     raise ProtocolError("Invalid LUXI request (not a dict)")
252
253   method = request.get(KEY_METHOD, None)
254   args = request.get(KEY_ARGS, None)
255   if method is None or args is None:
256     logging.error("LUXI request missing method or arguments: %r", msg)
257     raise ProtocolError(("Invalid LUXI request (no method or arguments"
258                          " in request): %r") % msg)
259
260   return (method, args)
261
262
263 def ParseResponse(msg):
264   """Parses a LUXI response message.
265
266   """
267   # Parse the result
268   try:
269     data = serializer.LoadJson(msg)
270   except Exception, err:
271     raise ProtocolError("Error while deserializing response: %s" % str(err))
272
273   # Validate response
274   if not (isinstance(data, dict) and
275           KEY_SUCCESS in data and
276           KEY_RESULT in data):
277     raise ProtocolError("Invalid response from server: %r" % data)
278
279   return (data[KEY_SUCCESS], data[KEY_RESULT])
280
281
282 def FormatResponse(success, result):
283   """Formats a LUXI response message.
284
285   """
286   response = {
287     KEY_SUCCESS: success,
288     KEY_RESULT: result,
289     }
290
291   logging.debug("LUXI response: %s", response)
292
293   return serializer.DumpJson(response)
294
295
296 def FormatRequest(method, args):
297   """Formats a LUXI request message.
298
299   """
300   # Build request
301   request = {
302     KEY_METHOD: method,
303     KEY_ARGS: args,
304     }
305
306   # Serialize the request
307   return serializer.DumpJson(request, indent=False)
308
309
310 def CallLuxiMethod(transport_cb, method, args):
311   """Send a LUXI request via a transport and return the response.
312
313   """
314   assert callable(transport_cb)
315
316   request_msg = FormatRequest(method, args)
317
318   # Send request and wait for response
319   response_msg = transport_cb(request_msg)
320
321   (success, result) = ParseResponse(response_msg)
322
323   if success:
324     return result
325
326   errors.MaybeRaise(result)
327   raise RequestError(result)
328
329
330 class Client(object):
331   """High-level client implementation.
332
333   This uses a backing Transport-like class on top of which it
334   implements data serialization/deserialization.
335
336   """
337   def __init__(self, address=None, timeouts=None, transport=Transport):
338     """Constructor for the Client class.
339
340     Arguments:
341       - address: a valid address the the used transport class
342       - timeout: a list of timeouts, to be used on connect and read/write
343       - transport: a Transport-like class
344
345
346     If timeout is not passed, the default timeouts of the transport
347     class are used.
348
349     """
350     if address is None:
351       address = constants.MASTER_SOCKET
352     self.address = address
353     self.timeouts = timeouts
354     self.transport_class = transport
355     self.transport = None
356     self._InitTransport()
357
358   def _InitTransport(self):
359     """(Re)initialize the transport if needed.
360
361     """
362     if self.transport is None:
363       self.transport = self.transport_class(self.address,
364                                             timeouts=self.timeouts)
365
366   def _CloseTransport(self):
367     """Close the transport, ignoring errors.
368
369     """
370     if self.transport is None:
371       return
372     try:
373       old_transp = self.transport
374       self.transport = None
375       old_transp.Close()
376     except Exception: # pylint: disable-msg=W0703
377       pass
378
379   def _SendMethodCall(self, data):
380     # Send request and wait for response
381     try:
382       self._InitTransport()
383       return self.transport.Call(data)
384     except Exception:
385       self._CloseTransport()
386       raise
387
388   def CallMethod(self, method, args):
389     """Send a generic request and return the response.
390
391     """
392     return CallLuxiMethod(self._SendMethodCall, method, args)
393
394   def SetQueueDrainFlag(self, drain_flag):
395     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
396
397   def SetWatcherPause(self, until):
398     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
399
400   def SubmitJob(self, ops):
401     ops_state = map(lambda op: op.__getstate__(), ops)
402     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
403
404   def SubmitManyJobs(self, jobs):
405     jobs_state = []
406     for ops in jobs:
407       jobs_state.append([op.__getstate__() for op in ops])
408     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
409
410   def CancelJob(self, job_id):
411     return self.CallMethod(REQ_CANCEL_JOB, job_id)
412
413   def ArchiveJob(self, job_id):
414     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
415
416   def AutoArchiveJobs(self, age):
417     timeout = (DEF_RWTO - 1) / 2
418     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
419
420   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
421     timeout = (DEF_RWTO - 1) / 2
422     while True:
423       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
424                                (job_id, fields, prev_job_info,
425                                 prev_log_serial, timeout))
426       if result != constants.JOB_NOTCHANGED:
427         break
428     return result
429
430   def QueryJobs(self, job_ids, fields):
431     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
432
433   def QueryInstances(self, names, fields, use_locking):
434     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
435
436   def QueryNodes(self, names, fields, use_locking):
437     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
438
439   def QueryExports(self, nodes, use_locking):
440     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
441
442   def QueryClusterInfo(self):
443     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
444
445   def QueryConfigValues(self, fields):
446     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
447
448   def QueryTags(self, kind, name):
449     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
450
451
452 # TODO: class Server(object)