Abstract the LUXI eom into a constant
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import utils
41
42
43 KEY_METHOD = 'method'
44 KEY_ARGS = 'args'
45 KEY_SUCCESS = "success"
46 KEY_RESULT = "result"
47
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51 REQ_CANCEL_JOB = "CancelJob"
52 REQ_ARCHIVE_JOB = "ArchiveJob"
53 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54 REQ_QUERY_JOBS = "QueryJobs"
55 REQ_QUERY_INSTANCES = "QueryInstances"
56 REQ_QUERY_NODES = "QueryNodes"
57 REQ_QUERY_EXPORTS = "QueryExports"
58 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60 REQ_QUERY_TAGS = "QueryTags"
61 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63
64 DEF_CTMO = 10
65 DEF_RWTO = 60
66
67 # WaitForJobChange timeout
68 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
69
70
71 class ProtocolError(Exception):
72   """Denotes an error in the server communication"""
73
74
75 class ConnectionClosedError(ProtocolError):
76   """Connection closed error"""
77
78
79 class TimeoutError(ProtocolError):
80   """Operation timeout error"""
81
82
83 class EncodingError(ProtocolError):
84   """Encoding failure on the sending side"""
85
86
87 class DecodingError(ProtocolError):
88   """Decoding failure on the receiving side"""
89
90
91 class RequestError(ProtocolError):
92   """Error on request
93
94   This signifies an error in the request format or request handling,
95   but not (e.g.) an error in starting up an instance.
96
97   Some common conditions that can trigger this exception:
98     - job submission failed because the job data was wrong
99     - query failed because required fields were missing
100
101   """
102
103
104 class NoMasterError(ProtocolError):
105   """The master cannot be reached
106
107   This means that the master daemon is not running or the socket has
108   been removed.
109
110   """
111
112
113 class Transport:
114   """Low-level transport class.
115
116   This is used on the client side.
117
118   This could be replace by any other class that provides the same
119   semantics to the Client. This means:
120     - can send messages and receive messages
121     - safe for multithreading
122
123   """
124
125   def __init__(self, address, timeouts=None):
126     """Constructor for the Client class.
127
128     Arguments:
129       - address: a valid address the the used transport class
130       - timeout: a list of timeouts, to be used on connect and read/write
131
132     There are two timeouts used since we might want to wait for a long
133     time for a response, but the connect timeout should be lower.
134
135     If not passed, we use a default of 10 and respectively 60 seconds.
136
137     Note that on reading data, since the timeout applies to an
138     invidual receive, it might be that the total duration is longer
139     than timeout value passed (we make a hard limit at twice the read
140     timeout).
141
142     """
143     self.address = address
144     if timeouts is None:
145       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
146     else:
147       self._ctimeout, self._rwtimeout = timeouts
148
149     self.socket = None
150     self._buffer = ""
151     self._msgs = collections.deque()
152
153     try:
154       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
155
156       # Try to connect
157       try:
158         utils.Retry(self._Connect, 1.0, self._ctimeout,
159                     args=(self.socket, address, self._ctimeout))
160       except utils.RetryTimeout:
161         raise TimeoutError("Connect timed out")
162
163       self.socket.settimeout(self._rwtimeout)
164     except (socket.error, NoMasterError):
165       if self.socket is not None:
166         self.socket.close()
167       self.socket = None
168       raise
169
170   @staticmethod
171   def _Connect(sock, address, timeout):
172     sock.settimeout(timeout)
173     try:
174       sock.connect(address)
175     except socket.timeout, err:
176       raise TimeoutError("Connect timed out: %s" % str(err))
177     except socket.error, err:
178       if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
179         raise NoMasterError(address)
180       if err.args[0] == errno.EAGAIN:
181         # Server's socket backlog is full at the moment
182         raise utils.RetryAgain()
183       raise
184
185   def _CheckSocket(self):
186     """Make sure we are connected.
187
188     """
189     if self.socket is None:
190       raise ProtocolError("Connection is closed")
191
192   def Send(self, msg):
193     """Send a message.
194
195     This just sends a message and doesn't wait for the response.
196
197     """
198     if constants.LUXI_EOM in msg:
199       raise EncodingError("Message terminator found in payload")
200     self._CheckSocket()
201     try:
202       # TODO: sendall is not guaranteed to send everything
203       self.socket.sendall(msg + constants.LUXI_EOM)
204     except socket.timeout, err:
205       raise TimeoutError("Sending timeout: %s" % str(err))
206
207   def Recv(self):
208     """Try to receive a message from the socket.
209
210     In case we already have messages queued, we just return from the
211     queue. Otherwise, we try to read data with a _rwtimeout network
212     timeout, and making sure we don't go over 2x_rwtimeout as a global
213     limit.
214
215     """
216     self._CheckSocket()
217     etime = time.time() + self._rwtimeout
218     while not self._msgs:
219       if time.time() > etime:
220         raise TimeoutError("Extended receive timeout")
221       while True:
222         try:
223           data = self.socket.recv(4096)
224         except socket.error, err:
225           if err.args and err.args[0] == errno.EAGAIN:
226             continue
227           raise
228         except socket.timeout, err:
229           raise TimeoutError("Receive timeout: %s" % str(err))
230         break
231       if not data:
232         raise ConnectionClosedError("Connection closed while reading")
233       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
234       self._buffer = new_msgs.pop()
235       self._msgs.extend(new_msgs)
236     return self._msgs.popleft()
237
238   def Call(self, msg):
239     """Send a message and wait for the response.
240
241     This is just a wrapper over Send and Recv.
242
243     """
244     self.Send(msg)
245     return self.Recv()
246
247   def Close(self):
248     """Close the socket"""
249     if self.socket is not None:
250       self.socket.close()
251       self.socket = None
252
253
254 class Client(object):
255   """High-level client implementation.
256
257   This uses a backing Transport-like class on top of which it
258   implements data serialization/deserialization.
259
260   """
261   def __init__(self, address=None, timeouts=None, transport=Transport):
262     """Constructor for the Client class.
263
264     Arguments:
265       - address: a valid address the the used transport class
266       - timeout: a list of timeouts, to be used on connect and read/write
267       - transport: a Transport-like class
268
269
270     If timeout is not passed, the default timeouts of the transport
271     class are used.
272
273     """
274     if address is None:
275       address = constants.MASTER_SOCKET
276     self.address = address
277     self.timeouts = timeouts
278     self.transport_class = transport
279     self.transport = None
280     self._InitTransport()
281
282   def _InitTransport(self):
283     """(Re)initialize the transport if needed.
284
285     """
286     if self.transport is None:
287       self.transport = self.transport_class(self.address,
288                                             timeouts=self.timeouts)
289
290   def _CloseTransport(self):
291     """Close the transport, ignoring errors.
292
293     """
294     if self.transport is None:
295       return
296     try:
297       old_transp = self.transport
298       self.transport = None
299       old_transp.Close()
300     except Exception: # pylint: disable-msg=W0703
301       pass
302
303   def CallMethod(self, method, args):
304     """Send a generic request and return the response.
305
306     """
307     # Build request
308     request = {
309       KEY_METHOD: method,
310       KEY_ARGS: args,
311       }
312
313     # Serialize the request
314     send_data = serializer.DumpJson(request, indent=False)
315
316     # Send request and wait for response
317     try:
318       self._InitTransport()
319       result = self.transport.Call(send_data)
320     except Exception:
321       self._CloseTransport()
322       raise
323
324     # Parse the result
325     try:
326       data = serializer.LoadJson(result)
327     except Exception, err:
328       raise ProtocolError("Error while deserializing response: %s" % str(err))
329
330     # Validate response
331     if (not isinstance(data, dict) or
332         KEY_SUCCESS not in data or
333         KEY_RESULT not in data):
334       raise DecodingError("Invalid response from server: %s" % str(data))
335
336     result = data[KEY_RESULT]
337
338     if not data[KEY_SUCCESS]:
339       errors.MaybeRaise(result)
340       raise RequestError(result)
341
342     return result
343
344   def SetQueueDrainFlag(self, drain_flag):
345     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
346
347   def SetWatcherPause(self, until):
348     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
349
350   def SubmitJob(self, ops):
351     ops_state = map(lambda op: op.__getstate__(), ops)
352     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
353
354   def SubmitManyJobs(self, jobs):
355     jobs_state = []
356     for ops in jobs:
357       jobs_state.append([op.__getstate__() for op in ops])
358     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
359
360   def CancelJob(self, job_id):
361     return self.CallMethod(REQ_CANCEL_JOB, job_id)
362
363   def ArchiveJob(self, job_id):
364     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
365
366   def AutoArchiveJobs(self, age):
367     timeout = (DEF_RWTO - 1) / 2
368     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
369
370   def WaitForJobChangeOnce(self, job_id, fields,
371                            prev_job_info, prev_log_serial,
372                            timeout=WFJC_TIMEOUT):
373     """Waits for changes on a job.
374
375     @param job_id: Job ID
376     @type fields: list
377     @param fields: List of field names to be observed
378     @type prev_job_info: None or list
379     @param prev_job_info: Previously received job information
380     @type prev_log_serial: None or int/long
381     @param prev_log_serial: Highest log serial number previously received
382     @type timeout: int/float
383     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
384                     be capped to that value)
385
386     """
387     assert timeout >= 0, "Timeout can not be negative"
388     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
389                            (job_id, fields, prev_job_info,
390                             prev_log_serial,
391                             min(WFJC_TIMEOUT, timeout)))
392
393   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
394     while True:
395       result = self.WaitForJobChangeOnce(job_id, fields,
396                                          prev_job_info, prev_log_serial)
397       if result != constants.JOB_NOTCHANGED:
398         break
399     return result
400
401   def QueryJobs(self, job_ids, fields):
402     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
403
404   def QueryInstances(self, names, fields, use_locking):
405     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
406
407   def QueryNodes(self, names, fields, use_locking):
408     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
409
410   def QueryExports(self, nodes, use_locking):
411     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
412
413   def QueryClusterInfo(self):
414     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
415
416   def QueryConfigValues(self, fields):
417     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
418
419   def QueryTags(self, kind, name):
420     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))