KVM: make the kernel and initrd arguments optional
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 REQ_CANCEL_JOB = "CancelJob"
50 REQ_ARCHIVE_JOB = "ArchiveJob"
51 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
52 REQ_QUERY_JOBS = "QueryJobs"
53 REQ_QUERY_INSTANCES = "QueryInstances"
54 REQ_QUERY_NODES = "QueryNodes"
55 REQ_QUERY_EXPORTS = "QueryExports"
56 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
57 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
58
59 DEF_CTMO = 10
60 DEF_RWTO = 60
61
62
63 class ProtocolError(Exception):
64   """Denotes an error in the server communication"""
65
66
67 class ConnectionClosedError(ProtocolError):
68   """Connection closed error"""
69
70
71 class TimeoutError(ProtocolError):
72   """Operation timeout error"""
73
74
75 class EncodingError(ProtocolError):
76   """Encoding failure on the sending side"""
77
78
79 class DecodingError(ProtocolError):
80   """Decoding failure on the receiving side"""
81
82
83 class RequestError(ProtocolError):
84   """Error on request
85
86   This signifies an error in the request format or request handling,
87   but not (e.g.) an error in starting up an instance.
88
89   Some common conditions that can trigger this exception:
90     - job submission failed because the job data was wrong
91     - query failed because required fields were missing
92
93   """
94
95
96 class NoMasterError(ProtocolError):
97   """The master cannot be reached
98
99   This means that the master daemon is not running or the socket has
100   been removed.
101
102   """
103
104
105 class Transport:
106   """Low-level transport class.
107
108   This is used on the client side.
109
110   This could be replace by any other class that provides the same
111   semantics to the Client. This means:
112     - can send messages and receive messages
113     - safe for multithreading
114
115   """
116
117   def __init__(self, address, timeouts=None, eom=None):
118     """Constructor for the Client class.
119
120     Arguments:
121       - address: a valid address the the used transport class
122       - timeout: a list of timeouts, to be used on connect and read/write
123       - eom: an identifier to be used as end-of-message which the
124         upper-layer will guarantee that this identifier will not appear
125         in any message
126
127     There are two timeouts used since we might want to wait for a long
128     time for a response, but the connect timeout should be lower.
129
130     If not passed, we use a default of 10 and respectively 60 seconds.
131
132     Note that on reading data, since the timeout applies to an
133     invidual receive, it might be that the total duration is longer
134     than timeout value passed (we make a hard limit at twice the read
135     timeout).
136
137     """
138     self.address = address
139     if timeouts is None:
140       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
141     else:
142       self._ctimeout, self._rwtimeout = timeouts
143
144     self.socket = None
145     self._buffer = ""
146     self._msgs = collections.deque()
147
148     if eom is None:
149       self.eom = '\3'
150     else:
151       self.eom = eom
152
153     try:
154       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
155       self.socket.settimeout(self._ctimeout)
156       try:
157         self.socket.connect(address)
158       except socket.timeout, err:
159         raise TimeoutError("Connect timed out: %s" % str(err))
160       except socket.error, err:
161         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
162           raise NoMasterError((address,))
163         raise
164       self.socket.settimeout(self._rwtimeout)
165     except (socket.error, NoMasterError):
166       if self.socket is not None:
167         self.socket.close()
168       self.socket = None
169       raise
170
171   def _CheckSocket(self):
172     """Make sure we are connected.
173
174     """
175     if self.socket is None:
176       raise ProtocolError("Connection is closed")
177
178   def Send(self, msg):
179     """Send a message.
180
181     This just sends a message and doesn't wait for the response.
182
183     """
184     if self.eom in msg:
185       raise EncodingError("Message terminator found in payload")
186     self._CheckSocket()
187     try:
188       self.socket.sendall(msg + self.eom)
189     except socket.timeout, err:
190       raise TimeoutError("Sending timeout: %s" % str(err))
191
192   def Recv(self):
193     """Try to receive a messae from the socket.
194
195     In case we already have messages queued, we just return from the
196     queue. Otherwise, we try to read data with a _rwtimeout network
197     timeout, and making sure we don't go over 2x_rwtimeout as a global
198     limit.
199
200     """
201     self._CheckSocket()
202     etime = time.time() + self._rwtimeout
203     while not self._msgs:
204       if time.time() > etime:
205         raise TimeoutError("Extended receive timeout")
206       try:
207         data = self.socket.recv(4096)
208       except socket.timeout, err:
209         raise TimeoutError("Receive timeout: %s" % str(err))
210       if not data:
211         raise ConnectionClosedError("Connection closed while reading")
212       new_msgs = (self._buffer + data).split(self.eom)
213       self._buffer = new_msgs.pop()
214       self._msgs.extend(new_msgs)
215     return self._msgs.popleft()
216
217   def Call(self, msg):
218     """Send a message and wait for the response.
219
220     This is just a wrapper over Send and Recv.
221
222     """
223     self.Send(msg)
224     return self.Recv()
225
226   def Close(self):
227     """Close the socket"""
228     if self.socket is not None:
229       self.socket.close()
230       self.socket = None
231
232
233 class Client(object):
234   """High-level client implementation.
235
236   This uses a backing Transport-like class on top of which it
237   implements data serialization/deserialization.
238
239   """
240   def __init__(self, address=None, timeouts=None, transport=Transport):
241     """Constructor for the Client class.
242
243     Arguments:
244       - address: a valid address the the used transport class
245       - timeout: a list of timeouts, to be used on connect and read/write
246       - transport: a Transport-like class
247
248
249     If timeout is not passed, the default timeouts of the transport
250     class are used.
251
252     """
253     if address is None:
254       address = constants.MASTER_SOCKET
255     self.address = address
256     self.timeouts = timeouts
257     self.transport_class = transport
258     self.transport = None
259     self._InitTransport()
260
261   def _InitTransport(self):
262     """(Re)initialize the transport if needed.
263
264     """
265     if self.transport is None:
266       self.transport = self.transport_class(self.address,
267                                             timeouts=self.timeouts)
268
269   def _CloseTransport(self):
270     """Close the transport, ignoring errors.
271
272     """
273     if self.transport is None:
274       return
275     try:
276       old_transp = self.transport
277       self.transport = None
278       old_transp.Close()
279     except Exception, err:
280       pass
281
282   def CallMethod(self, method, args):
283     """Send a generic request and return the response.
284
285     """
286     # Build request
287     request = {
288       KEY_METHOD: method,
289       KEY_ARGS: args,
290       }
291
292     # Serialize the request
293     send_data = serializer.DumpJson(request, indent=False)
294
295     # Send request and wait for response
296     try:
297       self._InitTransport()
298       result = self.transport.Call(send_data)
299     except Exception:
300       self._CloseTransport()
301       raise
302
303     # Parse the result
304     try:
305       data = serializer.LoadJson(result)
306     except Exception, err:
307       raise ProtocolError("Error while deserializing response: %s" % str(err))
308
309     # Validate response
310     if (not isinstance(data, dict) or
311         KEY_SUCCESS not in data or
312         KEY_RESULT not in data):
313       raise DecodingError("Invalid response from server: %s" % str(data))
314
315     result = data[KEY_RESULT]
316
317     if not data[KEY_SUCCESS]:
318       # TODO: decide on a standard exception
319       if (isinstance(result, (tuple, list)) and len(result) == 2 and
320           isinstance(result[1], (tuple, list))):
321         # custom ganeti errors
322         err_class = errors.GetErrorClass(result[0])
323         if err_class is not None:
324           raise err_class, tuple(result[1])
325
326       raise RequestError(result)
327
328     return result
329
330   def SetQueueDrainFlag(self, drain_flag):
331     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
332
333   def SubmitJob(self, ops):
334     ops_state = map(lambda op: op.__getstate__(), ops)
335     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
336
337   def CancelJob(self, job_id):
338     return self.CallMethod(REQ_CANCEL_JOB, job_id)
339
340   def ArchiveJob(self, job_id):
341     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
342
343   def AutoArchiveJobs(self, age):
344     timeout = (DEF_RWTO - 1) / 2
345     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
346
347   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
348     timeout = (DEF_RWTO - 1) / 2
349     while True:
350       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
351                                (job_id, fields, prev_job_info,
352                                 prev_log_serial, timeout))
353       if result != constants.JOB_NOTCHANGED:
354         break
355     return result
356
357   def QueryJobs(self, job_ids, fields):
358     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
359
360   def QueryInstances(self, names, fields):
361     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields))
362
363   def QueryNodes(self, names, fields):
364     return self.CallMethod(REQ_QUERY_NODES, (names, fields))
365
366   def QueryExports(self, nodes):
367     return self.CallMethod(REQ_QUERY_EXPORTS, nodes)
368
369   def QueryConfigValues(self, fields):
370     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
371
372
373 # TODO: class Server(object)