KVM: NIC parameters
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
60
61 DEF_CTMO = 10
62 DEF_RWTO = 60
63
64
65 class ProtocolError(Exception):
66   """Denotes an error in the server communication"""
67
68
69 class ConnectionClosedError(ProtocolError):
70   """Connection closed error"""
71
72
73 class TimeoutError(ProtocolError):
74   """Operation timeout error"""
75
76
77 class EncodingError(ProtocolError):
78   """Encoding failure on the sending side"""
79
80
81 class DecodingError(ProtocolError):
82   """Decoding failure on the receiving side"""
83
84
85 class RequestError(ProtocolError):
86   """Error on request
87
88   This signifies an error in the request format or request handling,
89   but not (e.g.) an error in starting up an instance.
90
91   Some common conditions that can trigger this exception:
92     - job submission failed because the job data was wrong
93     - query failed because required fields were missing
94
95   """
96
97
98 class NoMasterError(ProtocolError):
99   """The master cannot be reached
100
101   This means that the master daemon is not running or the socket has
102   been removed.
103
104   """
105
106
107 class Transport:
108   """Low-level transport class.
109
110   This is used on the client side.
111
112   This could be replace by any other class that provides the same
113   semantics to the Client. This means:
114     - can send messages and receive messages
115     - safe for multithreading
116
117   """
118
119   def __init__(self, address, timeouts=None, eom=None):
120     """Constructor for the Client class.
121
122     Arguments:
123       - address: a valid address the the used transport class
124       - timeout: a list of timeouts, to be used on connect and read/write
125       - eom: an identifier to be used as end-of-message which the
126         upper-layer will guarantee that this identifier will not appear
127         in any message
128
129     There are two timeouts used since we might want to wait for a long
130     time for a response, but the connect timeout should be lower.
131
132     If not passed, we use a default of 10 and respectively 60 seconds.
133
134     Note that on reading data, since the timeout applies to an
135     invidual receive, it might be that the total duration is longer
136     than timeout value passed (we make a hard limit at twice the read
137     timeout).
138
139     """
140     self.address = address
141     if timeouts is None:
142       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
143     else:
144       self._ctimeout, self._rwtimeout = timeouts
145
146     self.socket = None
147     self._buffer = ""
148     self._msgs = collections.deque()
149
150     if eom is None:
151       self.eom = '\3'
152     else:
153       self.eom = eom
154
155     try:
156       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157       self.socket.settimeout(self._ctimeout)
158       try:
159         self.socket.connect(address)
160       except socket.timeout, err:
161         raise TimeoutError("Connect timed out: %s" % str(err))
162       except socket.error, err:
163         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
164           raise NoMasterError((address,))
165         raise
166       self.socket.settimeout(self._rwtimeout)
167     except (socket.error, NoMasterError):
168       if self.socket is not None:
169         self.socket.close()
170       self.socket = None
171       raise
172
173   def _CheckSocket(self):
174     """Make sure we are connected.
175
176     """
177     if self.socket is None:
178       raise ProtocolError("Connection is closed")
179
180   def Send(self, msg):
181     """Send a message.
182
183     This just sends a message and doesn't wait for the response.
184
185     """
186     if self.eom in msg:
187       raise EncodingError("Message terminator found in payload")
188     self._CheckSocket()
189     try:
190       self.socket.sendall(msg + self.eom)
191     except socket.timeout, err:
192       raise TimeoutError("Sending timeout: %s" % str(err))
193
194   def Recv(self):
195     """Try to receive a messae from the socket.
196
197     In case we already have messages queued, we just return from the
198     queue. Otherwise, we try to read data with a _rwtimeout network
199     timeout, and making sure we don't go over 2x_rwtimeout as a global
200     limit.
201
202     """
203     self._CheckSocket()
204     etime = time.time() + self._rwtimeout
205     while not self._msgs:
206       if time.time() > etime:
207         raise TimeoutError("Extended receive timeout")
208       try:
209         data = self.socket.recv(4096)
210       except socket.timeout, err:
211         raise TimeoutError("Receive timeout: %s" % str(err))
212       if not data:
213         raise ConnectionClosedError("Connection closed while reading")
214       new_msgs = (self._buffer + data).split(self.eom)
215       self._buffer = new_msgs.pop()
216       self._msgs.extend(new_msgs)
217     return self._msgs.popleft()
218
219   def Call(self, msg):
220     """Send a message and wait for the response.
221
222     This is just a wrapper over Send and Recv.
223
224     """
225     self.Send(msg)
226     return self.Recv()
227
228   def Close(self):
229     """Close the socket"""
230     if self.socket is not None:
231       self.socket.close()
232       self.socket = None
233
234
235 class Client(object):
236   """High-level client implementation.
237
238   This uses a backing Transport-like class on top of which it
239   implements data serialization/deserialization.
240
241   """
242   def __init__(self, address=None, timeouts=None, transport=Transport):
243     """Constructor for the Client class.
244
245     Arguments:
246       - address: a valid address the the used transport class
247       - timeout: a list of timeouts, to be used on connect and read/write
248       - transport: a Transport-like class
249
250
251     If timeout is not passed, the default timeouts of the transport
252     class are used.
253
254     """
255     if address is None:
256       address = constants.MASTER_SOCKET
257     self.address = address
258     self.timeouts = timeouts
259     self.transport_class = transport
260     self.transport = None
261     self._InitTransport()
262
263   def _InitTransport(self):
264     """(Re)initialize the transport if needed.
265
266     """
267     if self.transport is None:
268       self.transport = self.transport_class(self.address,
269                                             timeouts=self.timeouts)
270
271   def _CloseTransport(self):
272     """Close the transport, ignoring errors.
273
274     """
275     if self.transport is None:
276       return
277     try:
278       old_transp = self.transport
279       self.transport = None
280       old_transp.Close()
281     except Exception, err:
282       pass
283
284   def CallMethod(self, method, args):
285     """Send a generic request and return the response.
286
287     """
288     # Build request
289     request = {
290       KEY_METHOD: method,
291       KEY_ARGS: args,
292       }
293
294     # Serialize the request
295     send_data = serializer.DumpJson(request, indent=False)
296
297     # Send request and wait for response
298     try:
299       self._InitTransport()
300       result = self.transport.Call(send_data)
301     except Exception:
302       self._CloseTransport()
303       raise
304
305     # Parse the result
306     try:
307       data = serializer.LoadJson(result)
308     except Exception, err:
309       raise ProtocolError("Error while deserializing response: %s" % str(err))
310
311     # Validate response
312     if (not isinstance(data, dict) or
313         KEY_SUCCESS not in data or
314         KEY_RESULT not in data):
315       raise DecodingError("Invalid response from server: %s" % str(data))
316
317     result = data[KEY_RESULT]
318
319     if not data[KEY_SUCCESS]:
320       # TODO: decide on a standard exception
321       if (isinstance(result, (tuple, list)) and len(result) == 2 and
322           isinstance(result[1], (tuple, list))):
323         # custom ganeti errors
324         err_class = errors.GetErrorClass(result[0])
325         if err_class is not None:
326           raise err_class, tuple(result[1])
327
328       raise RequestError(result)
329
330     return result
331
332   def SetQueueDrainFlag(self, drain_flag):
333     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
334
335   def SubmitJob(self, ops):
336     ops_state = map(lambda op: op.__getstate__(), ops)
337     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
338
339   def SubmitManyJobs(self, jobs):
340     jobs_state = []
341     for ops in jobs:
342       jobs_state.append([op.__getstate__() for op in ops])
343     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
344
345   def CancelJob(self, job_id):
346     return self.CallMethod(REQ_CANCEL_JOB, job_id)
347
348   def ArchiveJob(self, job_id):
349     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
350
351   def AutoArchiveJobs(self, age):
352     timeout = (DEF_RWTO - 1) / 2
353     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
354
355   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
356     timeout = (DEF_RWTO - 1) / 2
357     while True:
358       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
359                                (job_id, fields, prev_job_info,
360                                 prev_log_serial, timeout))
361       if result != constants.JOB_NOTCHANGED:
362         break
363     return result
364
365   def QueryJobs(self, job_ids, fields):
366     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
367
368   def QueryInstances(self, names, fields, use_locking):
369     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
370
371   def QueryNodes(self, names, fields, use_locking):
372     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
373
374   def QueryExports(self, nodes, use_locking):
375     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
376
377   def QueryClusterInfo(self):
378     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
379
380   def QueryConfigValues(self, fields):
381     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
382
383
384 # TODO: class Server(object)