workerpool: Additional check in BaseWorker.ShouldTerminate
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
44 KEY_METHOD = "method"
45 KEY_ARGS = "args"
46 KEY_SUCCESS = "success"
47 KEY_RESULT = "result"
48
49 REQ_SUBMIT_JOB = "SubmitJob"
50 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52 REQ_CANCEL_JOB = "CancelJob"
53 REQ_ARCHIVE_JOB = "ArchiveJob"
54 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55 REQ_QUERY_JOBS = "QueryJobs"
56 REQ_QUERY_INSTANCES = "QueryInstances"
57 REQ_QUERY_NODES = "QueryNodes"
58 REQ_QUERY_EXPORTS = "QueryExports"
59 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61 REQ_QUERY_TAGS = "QueryTags"
62 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
63 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
64
65 DEF_CTMO = 10
66 DEF_RWTO = 60
67
68 # WaitForJobChange timeout
69 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
70
71
72 class ProtocolError(errors.GenericError):
73   """Denotes an error in the LUXI protocol."""
74
75
76 class ConnectionClosedError(ProtocolError):
77   """Connection closed error."""
78
79
80 class TimeoutError(ProtocolError):
81   """Operation timeout error."""
82
83
84 class RequestError(ProtocolError):
85   """Error on request.
86
87   This signifies an error in the request format or request handling,
88   but not (e.g.) an error in starting up an instance.
89
90   Some common conditions that can trigger this exception:
91     - job submission failed because the job data was wrong
92     - query failed because required fields were missing
93
94   """
95
96
97 class NoMasterError(ProtocolError):
98   """The master cannot be reached.
99
100   This means that the master daemon is not running or the socket has
101   been removed.
102
103   """
104
105
106 class PermissionError(ProtocolError):
107   """Permission denied while connecting to the master socket.
108
109   This means the user doesn't have the proper rights.
110
111   """
112
113
114 class Transport:
115   """Low-level transport class.
116
117   This is used on the client side.
118
119   This could be replace by any other class that provides the same
120   semantics to the Client. This means:
121     - can send messages and receive messages
122     - safe for multithreading
123
124   """
125
126   def __init__(self, address, timeouts=None):
127     """Constructor for the Client class.
128
129     Arguments:
130       - address: a valid address the the used transport class
131       - timeout: a list of timeouts, to be used on connect and read/write
132
133     There are two timeouts used since we might want to wait for a long
134     time for a response, but the connect timeout should be lower.
135
136     If not passed, we use a default of 10 and respectively 60 seconds.
137
138     Note that on reading data, since the timeout applies to an
139     invidual receive, it might be that the total duration is longer
140     than timeout value passed (we make a hard limit at twice the read
141     timeout).
142
143     """
144     self.address = address
145     if timeouts is None:
146       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
147     else:
148       self._ctimeout, self._rwtimeout = timeouts
149
150     self.socket = None
151     self._buffer = ""
152     self._msgs = collections.deque()
153
154     try:
155       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
156
157       # Try to connect
158       try:
159         utils.Retry(self._Connect, 1.0, self._ctimeout,
160                     args=(self.socket, address, self._ctimeout))
161       except utils.RetryTimeout:
162         raise TimeoutError("Connect timed out")
163
164       self.socket.settimeout(self._rwtimeout)
165     except (socket.error, NoMasterError):
166       if self.socket is not None:
167         self.socket.close()
168       self.socket = None
169       raise
170
171   @staticmethod
172   def _Connect(sock, address, timeout):
173     sock.settimeout(timeout)
174     try:
175       sock.connect(address)
176     except socket.timeout, err:
177       raise TimeoutError("Connect timed out: %s" % str(err))
178     except socket.error, err:
179       error_code = err.args[0]
180       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
181         raise NoMasterError(address)
182       elif error_code in (errno.EPERM, errno.EACCES):
183         raise PermissionError(address)
184       elif error_code == errno.EAGAIN:
185         # Server's socket backlog is full at the moment
186         raise utils.RetryAgain()
187       raise
188
189   def _CheckSocket(self):
190     """Make sure we are connected.
191
192     """
193     if self.socket is None:
194       raise ProtocolError("Connection is closed")
195
196   def Send(self, msg):
197     """Send a message.
198
199     This just sends a message and doesn't wait for the response.
200
201     """
202     if constants.LUXI_EOM in msg:
203       raise ProtocolError("Message terminator found in payload")
204
205     self._CheckSocket()
206     try:
207       # TODO: sendall is not guaranteed to send everything
208       self.socket.sendall(msg + constants.LUXI_EOM)
209     except socket.timeout, err:
210       raise TimeoutError("Sending timeout: %s" % str(err))
211
212   def Recv(self):
213     """Try to receive a message from the socket.
214
215     In case we already have messages queued, we just return from the
216     queue. Otherwise, we try to read data with a _rwtimeout network
217     timeout, and making sure we don't go over 2x_rwtimeout as a global
218     limit.
219
220     """
221     self._CheckSocket()
222     etime = time.time() + self._rwtimeout
223     while not self._msgs:
224       if time.time() > etime:
225         raise TimeoutError("Extended receive timeout")
226       while True:
227         try:
228           data = self.socket.recv(4096)
229         except socket.error, err:
230           if err.args and err.args[0] == errno.EAGAIN:
231             continue
232           raise
233         except socket.timeout, err:
234           raise TimeoutError("Receive timeout: %s" % str(err))
235         break
236       if not data:
237         raise ConnectionClosedError("Connection closed while reading")
238       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
239       self._buffer = new_msgs.pop()
240       self._msgs.extend(new_msgs)
241     return self._msgs.popleft()
242
243   def Call(self, msg):
244     """Send a message and wait for the response.
245
246     This is just a wrapper over Send and Recv.
247
248     """
249     self.Send(msg)
250     return self.Recv()
251
252   def Close(self):
253     """Close the socket"""
254     if self.socket is not None:
255       self.socket.close()
256       self.socket = None
257
258
259 def ParseRequest(msg):
260   """Parses a LUXI request message.
261
262   """
263   try:
264     request = serializer.LoadJson(msg)
265   except ValueError, err:
266     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
267
268   logging.debug("LUXI request: %s", request)
269
270   if not isinstance(request, dict):
271     logging.error("LUXI request not a dict: %r", msg)
272     raise ProtocolError("Invalid LUXI request (not a dict)")
273
274   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
275   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
276
277   if method is None or args is None:
278     logging.error("LUXI request missing method or arguments: %r", msg)
279     raise ProtocolError(("Invalid LUXI request (no method or arguments"
280                          " in request): %r") % msg)
281
282   return (method, args)
283
284
285 def ParseResponse(msg):
286   """Parses a LUXI response message.
287
288   """
289   # Parse the result
290   try:
291     data = serializer.LoadJson(msg)
292   except Exception, err:
293     raise ProtocolError("Error while deserializing response: %s" % str(err))
294
295   # Validate response
296   if not (isinstance(data, dict) and
297           KEY_SUCCESS in data and
298           KEY_RESULT in data):
299     raise ProtocolError("Invalid response from server: %r" % data)
300
301   return (data[KEY_SUCCESS], data[KEY_RESULT])
302
303
304 def FormatResponse(success, result):
305   """Formats a LUXI response message.
306
307   """
308   response = {
309     KEY_SUCCESS: success,
310     KEY_RESULT: result,
311     }
312
313   logging.debug("LUXI response: %s", response)
314
315   return serializer.DumpJson(response)
316
317
318 def FormatRequest(method, args):
319   """Formats a LUXI request message.
320
321   """
322   # Build request
323   request = {
324     KEY_METHOD: method,
325     KEY_ARGS: args,
326     }
327
328   # Serialize the request
329   return serializer.DumpJson(request, indent=False)
330
331
332 def CallLuxiMethod(transport_cb, method, args):
333   """Send a LUXI request via a transport and return the response.
334
335   """
336   assert callable(transport_cb)
337
338   request_msg = FormatRequest(method, args)
339
340   # Send request and wait for response
341   response_msg = transport_cb(request_msg)
342
343   (success, result) = ParseResponse(response_msg)
344
345   if success:
346     return result
347
348   errors.MaybeRaise(result)
349   raise RequestError(result)
350
351
352 class Client(object):
353   """High-level client implementation.
354
355   This uses a backing Transport-like class on top of which it
356   implements data serialization/deserialization.
357
358   """
359   def __init__(self, address=None, timeouts=None, transport=Transport):
360     """Constructor for the Client class.
361
362     Arguments:
363       - address: a valid address the the used transport class
364       - timeout: a list of timeouts, to be used on connect and read/write
365       - transport: a Transport-like class
366
367
368     If timeout is not passed, the default timeouts of the transport
369     class are used.
370
371     """
372     if address is None:
373       address = constants.MASTER_SOCKET
374     self.address = address
375     self.timeouts = timeouts
376     self.transport_class = transport
377     self.transport = None
378     self._InitTransport()
379
380   def _InitTransport(self):
381     """(Re)initialize the transport if needed.
382
383     """
384     if self.transport is None:
385       self.transport = self.transport_class(self.address,
386                                             timeouts=self.timeouts)
387
388   def _CloseTransport(self):
389     """Close the transport, ignoring errors.
390
391     """
392     if self.transport is None:
393       return
394     try:
395       old_transp = self.transport
396       self.transport = None
397       old_transp.Close()
398     except Exception: # pylint: disable-msg=W0703
399       pass
400
401   def _SendMethodCall(self, data):
402     # Send request and wait for response
403     try:
404       self._InitTransport()
405       return self.transport.Call(data)
406     except Exception:
407       self._CloseTransport()
408       raise
409
410   def CallMethod(self, method, args):
411     """Send a generic request and return the response.
412
413     """
414     return CallLuxiMethod(self._SendMethodCall, method, args)
415
416   def SetQueueDrainFlag(self, drain_flag):
417     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
418
419   def SetWatcherPause(self, until):
420     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
421
422   def SubmitJob(self, ops):
423     ops_state = map(lambda op: op.__getstate__(), ops)
424     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
425
426   def SubmitManyJobs(self, jobs):
427     jobs_state = []
428     for ops in jobs:
429       jobs_state.append([op.__getstate__() for op in ops])
430     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
431
432   def CancelJob(self, job_id):
433     return self.CallMethod(REQ_CANCEL_JOB, job_id)
434
435   def ArchiveJob(self, job_id):
436     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
437
438   def AutoArchiveJobs(self, age):
439     timeout = (DEF_RWTO - 1) / 2
440     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
441
442   def WaitForJobChangeOnce(self, job_id, fields,
443                            prev_job_info, prev_log_serial,
444                            timeout=WFJC_TIMEOUT):
445     """Waits for changes on a job.
446
447     @param job_id: Job ID
448     @type fields: list
449     @param fields: List of field names to be observed
450     @type prev_job_info: None or list
451     @param prev_job_info: Previously received job information
452     @type prev_log_serial: None or int/long
453     @param prev_log_serial: Highest log serial number previously received
454     @type timeout: int/float
455     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
456                     be capped to that value)
457
458     """
459     assert timeout >= 0, "Timeout can not be negative"
460     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
461                            (job_id, fields, prev_job_info,
462                             prev_log_serial,
463                             min(WFJC_TIMEOUT, timeout)))
464
465   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
466     while True:
467       result = self.WaitForJobChangeOnce(job_id, fields,
468                                          prev_job_info, prev_log_serial)
469       if result != constants.JOB_NOTCHANGED:
470         break
471     return result
472
473   def QueryJobs(self, job_ids, fields):
474     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
475
476   def QueryInstances(self, names, fields, use_locking):
477     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
478
479   def QueryNodes(self, names, fields, use_locking):
480     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
481
482   def QueryExports(self, nodes, use_locking):
483     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
484
485   def QueryClusterInfo(self):
486     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
487
488   def QueryConfigValues(self, fields):
489     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
490
491   def QueryTags(self, kind, name):
492     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))