Make the snapshot decision based on disk type
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40
41
42 KEY_METHOD = 'method'
43 KEY_ARGS = 'args'
44 KEY_SUCCESS = "success"
45 KEY_RESULT = "result"
46
47 REQ_SUBMIT_JOB = "SubmitJob"
48 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
49 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
50 REQ_CANCEL_JOB = "CancelJob"
51 REQ_ARCHIVE_JOB = "ArchiveJob"
52 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
53 REQ_QUERY_JOBS = "QueryJobs"
54 REQ_QUERY_INSTANCES = "QueryInstances"
55 REQ_QUERY_NODES = "QueryNodes"
56 REQ_QUERY_EXPORTS = "QueryExports"
57 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
58 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
59 REQ_QUERY_TAGS = "QueryTags"
60 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
61 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
62
63 DEF_CTMO = 10
64 DEF_RWTO = 60
65
66
67 class ProtocolError(Exception):
68   """Denotes an error in the server communication"""
69
70
71 class ConnectionClosedError(ProtocolError):
72   """Connection closed error"""
73
74
75 class TimeoutError(ProtocolError):
76   """Operation timeout error"""
77
78
79 class EncodingError(ProtocolError):
80   """Encoding failure on the sending side"""
81
82
83 class DecodingError(ProtocolError):
84   """Decoding failure on the receiving side"""
85
86
87 class RequestError(ProtocolError):
88   """Error on request
89
90   This signifies an error in the request format or request handling,
91   but not (e.g.) an error in starting up an instance.
92
93   Some common conditions that can trigger this exception:
94     - job submission failed because the job data was wrong
95     - query failed because required fields were missing
96
97   """
98
99
100 class NoMasterError(ProtocolError):
101   """The master cannot be reached
102
103   This means that the master daemon is not running or the socket has
104   been removed.
105
106   """
107
108
109 class Transport:
110   """Low-level transport class.
111
112   This is used on the client side.
113
114   This could be replace by any other class that provides the same
115   semantics to the Client. This means:
116     - can send messages and receive messages
117     - safe for multithreading
118
119   """
120
121   def __init__(self, address, timeouts=None, eom=None):
122     """Constructor for the Client class.
123
124     Arguments:
125       - address: a valid address the the used transport class
126       - timeout: a list of timeouts, to be used on connect and read/write
127       - eom: an identifier to be used as end-of-message which the
128         upper-layer will guarantee that this identifier will not appear
129         in any message
130
131     There are two timeouts used since we might want to wait for a long
132     time for a response, but the connect timeout should be lower.
133
134     If not passed, we use a default of 10 and respectively 60 seconds.
135
136     Note that on reading data, since the timeout applies to an
137     invidual receive, it might be that the total duration is longer
138     than timeout value passed (we make a hard limit at twice the read
139     timeout).
140
141     """
142     self.address = address
143     if timeouts is None:
144       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
145     else:
146       self._ctimeout, self._rwtimeout = timeouts
147
148     self.socket = None
149     self._buffer = ""
150     self._msgs = collections.deque()
151
152     if eom is None:
153       self.eom = '\3'
154     else:
155       self.eom = eom
156
157     try:
158       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
159       self.socket.settimeout(self._ctimeout)
160       try:
161         self.socket.connect(address)
162       except socket.timeout, err:
163         raise TimeoutError("Connect timed out: %s" % str(err))
164       except socket.error, err:
165         if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
166           raise NoMasterError(address)
167         raise
168       self.socket.settimeout(self._rwtimeout)
169     except (socket.error, NoMasterError):
170       if self.socket is not None:
171         self.socket.close()
172       self.socket = None
173       raise
174
175   def _CheckSocket(self):
176     """Make sure we are connected.
177
178     """
179     if self.socket is None:
180       raise ProtocolError("Connection is closed")
181
182   def Send(self, msg):
183     """Send a message.
184
185     This just sends a message and doesn't wait for the response.
186
187     """
188     if self.eom in msg:
189       raise EncodingError("Message terminator found in payload")
190     self._CheckSocket()
191     try:
192       # TODO: sendall is not guaranteed to send everything
193       self.socket.sendall(msg + self.eom)
194     except socket.timeout, err:
195       raise TimeoutError("Sending timeout: %s" % str(err))
196
197   def Recv(self):
198     """Try to receive a message from the socket.
199
200     In case we already have messages queued, we just return from the
201     queue. Otherwise, we try to read data with a _rwtimeout network
202     timeout, and making sure we don't go over 2x_rwtimeout as a global
203     limit.
204
205     """
206     self._CheckSocket()
207     etime = time.time() + self._rwtimeout
208     while not self._msgs:
209       if time.time() > etime:
210         raise TimeoutError("Extended receive timeout")
211       while True:
212         try:
213           data = self.socket.recv(4096)
214         except socket.error, err:
215           if err.args and err.args[0] == errno.EAGAIN:
216             continue
217           raise
218         except socket.timeout, err:
219           raise TimeoutError("Receive timeout: %s" % str(err))
220         break
221       if not data:
222         raise ConnectionClosedError("Connection closed while reading")
223       new_msgs = (self._buffer + data).split(self.eom)
224       self._buffer = new_msgs.pop()
225       self._msgs.extend(new_msgs)
226     return self._msgs.popleft()
227
228   def Call(self, msg):
229     """Send a message and wait for the response.
230
231     This is just a wrapper over Send and Recv.
232
233     """
234     self.Send(msg)
235     return self.Recv()
236
237   def Close(self):
238     """Close the socket"""
239     if self.socket is not None:
240       self.socket.close()
241       self.socket = None
242
243
244 class Client(object):
245   """High-level client implementation.
246
247   This uses a backing Transport-like class on top of which it
248   implements data serialization/deserialization.
249
250   """
251   def __init__(self, address=None, timeouts=None, transport=Transport):
252     """Constructor for the Client class.
253
254     Arguments:
255       - address: a valid address the the used transport class
256       - timeout: a list of timeouts, to be used on connect and read/write
257       - transport: a Transport-like class
258
259
260     If timeout is not passed, the default timeouts of the transport
261     class are used.
262
263     """
264     if address is None:
265       address = constants.MASTER_SOCKET
266     self.address = address
267     self.timeouts = timeouts
268     self.transport_class = transport
269     self.transport = None
270     self._InitTransport()
271
272   def _InitTransport(self):
273     """(Re)initialize the transport if needed.
274
275     """
276     if self.transport is None:
277       self.transport = self.transport_class(self.address,
278                                             timeouts=self.timeouts)
279
280   def _CloseTransport(self):
281     """Close the transport, ignoring errors.
282
283     """
284     if self.transport is None:
285       return
286     try:
287       old_transp = self.transport
288       self.transport = None
289       old_transp.Close()
290     except Exception: # pylint: disable-msg=W0703
291       pass
292
293   def CallMethod(self, method, args):
294     """Send a generic request and return the response.
295
296     """
297     # Build request
298     request = {
299       KEY_METHOD: method,
300       KEY_ARGS: args,
301       }
302
303     # Serialize the request
304     send_data = serializer.DumpJson(request, indent=False)
305
306     # Send request and wait for response
307     try:
308       self._InitTransport()
309       result = self.transport.Call(send_data)
310     except Exception:
311       self._CloseTransport()
312       raise
313
314     # Parse the result
315     try:
316       data = serializer.LoadJson(result)
317     except Exception, err:
318       raise ProtocolError("Error while deserializing response: %s" % str(err))
319
320     # Validate response
321     if (not isinstance(data, dict) or
322         KEY_SUCCESS not in data or
323         KEY_RESULT not in data):
324       raise DecodingError("Invalid response from server: %s" % str(data))
325
326     result = data[KEY_RESULT]
327
328     if not data[KEY_SUCCESS]:
329       errors.MaybeRaise(result)
330       raise RequestError(result)
331
332     return result
333
334   def SetQueueDrainFlag(self, drain_flag):
335     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
336
337   def SetWatcherPause(self, until):
338     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
339
340   def SubmitJob(self, ops):
341     ops_state = map(lambda op: op.__getstate__(), ops)
342     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
343
344   def SubmitManyJobs(self, jobs):
345     jobs_state = []
346     for ops in jobs:
347       jobs_state.append([op.__getstate__() for op in ops])
348     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
349
350   def CancelJob(self, job_id):
351     return self.CallMethod(REQ_CANCEL_JOB, job_id)
352
353   def ArchiveJob(self, job_id):
354     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
355
356   def AutoArchiveJobs(self, age):
357     timeout = (DEF_RWTO - 1) / 2
358     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
359
360   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
361     timeout = (DEF_RWTO - 1) / 2
362     while True:
363       result = self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
364                                (job_id, fields, prev_job_info,
365                                 prev_log_serial, timeout))
366       if result != constants.JOB_NOTCHANGED:
367         break
368     return result
369
370   def QueryJobs(self, job_ids, fields):
371     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
372
373   def QueryInstances(self, names, fields, use_locking):
374     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
375
376   def QueryNodes(self, names, fields, use_locking):
377     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
378
379   def QueryExports(self, nodes, use_locking):
380     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
381
382   def QueryClusterInfo(self):
383     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
384
385   def QueryConfigValues(self, fields):
386     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
387
388   def QueryTags(self, kind, name):
389     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
390
391
392 # TODO: class Server(object)