RAPI: Allow waiting for job changes
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36
37 from ganeti import serializer
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import utils
41
42
43 KEY_METHOD = 'method'
44 KEY_ARGS = 'args'
45 KEY_SUCCESS = "success"
46 KEY_RESULT = "result"
47
48 REQ_SUBMIT_JOB = "SubmitJob"
49 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
50 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
51 REQ_CANCEL_JOB = "CancelJob"
52 REQ_ARCHIVE_JOB = "ArchiveJob"
53 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
54 REQ_QUERY_JOBS = "QueryJobs"
55 REQ_QUERY_INSTANCES = "QueryInstances"
56 REQ_QUERY_NODES = "QueryNodes"
57 REQ_QUERY_EXPORTS = "QueryExports"
58 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
59 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
60 REQ_QUERY_TAGS = "QueryTags"
61 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
62 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
63
64 DEF_CTMO = 10
65 DEF_RWTO = 60
66
67 # WaitForJobChange timeout
68 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
69
70
71 class ProtocolError(Exception):
72   """Denotes an error in the server communication"""
73
74
75 class ConnectionClosedError(ProtocolError):
76   """Connection closed error"""
77
78
79 class TimeoutError(ProtocolError):
80   """Operation timeout error"""
81
82
83 class EncodingError(ProtocolError):
84   """Encoding failure on the sending side"""
85
86
87 class DecodingError(ProtocolError):
88   """Decoding failure on the receiving side"""
89
90
91 class RequestError(ProtocolError):
92   """Error on request
93
94   This signifies an error in the request format or request handling,
95   but not (e.g.) an error in starting up an instance.
96
97   Some common conditions that can trigger this exception:
98     - job submission failed because the job data was wrong
99     - query failed because required fields were missing
100
101   """
102
103
104 class NoMasterError(ProtocolError):
105   """The master cannot be reached
106
107   This means that the master daemon is not running or the socket has
108   been removed.
109
110   """
111
112
113 class Transport:
114   """Low-level transport class.
115
116   This is used on the client side.
117
118   This could be replace by any other class that provides the same
119   semantics to the Client. This means:
120     - can send messages and receive messages
121     - safe for multithreading
122
123   """
124
125   def __init__(self, address, timeouts=None, eom=None):
126     """Constructor for the Client class.
127
128     Arguments:
129       - address: a valid address the the used transport class
130       - timeout: a list of timeouts, to be used on connect and read/write
131       - eom: an identifier to be used as end-of-message which the
132         upper-layer will guarantee that this identifier will not appear
133         in any message
134
135     There are two timeouts used since we might want to wait for a long
136     time for a response, but the connect timeout should be lower.
137
138     If not passed, we use a default of 10 and respectively 60 seconds.
139
140     Note that on reading data, since the timeout applies to an
141     invidual receive, it might be that the total duration is longer
142     than timeout value passed (we make a hard limit at twice the read
143     timeout).
144
145     """
146     self.address = address
147     if timeouts is None:
148       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
149     else:
150       self._ctimeout, self._rwtimeout = timeouts
151
152     self.socket = None
153     self._buffer = ""
154     self._msgs = collections.deque()
155
156     if eom is None:
157       self.eom = '\3'
158     else:
159       self.eom = eom
160
161     try:
162       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
163
164       # Try to connect
165       try:
166         utils.Retry(self._Connect, 1.0, self._ctimeout,
167                     args=(self.socket, address, self._ctimeout))
168       except utils.RetryTimeout:
169         raise TimeoutError("Connect timed out")
170
171       self.socket.settimeout(self._rwtimeout)
172     except (socket.error, NoMasterError):
173       if self.socket is not None:
174         self.socket.close()
175       self.socket = None
176       raise
177
178   @staticmethod
179   def _Connect(sock, address, timeout):
180     sock.settimeout(timeout)
181     try:
182       sock.connect(address)
183     except socket.timeout, err:
184       raise TimeoutError("Connect timed out: %s" % str(err))
185     except socket.error, err:
186       if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
187         raise NoMasterError(address)
188       if err.args[0] == errno.EAGAIN:
189         # Server's socket backlog is full at the moment
190         raise utils.RetryAgain()
191       raise
192
193   def _CheckSocket(self):
194     """Make sure we are connected.
195
196     """
197     if self.socket is None:
198       raise ProtocolError("Connection is closed")
199
200   def Send(self, msg):
201     """Send a message.
202
203     This just sends a message and doesn't wait for the response.
204
205     """
206     if self.eom in msg:
207       raise EncodingError("Message terminator found in payload")
208     self._CheckSocket()
209     try:
210       # TODO: sendall is not guaranteed to send everything
211       self.socket.sendall(msg + self.eom)
212     except socket.timeout, err:
213       raise TimeoutError("Sending timeout: %s" % str(err))
214
215   def Recv(self):
216     """Try to receive a message from the socket.
217
218     In case we already have messages queued, we just return from the
219     queue. Otherwise, we try to read data with a _rwtimeout network
220     timeout, and making sure we don't go over 2x_rwtimeout as a global
221     limit.
222
223     """
224     self._CheckSocket()
225     etime = time.time() + self._rwtimeout
226     while not self._msgs:
227       if time.time() > etime:
228         raise TimeoutError("Extended receive timeout")
229       while True:
230         try:
231           data = self.socket.recv(4096)
232         except socket.error, err:
233           if err.args and err.args[0] == errno.EAGAIN:
234             continue
235           raise
236         except socket.timeout, err:
237           raise TimeoutError("Receive timeout: %s" % str(err))
238         break
239       if not data:
240         raise ConnectionClosedError("Connection closed while reading")
241       new_msgs = (self._buffer + data).split(self.eom)
242       self._buffer = new_msgs.pop()
243       self._msgs.extend(new_msgs)
244     return self._msgs.popleft()
245
246   def Call(self, msg):
247     """Send a message and wait for the response.
248
249     This is just a wrapper over Send and Recv.
250
251     """
252     self.Send(msg)
253     return self.Recv()
254
255   def Close(self):
256     """Close the socket"""
257     if self.socket is not None:
258       self.socket.close()
259       self.socket = None
260
261
262 class Client(object):
263   """High-level client implementation.
264
265   This uses a backing Transport-like class on top of which it
266   implements data serialization/deserialization.
267
268   """
269   def __init__(self, address=None, timeouts=None, transport=Transport):
270     """Constructor for the Client class.
271
272     Arguments:
273       - address: a valid address the the used transport class
274       - timeout: a list of timeouts, to be used on connect and read/write
275       - transport: a Transport-like class
276
277
278     If timeout is not passed, the default timeouts of the transport
279     class are used.
280
281     """
282     if address is None:
283       address = constants.MASTER_SOCKET
284     self.address = address
285     self.timeouts = timeouts
286     self.transport_class = transport
287     self.transport = None
288     self._InitTransport()
289
290   def _InitTransport(self):
291     """(Re)initialize the transport if needed.
292
293     """
294     if self.transport is None:
295       self.transport = self.transport_class(self.address,
296                                             timeouts=self.timeouts)
297
298   def _CloseTransport(self):
299     """Close the transport, ignoring errors.
300
301     """
302     if self.transport is None:
303       return
304     try:
305       old_transp = self.transport
306       self.transport = None
307       old_transp.Close()
308     except Exception: # pylint: disable-msg=W0703
309       pass
310
311   def CallMethod(self, method, args):
312     """Send a generic request and return the response.
313
314     """
315     # Build request
316     request = {
317       KEY_METHOD: method,
318       KEY_ARGS: args,
319       }
320
321     # Serialize the request
322     send_data = serializer.DumpJson(request, indent=False)
323
324     # Send request and wait for response
325     try:
326       self._InitTransport()
327       result = self.transport.Call(send_data)
328     except Exception:
329       self._CloseTransport()
330       raise
331
332     # Parse the result
333     try:
334       data = serializer.LoadJson(result)
335     except Exception, err:
336       raise ProtocolError("Error while deserializing response: %s" % str(err))
337
338     # Validate response
339     if (not isinstance(data, dict) or
340         KEY_SUCCESS not in data or
341         KEY_RESULT not in data):
342       raise DecodingError("Invalid response from server: %s" % str(data))
343
344     result = data[KEY_RESULT]
345
346     if not data[KEY_SUCCESS]:
347       errors.MaybeRaise(result)
348       raise RequestError(result)
349
350     return result
351
352   def SetQueueDrainFlag(self, drain_flag):
353     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
354
355   def SetWatcherPause(self, until):
356     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
357
358   def SubmitJob(self, ops):
359     ops_state = map(lambda op: op.__getstate__(), ops)
360     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
361
362   def SubmitManyJobs(self, jobs):
363     jobs_state = []
364     for ops in jobs:
365       jobs_state.append([op.__getstate__() for op in ops])
366     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
367
368   def CancelJob(self, job_id):
369     return self.CallMethod(REQ_CANCEL_JOB, job_id)
370
371   def ArchiveJob(self, job_id):
372     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
373
374   def AutoArchiveJobs(self, age):
375     timeout = (DEF_RWTO - 1) / 2
376     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
377
378   def WaitForJobChangeOnce(self, job_id, fields,
379                            prev_job_info, prev_log_serial,
380                            timeout=WFJC_TIMEOUT):
381     """Waits for changes on a job.
382
383     @param job_id: Job ID
384     @type fields: list
385     @param fields: List of field names to be observed
386     @type prev_job_info: None or list
387     @param prev_job_info: Previously received job information
388     @type prev_log_serial: None or int/long
389     @param prev_log_serial: Highest log serial number previously received
390     @type timeout: int/float
391     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
392                     be capped to that value)
393
394     """
395     assert timeout >= 0, "Timeout can not be negative"
396     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
397                            (job_id, fields, prev_job_info,
398                             prev_log_serial,
399                             min(WFJC_TIMEOUT, timeout)))
400
401   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
402     while True:
403       result = self.WaitForJobChangeOnce(job_id, fields,
404                                          prev_job_info, prev_log_serial)
405       if result != constants.JOB_NOTCHANGED:
406         break
407     return result
408
409   def QueryJobs(self, job_ids, fields):
410     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
411
412   def QueryInstances(self, names, fields, use_locking):
413     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
414
415   def QueryNodes(self, names, fields, use_locking):
416     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
417
418   def QueryExports(self, nodes, use_locking):
419     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
420
421   def QueryClusterInfo(self):
422     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
423
424   def QueryConfigValues(self, fields):
425     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
426
427   def QueryTags(self, kind, name):
428     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))