Simplify LUXI exceptions
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
61
62 DEF_CTMO = 10
63 DEF_RWTO = 60
64
65
66 class ProtocolError(errors.GenericError):
67   """Denotes an error in the LUXI protocol"""
68
69
70 class ConnectionClosedError(ProtocolError):
71   """Connection closed error"""
72
73
74 class TimeoutError(ProtocolError):
75   """Operation timeout error"""
76
77
78 class RequestError(ProtocolError):
79   """Error on request
80
81   This signifies an error in the request format or request handling,
82   but not (e.g.) an error in starting up an instance.
83
84   Some common conditions that can trigger this exception:
85     - job submission failed because the job data was wrong
86     - query failed because required fields were missing
87
88   """
89
90
91 class NoMasterError(ProtocolError):
92   """The master cannot be reached
93
94   This means that the master daemon is not running or the socket has
95   been removed.
96
97   """
98
99
100 class Transport:
101   """Low-level transport class.
102
103   This is used on the client side.
104
105   This could be replace by any other class that provides the same
106   semantics to the Client. This means:
107     - can send messages and receive messages
108     - safe for multithreading
109
110   """
111
112   def __init__(self, address, timeouts=None, eom=None):
113     """Constructor for the Client class.
114
115     Arguments:
116       - address: a valid address the the used transport class
117       - timeout: a list of timeouts, to be used on connect and read/write
118       - eom: an identifier to be used as end-of-message which the
119         upper-layer will guarantee that this identifier will not appear
120         in any message
121
122     There are two timeouts used since we might want to wait for a long
123     time for a response, but the connect timeout should be lower.
124
125     If not passed, we use a default of 10 and respectively 60 seconds.
126
127     Note that on reading data, since the timeout applies to an
128     invidual receive, it might be that the total duration is longer
129     than timeout value passed (we make a hard limit at twice the read
130     timeout).
131
132     """
133     self.address = address
134     if timeouts is None:
135       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
136     else:
137       self._ctimeout, self._rwtimeout = timeouts
138
139     self.socket = None
140     self._buffer = ""
141     self._msgs = collections.deque()
142
143     if eom is None:
144       self.eom = '\3'
145     else:
146       self.eom = eom
147
148     try:
149       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
150       self.socket.settimeout(self._ctimeout)
151       try:
152         self.socket.connect(address)
153       except socket.timeout, err:
154         raise TimeoutError("Connect timed out: %s" % str(err))
155       except socket.error, err:
156         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
157           raise NoMasterError(address)
158         raise
159       self.socket.settimeout(self._rwtimeout)
160     except (socket.error, NoMasterError):
161       if self.socket is not None:
162         self.socket.close()
163       self.socket = None
164       raise
165
166   def _CheckSocket(self):
167     """Make sure we are connected.
168
169     """
170     if self.socket is None:
171       raise ProtocolError("Connection is closed")
172
173   def Send(self, msg):
174     """Send a message.
175
176     This just sends a message and doesn't wait for the response.
177
178     """
179     if self.eom in msg:
180       raise ProtocolError("Message terminator found in payload")
181
182     self._CheckSocket()
183     try:
184       # TODO: sendall is not guaranteed to send everything
185       self.socket.sendall(msg + self.eom)
186     except socket.timeout, err:
187       raise TimeoutError("Sending timeout: %s" % str(err))
188
189   def Recv(self):
190     """Try to receive a message from the socket.
191
192     In case we already have messages queued, we just return from the
193     queue. Otherwise, we try to read data with a _rwtimeout network
194     timeout, and making sure we don't go over 2x_rwtimeout as a global
195     limit.
196
197     """
198     self._CheckSocket()
199     etime = time.time() + self._rwtimeout
200     while not self._msgs:
201       if time.time() > etime:
202         raise TimeoutError("Extended receive timeout")
203       while True:
204         try:
205           data = self.socket.recv(4096)
206         except socket.error, err:
207           if err.args and err.args[0] == errno.EAGAIN:
208             continue
209           raise
210         except socket.timeout, err:
211           raise TimeoutError("Receive timeout: %s" % str(err))
212         break
213       if not data:
214         raise ConnectionClosedError("Connection closed while reading")
215       new_msgs = (self._buffer + data).split(self.eom)
216       self._buffer = new_msgs.pop()
217       self._msgs.extend(new_msgs)
218     return self._msgs.popleft()
219
220   def Call(self, msg):
221     """Send a message and wait for the response.
222
223     This is just a wrapper over Send and Recv.
224
225     """
226     self.Send(msg)
227     return self.Recv()
228
229   def Close(self):
230     """Close the socket"""
231     if self.socket is not None:
232       self.socket.close()
233       self.socket = None
234
235
236 class Client(object):
237   """High-level client implementation.
238
239   This uses a backing Transport-like class on top of which it
240   implements data serialization/deserialization.
241
242   """
243   def __init__(self, address=None, timeouts=None, transport=Transport):
244     """Constructor for the Client class.
245
246     Arguments:
247       - address: a valid address the the used transport class
248       - timeout: a list of timeouts, to be used on connect and read/write
249       - transport: a Transport-like class
250
251
252     If timeout is not passed, the default timeouts of the transport
253     class are used.
254
255     """
256     if address is None:
257       address = constants.MASTER_SOCKET
258     self.address = address
259     self.timeouts = timeouts
260     self.transport_class = transport
261     self.transport = None
262     self._InitTransport()
263
264   def _InitTransport(self):
265     """(Re)initialize the transport if needed.
266
267     """
268     if self.transport is None:
269       self.transport = self.transport_class(self.address,
270                                             timeouts=self.timeouts)
271
272   def _CloseTransport(self):
273     """Close the transport, ignoring errors.
274
275     """
276     if self.transport is None:
277       return
278     try:
279       old_transp = self.transport
280       self.transport = None
281       old_transp.Close()
282     except Exception: # pylint: disable-msg=W0703
283       pass
284
285   def CallMethod(self, method, args):
286     """Send a generic request and return the response.
287
288     """
289     # Build request
290     request = {
291       KEY_METHOD: method,
292       KEY_ARGS: args,
293       }
294
295     # Serialize the request
296     send_data = serializer.DumpJson(request, indent=False)
297
298     # Send request and wait for response
299     try:
300       self._InitTransport()
301       result = self.transport.Call(send_data)
302     except Exception:
303       self._CloseTransport()
304       raise
305
306     # Parse the result
307     try:
308       data = serializer.LoadJson(result)
309     except Exception, err:
310       raise ProtocolError("Error while deserializing response: %s" % str(err))
311
312     # Validate response
313     if (not isinstance(data, dict) or
314         KEY_SUCCESS not in data or
315         KEY_RESULT not in data):
316       raise ProtocolError("Invalid response from server: %s" % str(data))
317
318     result = data[KEY_RESULT]
319
320     if not data[KEY_SUCCESS]:
321       errors.MaybeRaise(result)
322       raise RequestError(result)
323
324     return result
325
326   def SetQueueDrainFlag(self, drain_flag):
327     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
328
329   def SetWatcherPause(self, until):
330     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
331
332   def SubmitJob(self, ops):
333     ops_state = map(lambda op: op.__getstate__(), ops)
334     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
335
336   def SubmitManyJobs(self, jobs):
337     jobs_state = []
338     for ops in jobs:
339       jobs_state.append([op.__getstate__() for op in ops])
340     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
341
342   def CancelJob(self, job_id):
343     return self.CallMethod(REQ_CANCEL_JOB, job_id)
344
345   def ArchiveJob(self, job_id):
346     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
347
348   def AutoArchiveJobs(self, age):
349     timeout = (DEF_RWTO - 1) / 2
350     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
351
352   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
353     timeout = (DEF_RWTO - 1) / 2
354     while True:
355       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
356                                (job_id, fields, prev_job_info,
357                                 prev_log_serial, timeout))
358       if result != constants.JOB_NOTCHANGED:
359         break
360     return result
361
362   def QueryJobs(self, job_ids, fields):
363     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
364
365   def QueryInstances(self, names, fields, use_locking):
366     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
367
368   def QueryNodes(self, names, fields, use_locking):
369     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
370
371   def QueryExports(self, nodes, use_locking):
372     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
373
374   def QueryClusterInfo(self):
375     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
376
377   def QueryConfigValues(self, fields):
378     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
379
380
381 # TODO: class Server(object)