Release ganeti 2.0~alpha1
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocl. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also be used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
58
59 DEF_CTMO = 10
60 DEF_RWTO = 60
61
62
63 class ProtocolError(Exception):
64   """Denotes an error in the server communication"""
65
66
67 class ConnectionClosedError(ProtocolError):
68   """Connection closed error"""
69
70
71 class TimeoutError(ProtocolError):
72   """Operation timeout error"""
73
74
75 class EncodingError(ProtocolError):
76   """Encoding failure on the sending side"""
77
78
79 class DecodingError(ProtocolError):
80   """Decoding failure on the receiving side"""
81
82
83 class RequestError(ProtocolError):
84   """Error on request
85
86   This signifies an error in the request format or request handling,
87   but not (e.g.) an error in starting up an instance.
88
89   Some common conditions that can trigger this exception:
90     - job submission failed because the job data was wrong
91     - query failed because required fields were missing
92
93   """
94
95
96 class NoMasterError(ProtocolError):
97   """The master cannot be reached
98
99   This means that the master daemon is not running or the socket has
100   been removed.
101
102   """
103
104
105 class Transport:
106   """Low-level transport class.
107
108   This is used on the client side.
109
110   This could be replace by any other class that provides the same
111   semantics to the Client. This means:
112     - can send messages and receive messages
113     - safe for multithreading
114
115   """
116
117   def __init__(self, address, timeouts=None, eom=None):
118     """Constructor for the Client class.
119
120     Arguments:
121       - address: a valid address the the used transport class
122       - timeout: a list of timeouts, to be used on connect and read/write
123       - eom: an identifier to be used as end-of-message which the
124         upper-layer will guarantee that this identifier will not appear
125         in any message
126
127     There are two timeouts used since we might want to wait for a long
128     time for a response, but the connect timeout should be lower.
129
130     If not passed, we use a default of 10 and respectively 60 seconds.
131
132     Note that on reading data, since the timeout applies to an
133     invidual receive, it might be that the total duration is longer
134     than timeout value passed (we make a hard limit at twice the read
135     timeout).
136
137     """
138     self.address = address
139     if timeouts is None:
140       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
141     else:
142       self._ctimeout, self._rwtimeout = timeouts
143
144     self.socket = None
145     self._buffer = ""
146     self._msgs = collections.deque()
147
148     if eom is None:
149       self.eom = '\3'
150     else:
151       self.eom = eom
152
153     try:
154       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
155       self.socket.settimeout(self._ctimeout)
156       try:
157         self.socket.connect(address)
158       except socket.timeout, err:
159         raise TimeoutError("Connect timed out: %s" % str(err))
160       except socket.error, err:
161         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
162           raise NoMasterError((address,))
163         raise
164       self.socket.settimeout(self._rwtimeout)
165     except (socket.error, NoMasterError):
166       if self.socket is not None:
167         self.socket.close()
168       self.socket = None
169       raise
170
171   def _CheckSocket(self):
172     """Make sure we are connected.
173
174     """
175     if self.socket is None:
176       raise ProtocolError("Connection is closed")
177
178   def Send(self, msg):
179     """Send a message.
180
181     This just sends a message and doesn't wait for the response.
182
183     """
184     if self.eom in msg:
185       raise EncodingError("Message terminator found in payload")
186     self._CheckSocket()
187     try:
188       self.socket.sendall(msg + self.eom)
189     except socket.timeout, err:
190       raise TimeoutError("Sending timeout: %s" % str(err))
191
192   def Recv(self):
193     """Try to receive a messae from the socket.
194
195     In case we already have messages queued, we just return from the
196     queue. Otherwise, we try to read data with a _rwtimeout network
197     timeout, and making sure we don't go over 2x_rwtimeout as a global
198     limit.
199
200     """
201     self._CheckSocket()
202     etime = time.time() + self._rwtimeout
203     while not self._msgs:
204       if time.time() > etime:
205         raise TimeoutError("Extended receive timeout")
206       try:
207         data = self.socket.recv(4096)
208       except socket.timeout, err:
209         raise TimeoutError("Receive timeout: %s" % str(err))
210       if not data:
211         raise ConnectionClosedError("Connection closed while reading")
212       new_msgs = (self._buffer + data).split(self.eom)
213       self._buffer = new_msgs.pop()
214       self._msgs.extend(new_msgs)
215     return self._msgs.popleft()
216
217   def Call(self, msg):
218     """Send a message and wait for the response.
219
220     This is just a wrapper over Send and Recv.
221
222     """
223     self.Send(msg)
224     return self.Recv()
225
226   def Close(self):
227     """Close the socket"""
228     if self.socket is not None:
229       self.socket.close()
230       self.socket = None
231
232
233 class Client(object):
234   """High-level client implementation.
235
236   This uses a backing Transport-like class on top of which it
237   implements data serialization/deserialization.
238
239   """
240   def __init__(self, address=None, timeouts=None, transport=Transport):
241     """Constructor for the Client class.
242
243     Arguments:
244       - address: a valid address the the used transport class
245       - timeout: a list of timeouts, to be used on connect and read/write
246       - transport: a Transport-like class
247
248
249     If timeout is not passed, the default timeouts of the transport
250     class are used.
251
252     """
253     if address is None:
254       address = constants.MASTER_SOCKET
255     self.transport = transport(address, timeouts=timeouts)
256
257   def CallMethod(self, method, args):
258     """Send a generic request and return the response.
259
260     """
261     # Build request
262     request = {
263       KEY_METHOD: method,
264       KEY_ARGS: args,
265       }
266
267     # Send request and wait for response
268     result = self.transport.Call(serializer.DumpJson(request, indent=False))
269     try:
270       data = serializer.LoadJson(result)
271     except Exception, err:
272       raise ProtocolError("Error while deserializing response: %s" % str(err))
273
274     # Validate response
275     if (not isinstance(data, dict) or
276         KEY_SUCCESS not in data or
277         KEY_RESULT not in data):
278       raise DecodingError("Invalid response from server: %s" % str(data))
279
280     result = data[KEY_RESULT]
281
282     if not data[KEY_SUCCESS]:
283       # TODO: decide on a standard exception
284       if (isinstance(result, (tuple, list)) and len(result) == 2 and
285           isinstance(result[1], (tuple, list))):
286         # custom ganeti errors
287         err_class = errors.GetErrorClass(result[0])
288         if err_class is not None:
289           raise err_class, tuple(result[1])
290
291       raise RequestError(result)
292
293     return result
294
295   def SetQueueDrainFlag(self, drain_flag):
296     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
297
298   def SubmitJob(self, ops):
299     ops_state = map(lambda op: op.__getstate__(), ops)
300     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
301
302   def CancelJob(self, job_id):
303     return self.CallMethod(REQ_CANCEL_JOB, job_id)
304
305   def ArchiveJob(self, job_id):
306     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
307
308   def AutoArchiveJobs(self, age):
309     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, age)
310
311   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
312     timeout = (DEF_RWTO - 1) / 2
313     while True:
314       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
315                                (job_id, fields, prev_job_info,
316                                 prev_log_serial, timeout))
317       if result != constants.JOB_NOTCHANGED:
318         break
319     return result
320
321   def QueryJobs(self, job_ids, fields):
322     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
323
324   def QueryInstances(self, names, fields):
325     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
326
327   def QueryNodes(self, names, fields):
328     return self.CallMethod(REQ_QUERY_NODES, (names, fields))
329
330   def QueryExports(self, nodes):
331     return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
332
333   def QueryConfigValues(self, fields):
334     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
335
336
337 # TODO: class Server(object)