Implement transport of ganeti errors across luxi
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57
58 DEF_CTMO = 10
59 DEF_RWTO = 60
60
61
62 class ProtocolError(Exception):
63   """Denotes an error in the server communication"""
64
65
66 class ConnectionClosedError(ProtocolError):
67   """Connection closed error"""
68
69
70 class TimeoutError(ProtocolError):
71   """Operation timeout error"""
72
73
74 class EncodingError(ProtocolError):
75   """Encoding failure on the sending side"""
76
77
78 class DecodingError(ProtocolError):
79   """Decoding failure on the receiving side"""
80
81
82 class RequestError(ProtocolError):
83   """Error on request
84
85   This signifies an error in the request format or request handling,
86   but not (e.g.) an error in starting up an instance.
87
88   Some common conditions that can trigger this exception:
89     - job submission failed because the job data was wrong
90     - query failed because required fields were missing
91
92   """
93
94
95 class NoMasterError(ProtocolError):
96   """The master cannot be reached
97
98   This means that the master daemon is not running or the socket has
99   been removed.
100
101   """
102
103
104 class Transport:
105   """Low-level transport class.
106
107   This is used on the client side.
108
109   This could be replace by any other class that provides the same
110   semantics to the Client. This means:
111     - can send messages and receive messages
112     - safe for multithreading
113
114   """
115
116   def __init__(self, address, timeouts=None, eom=None):
117     """Constructor for the Client class.
118
119     Arguments:
120       - address: a valid address the the used transport class
121       - timeout: a list of timeouts, to be used on connect and read/write
122       - eom: an identifier to be used as end-of-message which the
123         upper-layer will guarantee that this identifier will not appear
124         in any message
125
126     There are two timeouts used since we might want to wait for a long
127     time for a response, but the connect timeout should be lower.
128
129     If not passed, we use a default of 10 and respectively 60 seconds.
130
131     Note that on reading data, since the timeout applies to an
132     invidual receive, it might be that the total duration is longer
133     than timeout value passed (we make a hard limit at twice the read
134     timeout).
135
136     """
137     self.address = address
138     if timeouts is None:
139       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
140     else:
141       self._ctimeout, self._rwtimeout = timeouts
142
143     self.socket = None
144     self._buffer = ""
145     self._msgs = collections.deque()
146
147     if eom is None:
148       self.eom = '\3'
149     else:
150       self.eom = eom
151
152     try:
153       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
154       self.socket.settimeout(self._ctimeout)
155       try:
156         self.socket.connect(address)
157       except socket.timeout, err:
158         raise TimeoutError("Connect timed out: %s" % str(err))
159       except socket.error, err:
160         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
161           raise NoMasterError((address,))
162         raise
163       self.socket.settimeout(self._rwtimeout)
164     except (socket.error, NoMasterError):
165       if self.socket is not None:
166         self.socket.close()
167       self.socket = None
168       raise
169
170   def _CheckSocket(self):
171     """Make sure we are connected.
172
173     """
174     if self.socket is None:
175       raise ProtocolError("Connection is closed")
176
177   def Send(self, msg):
178     """Send a message.
179
180     This just sends a message and doesn't wait for the response.
181
182     """
183     if self.eom in msg:
184       raise EncodingError("Message terminator found in payload")
185     self._CheckSocket()
186     try:
187       self.socket.sendall(msg + self.eom)
188     except socket.timeout, err:
189       raise TimeoutError("Sending timeout: %s" % str(err))
190
191   def Recv(self):
192     """Try to receive a messae from the socket.
193
194     In case we already have messages queued, we just return from the
195     queue. Otherwise, we try to read data with a _rwtimeout network
196     timeout, and making sure we don't go over 2x_rwtimeout as a global
197     limit.
198
199     """
200     self._CheckSocket()
201     etime = time.time() + self._rwtimeout
202     while not self._msgs:
203       if time.time() > etime:
204         raise TimeoutError("Extended receive timeout")
205       try:
206         data = self.socket.recv(4096)
207       except socket.timeout, err:
208         raise TimeoutError("Receive timeout: %s" % str(err))
209       if not data:
210         raise ConnectionClosedError("Connection closed while reading")
211       new_msgs = (self._buffer + data).split(self.eom)
212       self._buffer = new_msgs.pop()
213       self._msgs.extend(new_msgs)
214     return self._msgs.popleft()
215
216   def Call(self, msg):
217     """Send a message and wait for the response.
218
219     This is just a wrapper over Send and Recv.
220
221     """
222     self.Send(msg)
223     return self.Recv()
224
225   def Close(self):
226     """Close the socket"""
227     if self.socket is not None:
228       self.socket.close()
229       self.socket = None
230
231
232 class Client(object):
233   """High-level client implementation.
234
235   This uses a backing Transport-like class on top of which it
236   implements data serialization/deserialization.
237
238   """
239   def __init__(self, address=None, timeouts=None, transport=Transport):
240     """Constructor for the Client class.
241
242     Arguments:
243       - address: a valid address the the used transport class
244       - timeout: a list of timeouts, to be used on connect and read/write
245       - transport: a Transport-like class
246
247
248     If timeout is not passed, the default timeouts of the transport
249     class are used.
250
251     """
252     if address is None:
253       address = constants.MASTER_SOCKET
254     self.transport = transport(address, timeouts=timeouts)
255
256   def CallMethod(self, method, args):
257     """Send a generic request and return the response.
258
259     """
260     # Build request
261     request = {
262       KEY_METHOD: method,
263       KEY_ARGS: args,
264       }
265
266     # Send request and wait for response
267     result = self.transport.Call(serializer.DumpJson(request, indent=False))
268     try:
269       data = serializer.LoadJson(result)
270     except Exception, err:
271       raise ProtocolError("Error while deserializing response: %s" % str(err))
272
273     # Validate response
274     if (not isinstance(data, dict) or
275         KEY_SUCCESS not in data or
276         KEY_RESULT not in data):
277       raise DecodingError("Invalid response from server: %s" % str(data))
278
279     result = data[KEY_RESULT]
280
281     if not data[KEY_SUCCESS]:
282       # TODO: decide on a standard exception
283       if (isinstance(result, (tuple, list)) and len(result) == 2 and
284           isinstance(result[1], (tuple, list))):
285         # custom ganeti errors
286         err_class = errors.GetErrorClass(result[0])
287         if err_class is not None:
288           raise err_class, tuple(result[1])
289
290       raise RequestError(result)
291
292     return result
293
294   def SubmitJob(self, ops):
295     ops_state = map(lambda op: op.__getstate__(), ops)
296     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
297
298   def CancelJob(self, job_id):
299     return self.CallMethod(REQ_CANCEL_JOB, job_id)
300
301   def ArchiveJob(self, job_id):
302     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
303
304   def AutoArchiveJobs(self, age):
305     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
306
307   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
308     timeout = (DEF_RWTO - 1) / 2
309     while True:
310       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
311                                (job_id, fields, prev_job_info,
312                                 prev_log_serial, timeout))
313       if result != constants.JOB_NOTCHANGED:
314         break
315     return result
316
317   def QueryJobs(self, job_ids, fields):
318     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
319
320   def QueryInstances(self, names, fields):
321     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
322
323   def QueryNodes(self, names, fields):
324     return self.CallMethod(REQ_QUERY_NODES, (names, fields))
325
326   def QueryExports(self, nodes):
327     return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
328
329   def QueryConfigValues(self, fields):
330     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
331
332 # TODO: class Server(object)