004eeff857a3aa1acdb1ea843f3caa0b9ca39a6c
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
61
62 DEF_CTMO = 10
63 DEF_RWTO = 60
64
65
66 class ProtocolError(Exception):
67   """Denotes an error in the server communication"""
68
69
70 class ConnectionClosedError(ProtocolError):
71   """Connection closed error"""
72
73
74 class TimeoutError(ProtocolError):
75   """Operation timeout error"""
76
77
78 class EncodingError(ProtocolError):
79   """Encoding failure on the sending side"""
80
81
82 class DecodingError(ProtocolError):
83   """Decoding failure on the receiving side"""
84
85
86 class RequestError(ProtocolError):
87   """Error on request
88
89   This signifies an error in the request format or request handling,
90   but not (e.g.) an error in starting up an instance.
91
92   Some common conditions that can trigger this exception:
93     - job submission failed because the job data was wrong
94     - query failed because required fields were missing
95
96   """
97
98
99 class NoMasterError(ProtocolError):
100   """The master cannot be reached
101
102   This means that the master daemon is not running or the socket has
103   been removed.
104
105   """
106
107
108 class Transport:
109   """Low-level transport class.
110
111   This is used on the client side.
112
113   This could be replace by any other class that provides the same
114   semantics to the Client. This means:
115     - can send messages and receive messages
116     - safe for multithreading
117
118   """
119
120   def __init__(self, address, timeouts=None, eom=None):
121     """Constructor for the Client class.
122
123     Arguments:
124       - address: a valid address the the used transport class
125       - timeout: a list of timeouts, to be used on connect and read/write
126       - eom: an identifier to be used as end-of-message which the
127         upper-layer will guarantee that this identifier will not appear
128         in any message
129
130     There are two timeouts used since we might want to wait for a long
131     time for a response, but the connect timeout should be lower.
132
133     If not passed, we use a default of 10 and respectively 60 seconds.
134
135     Note that on reading data, since the timeout applies to an
136     invidual receive, it might be that the total duration is longer
137     than timeout value passed (we make a hard limit at twice the read
138     timeout).
139
140     """
141     self.address = address
142     if timeouts is None:
143       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
144     else:
145       self._ctimeout, self._rwtimeout = timeouts
146
147     self.socket = None
148     self._buffer = ""
149     self._msgs = collections.deque()
150
151     if eom is None:
152       self.eom = '\3'
153     else:
154       self.eom = eom
155
156     try:
157       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
158       self.socket.settimeout(self._ctimeout)
159       try:
160         self.socket.connect(address)
161       except socket.timeout, err:
162         raise TimeoutError("Connect timed out: %s" % str(err))
163       except socket.error, err:
164         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
165           raise NoMasterError((address,))
166         raise
167       self.socket.settimeout(self._rwtimeout)
168     except (socket.error, NoMasterError):
169       if self.socket is not None:
170         self.socket.close()
171       self.socket = None
172       raise
173
174   def _CheckSocket(self):
175     """Make sure we are connected.
176
177     """
178     if self.socket is None:
179       raise ProtocolError("Connection is closed")
180
181   def Send(self, msg):
182     """Send a message.
183
184     This just sends a message and doesn't wait for the response.
185
186     """
187     if self.eom in msg:
188       raise EncodingError("Message terminator found in payload")
189     self._CheckSocket()
190     try:
191       # TODO: sendall is not guaranteed to send everything
192       self.socket.sendall(msg + self.eom)
193     except socket.timeout, err:
194       raise TimeoutError("Sending timeout: %s" % str(err))
195
196   def Recv(self):
197     """Try to receive a message from the socket.
198
199     In case we already have messages queued, we just return from the
200     queue. Otherwise, we try to read data with a _rwtimeout network
201     timeout, and making sure we don't go over 2x_rwtimeout as a global
202     limit.
203
204     """
205     self._CheckSocket()
206     etime = time.time() + self._rwtimeout
207     while not self._msgs:
208       if time.time() > etime:
209         raise TimeoutError("Extended receive timeout")
210       while True:
211         try:
212           data = self.socket.recv(4096)
213         except socket.error, err:
214           if err.args and err.args[0] == errno.EAGAIN:
215             continue
216           raise
217         except socket.timeout, err:
218           raise TimeoutError("Receive timeout: %s" % str(err))
219         break
220       if not data:
221         raise ConnectionClosedError("Connection closed while reading")
222       new_msgs = (self._buffer + data).split(self.eom)
223       self._buffer = new_msgs.pop()
224       self._msgs.extend(new_msgs)
225     return self._msgs.popleft()
226
227   def Call(self, msg):
228     """Send a message and wait for the response.
229
230     This is just a wrapper over Send and Recv.
231
232     """
233     self.Send(msg)
234     return self.Recv()
235
236   def Close(self):
237     """Close the socket"""
238     if self.socket is not None:
239       self.socket.close()
240       self.socket = None
241
242
243 class Client(object):
244   """High-level client implementation.
245
246   This uses a backing Transport-like class on top of which it
247   implements data serialization/deserialization.
248
249   """
250   def __init__(self, address=None, timeouts=None, transport=Transport):
251     """Constructor for the Client class.
252
253     Arguments:
254       - address: a valid address the the used transport class
255       - timeout: a list of timeouts, to be used on connect and read/write
256       - transport: a Transport-like class
257
258
259     If timeout is not passed, the default timeouts of the transport
260     class are used.
261
262     """
263     if address is None:
264       address = constants.MASTER_SOCKET
265     self.address = address
266     self.timeouts = timeouts
267     self.transport_class = transport
268     self.transport = None
269     self._InitTransport()
270
271   def _InitTransport(self):
272     """(Re)initialize the transport if needed.
273
274     """
275     if self.transport is None:
276       self.transport = self.transport_class(self.address,
277                                             timeouts=self.timeouts)
278
279   def _CloseTransport(self):
280     """Close the transport, ignoring errors.
281
282     """
283     if self.transport is None:
284       return
285     try:
286       old_transp = self.transport
287       self.transport = None
288       old_transp.Close()
289     except Exception:
290       pass
291
292   def CallMethod(self, method, args):
293     """Send a generic request and return the response.
294
295     """
296     # Build request
297     request = {
298       KEY_METHOD: method,
299       KEY_ARGS: args,
300       }
301
302     # Serialize the request
303     send_data = serializer.DumpJson(request, indent=False)
304
305     # Send request and wait for response
306     try:
307       self._InitTransport()
308       result = self.transport.Call(send_data)
309     except Exception:
310       self._CloseTransport()
311       raise
312
313     # Parse the result
314     try:
315       data = serializer.LoadJson(result)
316     except Exception, err:
317       raise ProtocolError("Error while deserializing response: %s" % str(err))
318
319     # Validate response
320     if (not isinstance(data, dict) or
321         KEY_SUCCESS not in data or
322         KEY_RESULT not in data):
323       raise DecodingError("Invalid response from server: %s" % str(data))
324
325     result = data[KEY_RESULT]
326
327     if not data[KEY_SUCCESS]:
328       # TODO: decide on a standard exception
329       if (isinstance(result, (tuple, list)) and len(result) == 2 and
330           isinstance(result[1], (tuple, list))):
331         # custom ganeti errors
332         err_class = errors.GetErrorClass(result[0])
333         if err_class is not None:
334           raise err_class, tuple(result[1])
335
336       raise RequestError(result)
337
338     return result
339
340   def SetQueueDrainFlag(self, drain_flag):
341     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
342
343   def SetWatcherPause(self, until):
344     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
345
346   def SubmitJob(self, ops):
347     ops_state = map(lambda op: op.__getstate__(), ops)
348     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
349
350   def SubmitManyJobs(self, jobs):
351     jobs_state = []
352     for ops in jobs:
353       jobs_state.append([op.__getstate__() for op in ops])
354     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
355
356   def CancelJob(self, job_id):
357     return self.CallMethod(REQ_CANCEL_JOB, job_id)
358
359   def ArchiveJob(self, job_id):
360     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
361
362   def AutoArchiveJobs(self, age):
363     timeout = (DEF_RWTO - 1) / 2
364     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
365
366   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
367     timeout = (DEF_RWTO - 1) / 2
368     while True:
369       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
370                                (job_id, fields, prev_job_info,
371                                 prev_log_serial, timeout))
372       if result != constants.JOB_NOTCHANGED:
373         break
374     return result
375
376   def QueryJobs(self, job_ids, fields):
377     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
378
379   def QueryInstances(self, names, fields, use_locking):
380     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
381
382   def QueryNodes(self, names, fields, use_locking):
383     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
384
385   def QueryExports(self, nodes, use_locking):
386     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
387
388   def QueryClusterInfo(self):
389     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
390
391   def QueryConfigValues(self, fields):
392     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
393
394
395 # TODO: class Server(object)