Implement handling of luxi errors in cli.py
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import simplejson
35 import time
36 import errno
37
38 from ganeti import opcodes
39 from ganeti import constants
40
41
42 KEY_REQUEST = 'request'
43 KEY_DATA = 'data'
44 REQ_SUBMIT = 'submit'
45 REQ_ABORT = 'abort'
46 REQ_QUERY = 'query'
47
48 DEF_CTMO = 10
49 DEF_RWTO = 60
50
51
52 class ProtocolError(Exception):
53   """Denotes an error in the server communication"""
54
55
56 class ConnectionClosedError(ProtocolError):
57   """Connection closed error"""
58
59
60 class TimeoutError(ProtocolError):
61   """Operation timeout error"""
62
63
64 class EncodingError(ProtocolError):
65   """Encoding failure on the sending side"""
66
67
68 class DecodingError(ProtocolError):
69   """Decoding failure on the receiving side"""
70
71
72 class RequestError(ProtocolError):
73   """Error on request
74
75   This signifies an error in the request format or request handling,
76   but not (e.g.) an error in starting up an instance.
77
78   Some common conditions that can trigger this exception:
79     - job submission failed because the job data was wrong
80     - query failed because required fields were missing
81
82   """
83
84 class NoMasterError(ProtocolError):
85   """The master cannot be reached
86
87   This means that the master daemon is not running or the socket has
88   been removed.
89
90   """
91
92
93 def SerializeJob(job):
94   """Convert a job description to a string format.
95
96   """
97   return simplejson.dumps(job.__getstate__())
98
99
100 def UnserializeJob(data):
101   """Load a job from a string format"""
102   try:
103     new_data = simplejson.loads(data)
104   except Exception, err:
105     raise DecodingError("Error while unserializing: %s" % str(err))
106   job = opcodes.Job()
107   job.__setstate__(new_data)
108   return job
109
110
111 class Transport:
112   """Low-level transport class.
113
114   This is used on the client side.
115
116   This could be replace by any other class that provides the same
117   semantics to the Client. This means:
118     - can send messages and receive messages
119     - safe for multithreading
120
121   """
122
123   def __init__(self, address, timeouts=None, eom=None):
124     """Constructor for the Client class.
125
126     Arguments:
127       - address: a valid address the the used transport class
128       - timeout: a list of timeouts, to be used on connect and read/write
129       - eom: an identifier to be used as end-of-message which the
130         upper-layer will guarantee that this identifier will not appear
131         in any message
132
133     There are two timeouts used since we might want to wait for a long
134     time for a response, but the connect timeout should be lower.
135
136     If not passed, we use a default of 10 and respectively 60 seconds.
137
138     Note that on reading data, since the timeout applies to an
139     invidual receive, it might be that the total duration is longer
140     than timeout value passed (we make a hard limit at twice the read
141     timeout).
142
143     """
144     self.address = address
145     if timeouts is None:
146       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
147     else:
148       self._ctimeout, self._rwtimeout = timeouts
149
150     self.socket = None
151     self._buffer = ""
152     self._msgs = collections.deque()
153
154     if eom is None:
155       self.eom = '\3'
156     else:
157       self.eom = eom
158
159     try:
160       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
161       self.socket.settimeout(self._ctimeout)
162       try:
163         self.socket.connect(address)
164       except socket.timeout, err:
165         raise TimeoutError("Connect timed out: %s" % str(err))
166       except socket.error, err:
167         if err.args[0] == errno.ENOENT:
168           raise NoMasterError((address,))
169         raise
170       self.socket.settimeout(self._rwtimeout)
171     except (socket.error, NoMasterError):
172       if self.socket is not None:
173         self.socket.close()
174       self.socket = None
175       raise
176
177   def _CheckSocket(self):
178     """Make sure we are connected.
179
180     """
181     if self.socket is None:
182       raise ProtocolError("Connection is closed")
183
184   def Send(self, msg):
185     """Send a message.
186
187     This just sends a message and doesn't wait for the response.
188
189     """
190     if self.eom in msg:
191       raise EncodingError("Message terminator found in payload")
192     self._CheckSocket()
193     try:
194       self.socket.sendall(msg + self.eom)
195     except socket.timeout, err:
196       raise TimeoutError("Sending timeout: %s" % str(err))
197
198   def Recv(self):
199     """Try to receive a messae from the socket.
200
201     In case we already have messages queued, we just return from the
202     queue. Otherwise, we try to read data with a _rwtimeout network
203     timeout, and making sure we don't go over 2x_rwtimeout as a global
204     limit.
205
206     """
207     self._CheckSocket()
208     etime = time.time() + self._rwtimeout
209     while not self._msgs:
210       if time.time() > etime:
211         raise TimeoutError("Extended receive timeout")
212       try:
213         data = self.socket.recv(4096)
214       except socket.timeout, err:
215         raise TimeoutError("Receive timeout: %s" % str(err))
216       if not data:
217         raise ConnectionClosedError("Connection closed while reading")
218       new_msgs = (self._buffer + data).split(self.eom)
219       self._buffer = new_msgs.pop()
220       self._msgs.extend(new_msgs)
221     return self._msgs.popleft()
222
223   def Call(self, msg):
224     """Send a message and wait for the response.
225
226     This is just a wrapper over Send and Recv.
227
228     """
229     self.Send(msg)
230     return self.Recv()
231
232   def Close(self):
233     """Close the socket"""
234     if self.socket is not None:
235       self.socket.close()
236       self.socket = None
237
238
239 class Client(object):
240   """High-level client implementation.
241
242   This uses a backing Transport-like class on top of which it
243   implements data serialization/deserialization.
244
245   """
246   def __init__(self, address=None, timeouts=None, transport=Transport):
247     """Constructor for the Client class.
248
249     Arguments:
250       - address: a valid address the the used transport class
251       - timeout: a list of timeouts, to be used on connect and read/write
252       - transport: a Transport-like class
253
254
255     If timeout is not passed, the default timeouts of the transport
256     class are used.
257
258     """
259     if address is None:
260       address = constants.MASTER_SOCKET
261     self.transport = transport(address, timeouts=timeouts)
262
263   def SendRequest(self, request, data):
264     """Send a generic request and return the response.
265
266     """
267     msg = {KEY_REQUEST: request, KEY_DATA: data}
268     result = self.transport.Call(simplejson.dumps(msg))
269     try:
270       data = simplejson.loads(result)
271     except Exception, err:
272       raise ProtocolError("Error while deserializing response: %s" % str(err))
273     if (not isinstance(data, dict) or
274         'success' not in data or
275         'result' not in data):
276       raise DecodingError("Invalid response from server: %s" % str(data))
277     return data
278
279   def SubmitJob(self, job):
280     """Submit a job"""
281     result = self.SendRequest(REQ_SUBMIT, SerializeJob(job))
282     if not result['success']:
283       raise RequestError(result['result'])
284     return result['result']
285
286   def Query(self, data):
287     """Make a query"""
288     result = self.SendRequest(REQ_QUERY, data)
289     if not result['success']:
290       raise RequestError(result[result])
291     result = result['result']
292     if data["object"] == "jobs":
293       # custom job processing of query values
294       for row in result:
295         for idx, field in enumerate(data["fields"]):
296           if field == "op_list":
297             row[idx] = [opcodes.OpCode.LoadOpCode(i) for i in row[idx]]
298     return result