Show message when job is waiting in queue or for locks
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import utils
41
42
43 KEY_METHOD = 'method'
44 KEY_ARGS = 'args'
45 KEY_SUCCESS = "success"
46 KEY_RESULT = "result"
47
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51 REQ_CANCEL_JOB = "CancelJob"
52 REQ_ARCHIVE_JOB = "ArchiveJob"
53 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54 REQ_QUERY_JOBS = "QueryJobs"
55 REQ_QUERY_INSTANCES = "QueryInstances"
56 REQ_QUERY_NODES = "QueryNodes"
57 REQ_QUERY_EXPORTS = "QueryExports"
58 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60 REQ_QUERY_TAGS = "QueryTags"
61 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63
64 DEF_CTMO = 10
65 DEF_RWTO = 60
66
67
68 class ProtocolError(Exception):
69   """Denotes an error in the server communication"""
70
71
72 class ConnectionClosedError(ProtocolError):
73   """Connection closed error"""
74
75
76 class TimeoutError(ProtocolError):
77   """Operation timeout error"""
78
79
80 class EncodingError(ProtocolError):
81   """Encoding failure on the sending side"""
82
83
84 class DecodingError(ProtocolError):
85   """Decoding failure on the receiving side"""
86
87
88 class RequestError(ProtocolError):
89   """Error on request
90
91   This signifies an error in the request format or request handling,
92   but not (e.g.) an error in starting up an instance.
93
94   Some common conditions that can trigger this exception:
95     - job submission failed because the job data was wrong
96     - query failed because required fields were missing
97
98   """
99
100
101 class NoMasterError(ProtocolError):
102   """The master cannot be reached
103
104   This means that the master daemon is not running or the socket has
105   been removed.
106
107   """
108
109
110 class Transport:
111   """Low-level transport class.
112
113   This is used on the client side.
114
115   This could be replace by any other class that provides the same
116   semantics to the Client. This means:
117     - can send messages and receive messages
118     - safe for multithreading
119
120   """
121
122   def __init__(self, address, timeouts=None, eom=None):
123     """Constructor for the Client class.
124
125     Arguments:
126       - address: a valid address the the used transport class
127       - timeout: a list of timeouts, to be used on connect and read/write
128       - eom: an identifier to be used as end-of-message which the
129         upper-layer will guarantee that this identifier will not appear
130         in any message
131
132     There are two timeouts used since we might want to wait for a long
133     time for a response, but the connect timeout should be lower.
134
135     If not passed, we use a default of 10 and respectively 60 seconds.
136
137     Note that on reading data, since the timeout applies to an
138     invidual receive, it might be that the total duration is longer
139     than timeout value passed (we make a hard limit at twice the read
140     timeout).
141
142     """
143     self.address = address
144     if timeouts is None:
145       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
146     else:
147       self._ctimeout, self._rwtimeout = timeouts
148
149     self.socket = None
150     self._buffer = ""
151     self._msgs = collections.deque()
152
153     if eom is None:
154       self.eom = '\3'
155     else:
156       self.eom = eom
157
158     try:
159       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
160
161       # Try to connect
162       try:
163         utils.Retry(self._Connect, 1.0, self._ctimeout,
164                     args=(self.socket, address, self._ctimeout))
165       except utils.RetryTimeout:
166         raise TimeoutError("Connect timed out")
167
168       self.socket.settimeout(self._rwtimeout)
169     except (socket.error, NoMasterError):
170       if self.socket is not None:
171         self.socket.close()
172       self.socket = None
173       raise
174
175   @staticmethod
176   def _Connect(sock, address, timeout):
177     sock.settimeout(timeout)
178     try:
179       sock.connect(address)
180     except socket.timeout, err:
181       raise TimeoutError("Connect timed out: %s" % str(err))
182     except socket.error, err:
183       if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
184         raise NoMasterError(address)
185       if err.args[0] == errno.EAGAIN:
186         # Server's socket backlog is full at the moment
187         raise utils.RetryAgain()
188       raise
189
190   def _CheckSocket(self):
191     """Make sure we are connected.
192
193     """
194     if self.socket is None:
195       raise ProtocolError("Connection is closed")
196
197   def Send(self, msg):
198     """Send a message.
199
200     This just sends a message and doesn't wait for the response.
201
202     """
203     if self.eom in msg:
204       raise EncodingError("Message terminator found in payload")
205     self._CheckSocket()
206     try:
207       # TODO: sendall is not guaranteed to send everything
208       self.socket.sendall(msg + self.eom)
209     except socket.timeout, err:
210       raise TimeoutError("Sending timeout: %s" % str(err))
211
212   def Recv(self):
213     """Try to receive a message from the socket.
214
215     In case we already have messages queued, we just return from the
216     queue. Otherwise, we try to read data with a _rwtimeout network
217     timeout, and making sure we don't go over 2x_rwtimeout as a global
218     limit.
219
220     """
221     self._CheckSocket()
222     etime = time.time() + self._rwtimeout
223     while not self._msgs:
224       if time.time() > etime:
225         raise TimeoutError("Extended receive timeout")
226       while True:
227         try:
228           data = self.socket.recv(4096)
229         except socket.error, err:
230           if err.args and err.args[0] == errno.EAGAIN:
231             continue
232           raise
233         except socket.timeout, err:
234           raise TimeoutError("Receive timeout: %s" % str(err))
235         break
236       if not data:
237         raise ConnectionClosedError("Connection closed while reading")
238       new_msgs = (self._buffer + data).split(self.eom)
239       self._buffer = new_msgs.pop()
240       self._msgs.extend(new_msgs)
241     return self._msgs.popleft()
242
243   def Call(self, msg):
244     """Send a message and wait for the response.
245
246     This is just a wrapper over Send and Recv.
247
248     """
249     self.Send(msg)
250     return self.Recv()
251
252   def Close(self):
253     """Close the socket"""
254     if self.socket is not None:
255       self.socket.close()
256       self.socket = None
257
258
259 class Client(object):
260   """High-level client implementation.
261
262   This uses a backing Transport-like class on top of which it
263   implements data serialization/deserialization.
264
265   """
266   def __init__(self, address=None, timeouts=None, transport=Transport):
267     """Constructor for the Client class.
268
269     Arguments:
270       - address: a valid address the the used transport class
271       - timeout: a list of timeouts, to be used on connect and read/write
272       - transport: a Transport-like class
273
274
275     If timeout is not passed, the default timeouts of the transport
276     class are used.
277
278     """
279     if address is None:
280       address = constants.MASTER_SOCKET
281     self.address = address
282     self.timeouts = timeouts
283     self.transport_class = transport
284     self.transport = None
285     self._InitTransport()
286
287   def _InitTransport(self):
288     """(Re)initialize the transport if needed.
289
290     """
291     if self.transport is None:
292       self.transport = self.transport_class(self.address,
293                                             timeouts=self.timeouts)
294
295   def _CloseTransport(self):
296     """Close the transport, ignoring errors.
297
298     """
299     if self.transport is None:
300       return
301     try:
302       old_transp = self.transport
303       self.transport = None
304       old_transp.Close()
305     except Exception: # pylint: disable-msg=W0703
306       pass
307
308   def CallMethod(self, method, args):
309     """Send a generic request and return the response.
310
311     """
312     # Build request
313     request = {
314       KEY_METHOD: method,
315       KEY_ARGS: args,
316       }
317
318     # Serialize the request
319     send_data = serializer.DumpJson(request, indent=False)
320
321     # Send request and wait for response
322     try:
323       self._InitTransport()
324       result = self.transport.Call(send_data)
325     except Exception:
326       self._CloseTransport()
327       raise
328
329     # Parse the result
330     try:
331       data = serializer.LoadJson(result)
332     except Exception, err:
333       raise ProtocolError("Error while deserializing response: %s" % str(err))
334
335     # Validate response
336     if (not isinstance(data, dict) or
337         KEY_SUCCESS not in data or
338         KEY_RESULT not in data):
339       raise DecodingError("Invalid response from server: %s" % str(data))
340
341     result = data[KEY_RESULT]
342
343     if not data[KEY_SUCCESS]:
344       errors.MaybeRaise(result)
345       raise RequestError(result)
346
347     return result
348
349   def SetQueueDrainFlag(self, drain_flag):
350     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
351
352   def SetWatcherPause(self, until):
353     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
354
355   def SubmitJob(self, ops):
356     ops_state = map(lambda op: op.__getstate__(), ops)
357     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
358
359   def SubmitManyJobs(self, jobs):
360     jobs_state = []
361     for ops in jobs:
362       jobs_state.append([op.__getstate__() for op in ops])
363     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
364
365   def CancelJob(self, job_id):
366     return self.CallMethod(REQ_CANCEL_JOB, job_id)
367
368   def ArchiveJob(self, job_id):
369     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
370
371   def AutoArchiveJobs(self, age):
372     timeout = (DEF_RWTO - 1) / 2
373     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
374
375   def WaitForJobChangeOnce(self, job_id, fields,
376                            prev_job_info, prev_log_serial):
377     timeout = (DEF_RWTO - 1) / 2
378     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
379                            (job_id, fields, prev_job_info,
380                             prev_log_serial, timeout))
381
382   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
383     while True:
384       result = self.WaitForJobChangeOnce(job_id, fields,
385                                          prev_job_info, prev_log_serial)
386       if result != constants.JOB_NOTCHANGED:
387         break
388     return result
389
390   def QueryJobs(self, job_ids, fields):
391     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
392
393   def QueryInstances(self, names, fields, use_locking):
394     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
395
396   def QueryNodes(self, names, fields, use_locking):
397     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
398
399   def QueryExports(self, nodes, use_locking):
400     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
401
402   def QueryClusterInfo(self):
403     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
404
405   def QueryConfigValues(self, fields):
406     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
407
408   def QueryTags(self, kind, name):
409     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))