Fix a couple of epydoc warnings
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60
61 DEF_CTMO = 10
62 DEF_RWTO = 60
63
64
65 class ProtocolError(Exception):
66   """Denotes an error in the server communication"""
67
68
69 class ConnectionClosedError(ProtocolError):
70   """Connection closed error"""
71
72
73 class TimeoutError(ProtocolError):
74   """Operation timeout error"""
75
76
77 class EncodingError(ProtocolError):
78   """Encoding failure on the sending side"""
79
80
81 class DecodingError(ProtocolError):
82   """Decoding failure on the receiving side"""
83
84
85 class RequestError(ProtocolError):
86   """Error on request
87
88   This signifies an error in the request format or request handling,
89   but not (e.g.) an error in starting up an instance.
90
91   Some common conditions that can trigger this exception:
92     - job submission failed because the job data was wrong
93     - query failed because required fields were missing
94
95   """
96
97
98 class NoMasterError(ProtocolError):
99   """The master cannot be reached
100
101   This means that the master daemon is not running or the socket has
102   been removed.
103
104   """
105
106
107 class Transport:
108   """Low-level transport class.
109
110   This is used on the client side.
111
112   This could be replace by any other class that provides the same
113   semantics to the Client. This means:
114     - can send messages and receive messages
115     - safe for multithreading
116
117   """
118
119   def __init__(self, address, timeouts=None, eom=None):
120     """Constructor for the Client class.
121
122     Arguments:
123       - address: a valid address the the used transport class
124       - timeout: a list of timeouts, to be used on connect and read/write
125       - eom: an identifier to be used as end-of-message which the
126         upper-layer will guarantee that this identifier will not appear
127         in any message
128
129     There are two timeouts used since we might want to wait for a long
130     time for a response, but the connect timeout should be lower.
131
132     If not passed, we use a default of 10 and respectively 60 seconds.
133
134     Note that on reading data, since the timeout applies to an
135     invidual receive, it might be that the total duration is longer
136     than timeout value passed (we make a hard limit at twice the read
137     timeout).
138
139     """
140     self.address = address
141     if timeouts is None:
142       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
143     else:
144       self._ctimeout, self._rwtimeout = timeouts
145
146     self.socket = None
147     self._buffer = ""
148     self._msgs = collections.deque()
149
150     if eom is None:
151       self.eom = '\3'
152     else:
153       self.eom = eom
154
155     try:
156       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157       self.socket.settimeout(self._ctimeout)
158       try:
159         self.socket.connect(address)
160       except socket.timeout, err:
161         raise TimeoutError("Connect timed out: %s" % str(err))
162       except socket.error, err:
163         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
164           raise NoMasterError((address,))
165         raise
166       self.socket.settimeout(self._rwtimeout)
167     except (socket.error, NoMasterError):
168       if self.socket is not None:
169         self.socket.close()
170       self.socket = None
171       raise
172
173   def _CheckSocket(self):
174     """Make sure we are connected.
175
176     """
177     if self.socket is None:
178       raise ProtocolError("Connection is closed")
179
180   def Send(self, msg):
181     """Send a message.
182
183     This just sends a message and doesn't wait for the response.
184
185     """
186     if self.eom in msg:
187       raise EncodingError("Message terminator found in payload")
188     self._CheckSocket()
189     try:
190       # TODO: sendall is not guaranteed to send everything
191       self.socket.sendall(msg + self.eom)
192     except socket.timeout, err:
193       raise TimeoutError("Sending timeout: %s" % str(err))
194
195   def Recv(self):
196     """Try to receive a message from the socket.
197
198     In case we already have messages queued, we just return from the
199     queue. Otherwise, we try to read data with a _rwtimeout network
200     timeout, and making sure we don't go over 2x_rwtimeout as a global
201     limit.
202
203     """
204     self._CheckSocket()
205     etime = time.time() + self._rwtimeout
206     while not self._msgs:
207       if time.time() > etime:
208         raise TimeoutError("Extended receive timeout")
209       while True:
210         try:
211           data = self.socket.recv(4096)
212         except socket.error, err:
213           if err.args and err.args[0] == errno.EAGAIN:
214             continue
215           raise
216         except socket.timeout, err:
217           raise TimeoutError("Receive timeout: %s" % str(err))
218         break
219       if not data:
220         raise ConnectionClosedError("Connection closed while reading")
221       new_msgs = (self._buffer + data).split(self.eom)
222       self._buffer = new_msgs.pop()
223       self._msgs.extend(new_msgs)
224     return self._msgs.popleft()
225
226   def Call(self, msg):
227     """Send a message and wait for the response.
228
229     This is just a wrapper over Send and Recv.
230
231     """
232     self.Send(msg)
233     return self.Recv()
234
235   def Close(self):
236     """Close the socket"""
237     if self.socket is not None:
238       self.socket.close()
239       self.socket = None
240
241
242 class Client(object):
243   """High-level client implementation.
244
245   This uses a backing Transport-like class on top of which it
246   implements data serialization/deserialization.
247
248   """
249   def __init__(self, address=None, timeouts=None, transport=Transport):
250     """Constructor for the Client class.
251
252     Arguments:
253       - address: a valid address the the used transport class
254       - timeout: a list of timeouts, to be used on connect and read/write
255       - transport: a Transport-like class
256
257
258     If timeout is not passed, the default timeouts of the transport
259     class are used.
260
261     """
262     if address is None:
263       address = constants.MASTER_SOCKET
264     self.address = address
265     self.timeouts = timeouts
266     self.transport_class = transport
267     self.transport = None
268     self._InitTransport()
269
270   def _InitTransport(self):
271     """(Re)initialize the transport if needed.
272
273     """
274     if self.transport is None:
275       self.transport = self.transport_class(self.address,
276                                             timeouts=self.timeouts)
277
278   def _CloseTransport(self):
279     """Close the transport, ignoring errors.
280
281     """
282     if self.transport is None:
283       return
284     try:
285       old_transp = self.transport
286       self.transport = None
287       old_transp.Close()
288     except Exception:
289       pass
290
291   def CallMethod(self, method, args):
292     """Send a generic request and return the response.
293
294     """
295     # Build request
296     request = {
297       KEY_METHOD: method,
298       KEY_ARGS: args,
299       }
300
301     # Serialize the request
302     send_data = serializer.DumpJson(request, indent=False)
303
304     # Send request and wait for response
305     try:
306       self._InitTransport()
307       result = self.transport.Call(send_data)
308     except Exception:
309       self._CloseTransport()
310       raise
311
312     # Parse the result
313     try:
314       data = serializer.LoadJson(result)
315     except Exception, err:
316       raise ProtocolError("Error while deserializing response: %s" % str(err))
317
318     # Validate response
319     if (not isinstance(data, dict) or
320         KEY_SUCCESS not in data or
321         KEY_RESULT not in data):
322       raise DecodingError("Invalid response from server: %s" % str(data))
323
324     result = data[KEY_RESULT]
325
326     if not data[KEY_SUCCESS]:
327       # TODO: decide on a standard exception
328       if (isinstance(result, (tuple, list)) and len(result) == 2 and
329           isinstance(result[1], (tuple, list))):
330         # custom ganeti errors
331         err_class = errors.GetErrorClass(result[0])
332         if err_class is not None:
333           raise err_class, tuple(result[1])
334
335       raise RequestError(result)
336
337     return result
338
339   def SetQueueDrainFlag(self, drain_flag):
340     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
341
342   def SubmitJob(self, ops):
343     ops_state = map(lambda op: op.__getstate__(), ops)
344     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
345
346   def SubmitManyJobs(self, jobs):
347     jobs_state = []
348     for ops in jobs:
349       jobs_state.append([op.__getstate__() for op in ops])
350     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
351
352   def CancelJob(self, job_id):
353     return self.CallMethod(REQ_CANCEL_JOB, job_id)
354
355   def ArchiveJob(self, job_id):
356     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
357
358   def AutoArchiveJobs(self, age):
359     timeout = (DEF_RWTO - 1) / 2
360     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
361
362   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
363     timeout = (DEF_RWTO - 1) / 2
364     while True:
365       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
366                                (job_id, fields, prev_job_info,
367                                 prev_log_serial, timeout))
368       if result != constants.JOB_NOTCHANGED:
369         break
370     return result
371
372   def QueryJobs(self, job_ids, fields):
373     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
374
375   def QueryInstances(self, names, fields, use_locking):
376     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
377
378   def QueryNodes(self, names, fields, use_locking):
379     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
380
381   def QueryExports(self, nodes, use_locking):
382     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
383
384   def QueryClusterInfo(self):
385     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
386
387   def QueryConfigValues(self, fields):
388     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
389
390
391 # TODO: class Server(object)