jqueue/gnt-job: Add job priority fields for display
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
44 KEY_METHOD = "method"
45 KEY_ARGS = "args"
46 KEY_SUCCESS = "success"
47 KEY_RESULT = "result"
48
49 REQ_SUBMIT_JOB = "SubmitJob"
50 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52 REQ_CANCEL_JOB = "CancelJob"
53 REQ_ARCHIVE_JOB = "ArchiveJob"
54 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55 REQ_QUERY_JOBS = "QueryJobs"
56 REQ_QUERY_INSTANCES = "QueryInstances"
57 REQ_QUERY_NODES = "QueryNodes"
58 REQ_QUERY_EXPORTS = "QueryExports"
59 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61 REQ_QUERY_TAGS = "QueryTags"
62 REQ_QUERY_LOCKS = "QueryLocks"
63 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
64 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
65
66 DEF_CTMO = 10
67 DEF_RWTO = 60
68
69 # WaitForJobChange timeout
70 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
71
72
73 class ProtocolError(errors.GenericError):
74   """Denotes an error in the LUXI protocol."""
75
76
77 class ConnectionClosedError(ProtocolError):
78   """Connection closed error."""
79
80
81 class TimeoutError(ProtocolError):
82   """Operation timeout error."""
83
84
85 class RequestError(ProtocolError):
86   """Error on request.
87
88   This signifies an error in the request format or request handling,
89   but not (e.g.) an error in starting up an instance.
90
91   Some common conditions that can trigger this exception:
92     - job submission failed because the job data was wrong
93     - query failed because required fields were missing
94
95   """
96
97
98 class NoMasterError(ProtocolError):
99   """The master cannot be reached.
100
101   This means that the master daemon is not running or the socket has
102   been removed.
103
104   """
105
106
107 class PermissionError(ProtocolError):
108   """Permission denied while connecting to the master socket.
109
110   This means the user doesn't have the proper rights.
111
112   """
113
114
115 class Transport:
116   """Low-level transport class.
117
118   This is used on the client side.
119
120   This could be replace by any other class that provides the same
121   semantics to the Client. This means:
122     - can send messages and receive messages
123     - safe for multithreading
124
125   """
126
127   def __init__(self, address, timeouts=None):
128     """Constructor for the Client class.
129
130     Arguments:
131       - address: a valid address the the used transport class
132       - timeout: a list of timeouts, to be used on connect and read/write
133
134     There are two timeouts used since we might want to wait for a long
135     time for a response, but the connect timeout should be lower.
136
137     If not passed, we use a default of 10 and respectively 60 seconds.
138
139     Note that on reading data, since the timeout applies to an
140     invidual receive, it might be that the total duration is longer
141     than timeout value passed (we make a hard limit at twice the read
142     timeout).
143
144     """
145     self.address = address
146     if timeouts is None:
147       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
148     else:
149       self._ctimeout, self._rwtimeout = timeouts
150
151     self.socket = None
152     self._buffer = ""
153     self._msgs = collections.deque()
154
155     try:
156       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157
158       # Try to connect
159       try:
160         utils.Retry(self._Connect, 1.0, self._ctimeout,
161                     args=(self.socket, address, self._ctimeout))
162       except utils.RetryTimeout:
163         raise TimeoutError("Connect timed out")
164
165       self.socket.settimeout(self._rwtimeout)
166     except (socket.error, NoMasterError):
167       if self.socket is not None:
168         self.socket.close()
169       self.socket = None
170       raise
171
172   @staticmethod
173   def _Connect(sock, address, timeout):
174     sock.settimeout(timeout)
175     try:
176       sock.connect(address)
177     except socket.timeout, err:
178       raise TimeoutError("Connect timed out: %s" % str(err))
179     except socket.error, err:
180       error_code = err.args[0]
181       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
182         raise NoMasterError(address)
183       elif error_code in (errno.EPERM, errno.EACCES):
184         raise PermissionError(address)
185       elif error_code == errno.EAGAIN:
186         # Server's socket backlog is full at the moment
187         raise utils.RetryAgain()
188       raise
189
190   def _CheckSocket(self):
191     """Make sure we are connected.
192
193     """
194     if self.socket is None:
195       raise ProtocolError("Connection is closed")
196
197   def Send(self, msg):
198     """Send a message.
199
200     This just sends a message and doesn't wait for the response.
201
202     """
203     if constants.LUXI_EOM in msg:
204       raise ProtocolError("Message terminator found in payload")
205
206     self._CheckSocket()
207     try:
208       # TODO: sendall is not guaranteed to send everything
209       self.socket.sendall(msg + constants.LUXI_EOM)
210     except socket.timeout, err:
211       raise TimeoutError("Sending timeout: %s" % str(err))
212
213   def Recv(self):
214     """Try to receive a message from the socket.
215
216     In case we already have messages queued, we just return from the
217     queue. Otherwise, we try to read data with a _rwtimeout network
218     timeout, and making sure we don't go over 2x_rwtimeout as a global
219     limit.
220
221     """
222     self._CheckSocket()
223     etime = time.time() + self._rwtimeout
224     while not self._msgs:
225       if time.time() > etime:
226         raise TimeoutError("Extended receive timeout")
227       while True:
228         try:
229           data = self.socket.recv(4096)
230         except socket.error, err:
231           if err.args and err.args[0] == errno.EAGAIN:
232             continue
233           raise
234         except socket.timeout, err:
235           raise TimeoutError("Receive timeout: %s" % str(err))
236         break
237       if not data:
238         raise ConnectionClosedError("Connection closed while reading")
239       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
240       self._buffer = new_msgs.pop()
241       self._msgs.extend(new_msgs)
242     return self._msgs.popleft()
243
244   def Call(self, msg):
245     """Send a message and wait for the response.
246
247     This is just a wrapper over Send and Recv.
248
249     """
250     self.Send(msg)
251     return self.Recv()
252
253   def Close(self):
254     """Close the socket"""
255     if self.socket is not None:
256       self.socket.close()
257       self.socket = None
258
259
260 def ParseRequest(msg):
261   """Parses a LUXI request message.
262
263   """
264   try:
265     request = serializer.LoadJson(msg)
266   except ValueError, err:
267     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
268
269   logging.debug("LUXI request: %s", request)
270
271   if not isinstance(request, dict):
272     logging.error("LUXI request not a dict: %r", msg)
273     raise ProtocolError("Invalid LUXI request (not a dict)")
274
275   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
276   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
277
278   if method is None or args is None:
279     logging.error("LUXI request missing method or arguments: %r", msg)
280     raise ProtocolError(("Invalid LUXI request (no method or arguments"
281                          " in request): %r") % msg)
282
283   return (method, args)
284
285
286 def ParseResponse(msg):
287   """Parses a LUXI response message.
288
289   """
290   # Parse the result
291   try:
292     data = serializer.LoadJson(msg)
293   except Exception, err:
294     raise ProtocolError("Error while deserializing response: %s" % str(err))
295
296   # Validate response
297   if not (isinstance(data, dict) and
298           KEY_SUCCESS in data and
299           KEY_RESULT in data):
300     raise ProtocolError("Invalid response from server: %r" % data)
301
302   return (data[KEY_SUCCESS], data[KEY_RESULT])
303
304
305 def FormatResponse(success, result):
306   """Formats a LUXI response message.
307
308   """
309   response = {
310     KEY_SUCCESS: success,
311     KEY_RESULT: result,
312     }
313
314   logging.debug("LUXI response: %s", response)
315
316   return serializer.DumpJson(response)
317
318
319 def FormatRequest(method, args):
320   """Formats a LUXI request message.
321
322   """
323   # Build request
324   request = {
325     KEY_METHOD: method,
326     KEY_ARGS: args,
327     }
328
329   # Serialize the request
330   return serializer.DumpJson(request, indent=False)
331
332
333 def CallLuxiMethod(transport_cb, method, args):
334   """Send a LUXI request via a transport and return the response.
335
336   """
337   assert callable(transport_cb)
338
339   request_msg = FormatRequest(method, args)
340
341   # Send request and wait for response
342   response_msg = transport_cb(request_msg)
343
344   (success, result) = ParseResponse(response_msg)
345
346   if success:
347     return result
348
349   errors.MaybeRaise(result)
350   raise RequestError(result)
351
352
353 class Client(object):
354   """High-level client implementation.
355
356   This uses a backing Transport-like class on top of which it
357   implements data serialization/deserialization.
358
359   """
360   def __init__(self, address=None, timeouts=None, transport=Transport):
361     """Constructor for the Client class.
362
363     Arguments:
364       - address: a valid address the the used transport class
365       - timeout: a list of timeouts, to be used on connect and read/write
366       - transport: a Transport-like class
367
368
369     If timeout is not passed, the default timeouts of the transport
370     class are used.
371
372     """
373     if address is None:
374       address = constants.MASTER_SOCKET
375     self.address = address
376     self.timeouts = timeouts
377     self.transport_class = transport
378     self.transport = None
379     self._InitTransport()
380
381   def _InitTransport(self):
382     """(Re)initialize the transport if needed.
383
384     """
385     if self.transport is None:
386       self.transport = self.transport_class(self.address,
387                                             timeouts=self.timeouts)
388
389   def _CloseTransport(self):
390     """Close the transport, ignoring errors.
391
392     """
393     if self.transport is None:
394       return
395     try:
396       old_transp = self.transport
397       self.transport = None
398       old_transp.Close()
399     except Exception: # pylint: disable-msg=W0703
400       pass
401
402   def _SendMethodCall(self, data):
403     # Send request and wait for response
404     try:
405       self._InitTransport()
406       return self.transport.Call(data)
407     except Exception:
408       self._CloseTransport()
409       raise
410
411   def CallMethod(self, method, args):
412     """Send a generic request and return the response.
413
414     """
415     return CallLuxiMethod(self._SendMethodCall, method, args)
416
417   def SetQueueDrainFlag(self, drain_flag):
418     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
419
420   def SetWatcherPause(self, until):
421     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
422
423   def SubmitJob(self, ops):
424     ops_state = map(lambda op: op.__getstate__(), ops)
425     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
426
427   def SubmitManyJobs(self, jobs):
428     jobs_state = []
429     for ops in jobs:
430       jobs_state.append([op.__getstate__() for op in ops])
431     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
432
433   def CancelJob(self, job_id):
434     return self.CallMethod(REQ_CANCEL_JOB, job_id)
435
436   def ArchiveJob(self, job_id):
437     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
438
439   def AutoArchiveJobs(self, age):
440     timeout = (DEF_RWTO - 1) / 2
441     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
442
443   def WaitForJobChangeOnce(self, job_id, fields,
444                            prev_job_info, prev_log_serial,
445                            timeout=WFJC_TIMEOUT):
446     """Waits for changes on a job.
447
448     @param job_id: Job ID
449     @type fields: list
450     @param fields: List of field names to be observed
451     @type prev_job_info: None or list
452     @param prev_job_info: Previously received job information
453     @type prev_log_serial: None or int/long
454     @param prev_log_serial: Highest log serial number previously received
455     @type timeout: int/float
456     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
457                     be capped to that value)
458
459     """
460     assert timeout >= 0, "Timeout can not be negative"
461     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
462                            (job_id, fields, prev_job_info,
463                             prev_log_serial,
464                             min(WFJC_TIMEOUT, timeout)))
465
466   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
467     while True:
468       result = self.WaitForJobChangeOnce(job_id, fields,
469                                          prev_job_info, prev_log_serial)
470       if result != constants.JOB_NOTCHANGED:
471         break
472     return result
473
474   def QueryJobs(self, job_ids, fields):
475     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
476
477   def QueryInstances(self, names, fields, use_locking):
478     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
479
480   def QueryNodes(self, names, fields, use_locking):
481     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
482
483   def QueryExports(self, nodes, use_locking):
484     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
485
486   def QueryClusterInfo(self):
487     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
488
489   def QueryConfigValues(self, fields):
490     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
491
492   def QueryTags(self, kind, name):
493     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
494
495   def QueryLocks(self, fields, sync):
496     return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))