Enable lockless node queries
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
58 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
59
60 DEF_CTMO = 10
61 DEF_RWTO = 60
62
63
64 class ProtocolError(Exception):
65   """Denotes an error in the server communication"""
66
67
68 class ConnectionClosedError(ProtocolError):
69   """Connection closed error"""
70
71
72 class TimeoutError(ProtocolError):
73   """Operation timeout error"""
74
75
76 class EncodingError(ProtocolError):
77   """Encoding failure on the sending side"""
78
79
80 class DecodingError(ProtocolError):
81   """Decoding failure on the receiving side"""
82
83
84 class RequestError(ProtocolError):
85   """Error on request
86
87   This signifies an error in the request format or request handling,
88   but not (e.g.) an error in starting up an instance.
89
90   Some common conditions that can trigger this exception:
91     - job submission failed because the job data was wrong
92     - query failed because required fields were missing
93
94   """
95
96
97 class NoMasterError(ProtocolError):
98   """The master cannot be reached
99
100   This means that the master daemon is not running or the socket has
101   been removed.
102
103   """
104
105
106 class Transport:
107   """Low-level transport class.
108
109   This is used on the client side.
110
111   This could be replace by any other class that provides the same
112   semantics to the Client. This means:
113     - can send messages and receive messages
114     - safe for multithreading
115
116   """
117
118   def __init__(self, address, timeouts=None, eom=None):
119     """Constructor for the Client class.
120
121     Arguments:
122       - address: a valid address the the used transport class
123       - timeout: a list of timeouts, to be used on connect and read/write
124       - eom: an identifier to be used as end-of-message which the
125         upper-layer will guarantee that this identifier will not appear
126         in any message
127
128     There are two timeouts used since we might want to wait for a long
129     time for a response, but the connect timeout should be lower.
130
131     If not passed, we use a default of 10 and respectively 60 seconds.
132
133     Note that on reading data, since the timeout applies to an
134     invidual receive, it might be that the total duration is longer
135     than timeout value passed (we make a hard limit at twice the read
136     timeout).
137
138     """
139     self.address = address
140     if timeouts is None:
141       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
142     else:
143       self._ctimeout, self._rwtimeout = timeouts
144
145     self.socket = None
146     self._buffer = ""
147     self._msgs = collections.deque()
148
149     if eom is None:
150       self.eom = '\3'
151     else:
152       self.eom = eom
153
154     try:
155       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156       self.socket.settimeout(self._ctimeout)
157       try:
158         self.socket.connect(address)
159       except socket.timeout, err:
160         raise TimeoutError("Connect timed out: %s" % str(err))
161       except socket.error, err:
162         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
163           raise NoMasterError((address,))
164         raise
165       self.socket.settimeout(self._rwtimeout)
166     except (socket.error, NoMasterError):
167       if self.socket is not None:
168         self.socket.close()
169       self.socket = None
170       raise
171
172   def _CheckSocket(self):
173     """Make sure we are connected.
174
175     """
176     if self.socket is None:
177       raise ProtocolError("Connection is closed")
178
179   def Send(self, msg):
180     """Send a message.
181
182     This just sends a message and doesn't wait for the response.
183
184     """
185     if self.eom in msg:
186       raise EncodingError("Message terminator found in payload")
187     self._CheckSocket()
188     try:
189       self.socket.sendall(msg + self.eom)
190     except socket.timeout, err:
191       raise TimeoutError("Sending timeout: %s" % str(err))
192
193   def Recv(self):
194     """Try to receive a messae from the socket.
195
196     In case we already have messages queued, we just return from the
197     queue. Otherwise, we try to read data with a _rwtimeout network
198     timeout, and making sure we don't go over 2x_rwtimeout as a global
199     limit.
200
201     """
202     self._CheckSocket()
203     etime = time.time() + self._rwtimeout
204     while not self._msgs:
205       if time.time() > etime:
206         raise TimeoutError("Extended receive timeout")
207       try:
208         data = self.socket.recv(4096)
209       except socket.timeout, err:
210         raise TimeoutError("Receive timeout: %s" % str(err))
211       if not data:
212         raise ConnectionClosedError("Connection closed while reading")
213       new_msgs = (self._buffer + data).split(self.eom)
214       self._buffer = new_msgs.pop()
215       self._msgs.extend(new_msgs)
216     return self._msgs.popleft()
217
218   def Call(self, msg):
219     """Send a message and wait for the response.
220
221     This is just a wrapper over Send and Recv.
222
223     """
224     self.Send(msg)
225     return self.Recv()
226
227   def Close(self):
228     """Close the socket"""
229     if self.socket is not None:
230       self.socket.close()
231       self.socket = None
232
233
234 class Client(object):
235   """High-level client implementation.
236
237   This uses a backing Transport-like class on top of which it
238   implements data serialization/deserialization.
239
240   """
241   def __init__(self, address=None, timeouts=None, transport=Transport):
242     """Constructor for the Client class.
243
244     Arguments:
245       - address: a valid address the the used transport class
246       - timeout: a list of timeouts, to be used on connect and read/write
247       - transport: a Transport-like class
248
249
250     If timeout is not passed, the default timeouts of the transport
251     class are used.
252
253     """
254     if address is None:
255       address = constants.MASTER_SOCKET
256     self.address = address
257     self.timeouts = timeouts
258     self.transport_class = transport
259     self.transport = None
260     self._InitTransport()
261
262   def _InitTransport(self):
263     """(Re)initialize the transport if needed.
264
265     """
266     if self.transport is None:
267       self.transport = self.transport_class(self.address,
268                                             timeouts=self.timeouts)
269
270   def _CloseTransport(self):
271     """Close the transport, ignoring errors.
272
273     """
274     if self.transport is None:
275       return
276     try:
277       old_transp = self.transport
278       self.transport = None
279       old_transp.Close()
280     except Exception, err:
281       pass
282
283   def CallMethod(self, method, args):
284     """Send a generic request and return the response.
285
286     """
287     # Build request
288     request = {
289       KEY_METHOD: method,
290       KEY_ARGS: args,
291       }
292
293     # Serialize the request
294     send_data = serializer.DumpJson(request, indent=False)
295
296     # Send request and wait for response
297     try:
298       self._InitTransport()
299       result = self.transport.Call(send_data)
300     except Exception:
301       self._CloseTransport()
302       raise
303
304     # Parse the result
305     try:
306       data = serializer.LoadJson(result)
307     except Exception, err:
308       raise ProtocolError("Error while deserializing response: %s" % str(err))
309
310     # Validate response
311     if (not isinstance(data, dict) or
312         KEY_SUCCESS not in data or
313         KEY_RESULT not in data):
314       raise DecodingError("Invalid response from server: %s" % str(data))
315
316     result = data[KEY_RESULT]
317
318     if not data[KEY_SUCCESS]:
319       # TODO: decide on a standard exception
320       if (isinstance(result, (tuple, list)) and len(result) == 2 and
321           isinstance(result[1], (tuple, list))):
322         # custom ganeti errors
323         err_class = errors.GetErrorClass(result[0])
324         if err_class is not None:
325           raise err_class, tuple(result[1])
326
327       raise RequestError(result)
328
329     return result
330
331   def SetQueueDrainFlag(self, drain_flag):
332     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
333
334   def SubmitJob(self, ops):
335     ops_state = map(lambda op: op.__getstate__(), ops)
336     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
337
338   def CancelJob(self, job_id):
339     return self.CallMethod(REQ_CANCEL_JOB, job_id)
340
341   def ArchiveJob(self, job_id):
342     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
343
344   def AutoArchiveJobs(self, age):
345     timeout = (DEF_RWTO - 1) / 2
346     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
347
348   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
349     timeout = (DEF_RWTO - 1) / 2
350     while True:
351       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
352                                (job_id, fields, prev_job_info,
353                                 prev_log_serial, timeout))
354       if result != constants.JOB_NOTCHANGED:
355         break
356     return result
357
358   def QueryJobs(self, job_ids, fields):
359     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
360
361   def QueryInstances(self, names, fields, use_locking):
362     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
363
364   def QueryNodes(self, names, fields, use_locking):
365     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
366
367   def QueryExports(self, nodes, use_locking):
368     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
369
370   def QueryClusterInfo(self):
371     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
372
373   def QueryConfigValues(self, fields):
374     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
375
376
377 # TODO: class Server(object)