Fix pylint warnings
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
58 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
59
60 DEF_CTMO = 10
61 DEF_RWTO = 60
62
63
64 class ProtocolError(Exception):
65   """Denotes an error in the server communication"""
66
67
68 class ConnectionClosedError(ProtocolError):
69   """Connection closed error"""
70
71
72 class TimeoutError(ProtocolError):
73   """Operation timeout error"""
74
75
76 class EncodingError(ProtocolError):
77   """Encoding failure on the sending side"""
78
79
80 class DecodingError(ProtocolError):
81   """Decoding failure on the receiving side"""
82
83
84 class RequestError(ProtocolError):
85   """Error on request
86
87   This signifies an error in the request format or request handling,
88   but not (e.g.) an error in starting up an instance.
89
90   Some common conditions that can trigger this exception:
91     - job submission failed because the job data was wrong
92     - query failed because required fields were missing
93
94   """
95
96
97 class NoMasterError(ProtocolError):
98   """The master cannot be reached
99
100   This means that the master daemon is not running or the socket has
101   been removed.
102
103   """
104
105
106 class Transport:
107   """Low-level transport class.
108
109   This is used on the client side.
110
111   This could be replace by any other class that provides the same
112   semantics to the Client. This means:
113     - can send messages and receive messages
114     - safe for multithreading
115
116   """
117
118   def __init__(self, address, timeouts=None, eom=None):
119     """Constructor for the Client class.
120
121     Arguments:
122       - address: a valid address the the used transport class
123       - timeout: a list of timeouts, to be used on connect and read/write
124       - eom: an identifier to be used as end-of-message which the
125         upper-layer will guarantee that this identifier will not appear
126         in any message
127
128     There are two timeouts used since we might want to wait for a long
129     time for a response, but the connect timeout should be lower.
130
131     If not passed, we use a default of 10 and respectively 60 seconds.
132
133     Note that on reading data, since the timeout applies to an
134     invidual receive, it might be that the total duration is longer
135     than timeout value passed (we make a hard limit at twice the read
136     timeout).
137
138     """
139     self.address = address
140     if timeouts is None:
141       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
142     else:
143       self._ctimeout, self._rwtimeout = timeouts
144
145     self.socket = None
146     self._buffer = ""
147     self._msgs = collections.deque()
148
149     if eom is None:
150       self.eom = '\3'
151     else:
152       self.eom = eom
153
154     try:
155       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156       self.socket.settimeout(self._ctimeout)
157       try:
158         self.socket.connect(address)
159       except socket.timeout, err:
160         raise TimeoutError("Connect timed out: %s" % str(err))
161       except socket.error, err:
162         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
163           raise NoMasterError((address,))
164         raise
165       self.socket.settimeout(self._rwtimeout)
166     except (socket.error, NoMasterError):
167       if self.socket is not None:
168         self.socket.close()
169       self.socket = None
170       raise
171
172   def _CheckSocket(self):
173     """Make sure we are connected.
174
175     """
176     if self.socket is None:
177       raise ProtocolError("Connection is closed")
178
179   def Send(self, msg):
180     """Send a message.
181
182     This just sends a message and doesn't wait for the response.
183
184     """
185     if self.eom in msg:
186       raise EncodingError("Message terminator found in payload")
187     self._CheckSocket()
188     try:
189       # TODO: sendall is not guaranteed to send everything
190       self.socket.sendall(msg + self.eom)
191     except socket.timeout, err:
192       raise TimeoutError("Sending timeout: %s" % str(err))
193
194   def Recv(self):
195     """Try to receive a message from the socket.
196
197     In case we already have messages queued, we just return from the
198     queue. Otherwise, we try to read data with a _rwtimeout network
199     timeout, and making sure we don't go over 2x_rwtimeout as a global
200     limit.
201
202     """
203     self._CheckSocket()
204     etime = time.time() + self._rwtimeout
205     while not self._msgs:
206       if time.time() > etime:
207         raise TimeoutError("Extended receive timeout")
208       while True:
209         try:
210           data = self.socket.recv(4096)
211         except socket.error, err:
212           if err.args and err.args[0] == errno.EAGAIN:
213             continue
214           raise
215         except socket.timeout, err:
216           raise TimeoutError("Receive timeout: %s" % str(err))
217         break
218       if not data:
219         raise ConnectionClosedError("Connection closed while reading")
220       new_msgs = (self._buffer + data).split(self.eom)
221       self._buffer = new_msgs.pop()
222       self._msgs.extend(new_msgs)
223     return self._msgs.popleft()
224
225   def Call(self, msg):
226     """Send a message and wait for the response.
227
228     This is just a wrapper over Send and Recv.
229
230     """
231     self.Send(msg)
232     return self.Recv()
233
234   def Close(self):
235     """Close the socket"""
236     if self.socket is not None:
237       self.socket.close()
238       self.socket = None
239
240
241 class Client(object):
242   """High-level client implementation.
243
244   This uses a backing Transport-like class on top of which it
245   implements data serialization/deserialization.
246
247   """
248   def __init__(self, address=None, timeouts=None, transport=Transport):
249     """Constructor for the Client class.
250
251     Arguments:
252       - address: a valid address the the used transport class
253       - timeout: a list of timeouts, to be used on connect and read/write
254       - transport: a Transport-like class
255
256
257     If timeout is not passed, the default timeouts of the transport
258     class are used.
259
260     """
261     if address is None:
262       address = constants.MASTER_SOCKET
263     self.address = address
264     self.timeouts = timeouts
265     self.transport_class = transport
266     self.transport = None
267     self._InitTransport()
268
269   def _InitTransport(self):
270     """(Re)initialize the transport if needed.
271
272     """
273     if self.transport is None:
274       self.transport = self.transport_class(self.address,
275                                             timeouts=self.timeouts)
276
277   def _CloseTransport(self):
278     """Close the transport, ignoring errors.
279
280     """
281     if self.transport is None:
282       return
283     try:
284       old_transp = self.transport
285       self.transport = None
286       old_transp.Close()
287     except Exception:
288       pass
289
290   def CallMethod(self, method, args):
291     """Send a generic request and return the response.
292
293     """
294     # Build request
295     request = {
296       KEY_METHOD: method,
297       KEY_ARGS: args,
298       }
299
300     # Serialize the request
301     send_data = serializer.DumpJson(request, indent=False)
302
303     # Send request and wait for response
304     try:
305       self._InitTransport()
306       result = self.transport.Call(send_data)
307     except Exception:
308       self._CloseTransport()
309       raise
310
311     # Parse the result
312     try:
313       data = serializer.LoadJson(result)
314     except Exception, err:
315       raise ProtocolError("Error while deserializing response: %s" % str(err))
316
317     # Validate response
318     if (not isinstance(data, dict) or
319         KEY_SUCCESS not in data or
320         KEY_RESULT not in data):
321       raise DecodingError("Invalid response from server: %s" % str(data))
322
323     result = data[KEY_RESULT]
324
325     if not data[KEY_SUCCESS]:
326       # TODO: decide on a standard exception
327       if (isinstance(result, (tuple, list)) and len(result) == 2 and
328           isinstance(result[1], (tuple, list))):
329         # custom ganeti errors
330         err_class = errors.GetErrorClass(result[0])
331         if err_class is not None:
332           raise err_class, tuple(result[1])
333
334       raise RequestError(result)
335
336     return result
337
338   def SetQueueDrainFlag(self, drain_flag):
339     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
340
341   def SubmitJob(self, ops):
342     ops_state = map(lambda op: op.__getstate__(), ops)
343     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
344
345   def CancelJob(self, job_id):
346     return self.CallMethod(REQ_CANCEL_JOB, job_id)
347
348   def ArchiveJob(self, job_id):
349     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
350
351   def AutoArchiveJobs(self, age):
352     timeout = (DEF_RWTO - 1) / 2
353     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
354
355   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
356     timeout = (DEF_RWTO - 1) / 2
357     while True:
358       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
359                                (job_id, fields, prev_job_info,
360                                 prev_log_serial, timeout))
361       if result != constants.JOB_NOTCHANGED:
362         break
363     return result
364
365   def QueryJobs(self, job_ids, fields):
366     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
367
368   def QueryInstances(self, names, fields, use_locking):
369     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
370
371   def QueryNodes(self, names, fields, use_locking):
372     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
373
374   def QueryExports(self, nodes, use_locking):
375     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
376
377   def QueryClusterInfo(self):
378     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
379
380   def QueryConfigValues(self, fields):
381     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
382
383
384 # TODO: class Server(object)