cmdlib: Don't sort list of nodes if names are given
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import serializer
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import utils
42
43
44 KEY_METHOD = "method"
45 KEY_ARGS = "args"
46 KEY_SUCCESS = "success"
47 KEY_RESULT = "result"
48 KEY_VERSION = "version"
49
50 REQ_SUBMIT_JOB = "SubmitJob"
51 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
52 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
53 REQ_CANCEL_JOB = "CancelJob"
54 REQ_ARCHIVE_JOB = "ArchiveJob"
55 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
56 REQ_QUERY_JOBS = "QueryJobs"
57 REQ_QUERY_INSTANCES = "QueryInstances"
58 REQ_QUERY_NODES = "QueryNodes"
59 REQ_QUERY_GROUPS = "QueryGroups"
60 REQ_QUERY_EXPORTS = "QueryExports"
61 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
62 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
63 REQ_QUERY_TAGS = "QueryTags"
64 REQ_QUERY_LOCKS = "QueryLocks"
65 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
66 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
67
68 DEF_CTMO = 10
69 DEF_RWTO = 60
70
71 # WaitForJobChange timeout
72 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
73
74
75 class ProtocolError(errors.LuxiError):
76   """Denotes an error in the LUXI protocol."""
77
78
79 class ConnectionClosedError(ProtocolError):
80   """Connection closed error."""
81
82
83 class TimeoutError(ProtocolError):
84   """Operation timeout error."""
85
86
87 class RequestError(ProtocolError):
88   """Error on request.
89
90   This signifies an error in the request format or request handling,
91   but not (e.g.) an error in starting up an instance.
92
93   Some common conditions that can trigger this exception:
94     - job submission failed because the job data was wrong
95     - query failed because required fields were missing
96
97   """
98
99
100 class NoMasterError(ProtocolError):
101   """The master cannot be reached.
102
103   This means that the master daemon is not running or the socket has
104   been removed.
105
106   """
107
108
109 class PermissionError(ProtocolError):
110   """Permission denied while connecting to the master socket.
111
112   This means the user doesn't have the proper rights.
113
114   """
115
116
117 class Transport:
118   """Low-level transport class.
119
120   This is used on the client side.
121
122   This could be replace by any other class that provides the same
123   semantics to the Client. This means:
124     - can send messages and receive messages
125     - safe for multithreading
126
127   """
128
129   def __init__(self, address, timeouts=None):
130     """Constructor for the Client class.
131
132     Arguments:
133       - address: a valid address the the used transport class
134       - timeout: a list of timeouts, to be used on connect and read/write
135
136     There are two timeouts used since we might want to wait for a long
137     time for a response, but the connect timeout should be lower.
138
139     If not passed, we use a default of 10 and respectively 60 seconds.
140
141     Note that on reading data, since the timeout applies to an
142     invidual receive, it might be that the total duration is longer
143     than timeout value passed (we make a hard limit at twice the read
144     timeout).
145
146     """
147     self.address = address
148     if timeouts is None:
149       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
150     else:
151       self._ctimeout, self._rwtimeout = timeouts
152
153     self.socket = None
154     self._buffer = ""
155     self._msgs = collections.deque()
156
157     try:
158       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
159
160       # Try to connect
161       try:
162         utils.Retry(self._Connect, 1.0, self._ctimeout,
163                     args=(self.socket, address, self._ctimeout))
164       except utils.RetryTimeout:
165         raise TimeoutError("Connect timed out")
166
167       self.socket.settimeout(self._rwtimeout)
168     except (socket.error, NoMasterError):
169       if self.socket is not None:
170         self.socket.close()
171       self.socket = None
172       raise
173
174   @staticmethod
175   def _Connect(sock, address, timeout):
176     sock.settimeout(timeout)
177     try:
178       sock.connect(address)
179     except socket.timeout, err:
180       raise TimeoutError("Connect timed out: %s" % str(err))
181     except socket.error, err:
182       error_code = err.args[0]
183       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
184         raise NoMasterError(address)
185       elif error_code in (errno.EPERM, errno.EACCES):
186         raise PermissionError(address)
187       elif error_code == errno.EAGAIN:
188         # Server's socket backlog is full at the moment
189         raise utils.RetryAgain()
190       raise
191
192   def _CheckSocket(self):
193     """Make sure we are connected.
194
195     """
196     if self.socket is None:
197       raise ProtocolError("Connection is closed")
198
199   def Send(self, msg):
200     """Send a message.
201
202     This just sends a message and doesn't wait for the response.
203
204     """
205     if constants.LUXI_EOM in msg:
206       raise ProtocolError("Message terminator found in payload")
207
208     self._CheckSocket()
209     try:
210       # TODO: sendall is not guaranteed to send everything
211       self.socket.sendall(msg + constants.LUXI_EOM)
212     except socket.timeout, err:
213       raise TimeoutError("Sending timeout: %s" % str(err))
214
215   def Recv(self):
216     """Try to receive a message from the socket.
217
218     In case we already have messages queued, we just return from the
219     queue. Otherwise, we try to read data with a _rwtimeout network
220     timeout, and making sure we don't go over 2x_rwtimeout as a global
221     limit.
222
223     """
224     self._CheckSocket()
225     etime = time.time() + self._rwtimeout
226     while not self._msgs:
227       if time.time() > etime:
228         raise TimeoutError("Extended receive timeout")
229       while True:
230         try:
231           data = self.socket.recv(4096)
232         except socket.error, err:
233           if err.args and err.args[0] == errno.EAGAIN:
234             continue
235           raise
236         except socket.timeout, err:
237           raise TimeoutError("Receive timeout: %s" % str(err))
238         break
239       if not data:
240         raise ConnectionClosedError("Connection closed while reading")
241       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
242       self._buffer = new_msgs.pop()
243       self._msgs.extend(new_msgs)
244     return self._msgs.popleft()
245
246   def Call(self, msg):
247     """Send a message and wait for the response.
248
249     This is just a wrapper over Send and Recv.
250
251     """
252     self.Send(msg)
253     return self.Recv()
254
255   def Close(self):
256     """Close the socket"""
257     if self.socket is not None:
258       self.socket.close()
259       self.socket = None
260
261
262 def ParseRequest(msg):
263   """Parses a LUXI request message.
264
265   """
266   try:
267     request = serializer.LoadJson(msg)
268   except ValueError, err:
269     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
270
271   logging.debug("LUXI request: %s", request)
272
273   if not isinstance(request, dict):
274     logging.error("LUXI request not a dict: %r", msg)
275     raise ProtocolError("Invalid LUXI request (not a dict)")
276
277   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
278   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
279   version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103
280
281   if method is None or args is None:
282     logging.error("LUXI request missing method or arguments: %r", msg)
283     raise ProtocolError(("Invalid LUXI request (no method or arguments"
284                          " in request): %r") % msg)
285
286   return (method, args, version)
287
288
289 def ParseResponse(msg):
290   """Parses a LUXI response message.
291
292   """
293   # Parse the result
294   try:
295     data = serializer.LoadJson(msg)
296   except Exception, err:
297     raise ProtocolError("Error while deserializing response: %s" % str(err))
298
299   # Validate response
300   if not (isinstance(data, dict) and
301           KEY_SUCCESS in data and
302           KEY_RESULT in data):
303     raise ProtocolError("Invalid response from server: %r" % data)
304
305   return (data[KEY_SUCCESS], data[KEY_RESULT],
306           data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
307
308
309 def FormatResponse(success, result, version=None):
310   """Formats a LUXI response message.
311
312   """
313   response = {
314     KEY_SUCCESS: success,
315     KEY_RESULT: result,
316     }
317
318   if version is not None:
319     response[KEY_VERSION] = version
320
321   logging.debug("LUXI response: %s", response)
322
323   return serializer.DumpJson(response)
324
325
326 def FormatRequest(method, args, version=None):
327   """Formats a LUXI request message.
328
329   """
330   # Build request
331   request = {
332     KEY_METHOD: method,
333     KEY_ARGS: args,
334     }
335
336   if version is not None:
337     request[KEY_VERSION] = version
338
339   # Serialize the request
340   return serializer.DumpJson(request, indent=False)
341
342
343 def CallLuxiMethod(transport_cb, method, args, version=None):
344   """Send a LUXI request via a transport and return the response.
345
346   """
347   assert callable(transport_cb)
348
349   request_msg = FormatRequest(method, args, version=version)
350
351   # Send request and wait for response
352   response_msg = transport_cb(request_msg)
353
354   (success, result, resp_version) = ParseResponse(response_msg)
355
356   # Verify version if there was one in the response
357   if resp_version is not None and resp_version != version:
358     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
359                            (version, resp_version))
360
361   if success:
362     return result
363
364   errors.MaybeRaise(result)
365   raise RequestError(result)
366
367
368 class Client(object):
369   """High-level client implementation.
370
371   This uses a backing Transport-like class on top of which it
372   implements data serialization/deserialization.
373
374   """
375   def __init__(self, address=None, timeouts=None, transport=Transport):
376     """Constructor for the Client class.
377
378     Arguments:
379       - address: a valid address the the used transport class
380       - timeout: a list of timeouts, to be used on connect and read/write
381       - transport: a Transport-like class
382
383
384     If timeout is not passed, the default timeouts of the transport
385     class are used.
386
387     """
388     if address is None:
389       address = constants.MASTER_SOCKET
390     self.address = address
391     self.timeouts = timeouts
392     self.transport_class = transport
393     self.transport = None
394     self._InitTransport()
395
396   def _InitTransport(self):
397     """(Re)initialize the transport if needed.
398
399     """
400     if self.transport is None:
401       self.transport = self.transport_class(self.address,
402                                             timeouts=self.timeouts)
403
404   def _CloseTransport(self):
405     """Close the transport, ignoring errors.
406
407     """
408     if self.transport is None:
409       return
410     try:
411       old_transp = self.transport
412       self.transport = None
413       old_transp.Close()
414     except Exception: # pylint: disable-msg=W0703
415       pass
416
417   def _SendMethodCall(self, data):
418     # Send request and wait for response
419     try:
420       self._InitTransport()
421       return self.transport.Call(data)
422     except Exception:
423       self._CloseTransport()
424       raise
425
426   def CallMethod(self, method, args):
427     """Send a generic request and return the response.
428
429     """
430     return CallLuxiMethod(self._SendMethodCall, method, args,
431                           version=constants.LUXI_VERSION)
432
433   def SetQueueDrainFlag(self, drain_flag):
434     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
435
436   def SetWatcherPause(self, until):
437     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
438
439   def SubmitJob(self, ops):
440     ops_state = map(lambda op: op.__getstate__(), ops)
441     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
442
443   def SubmitManyJobs(self, jobs):
444     jobs_state = []
445     for ops in jobs:
446       jobs_state.append([op.__getstate__() for op in ops])
447     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
448
449   def CancelJob(self, job_id):
450     return self.CallMethod(REQ_CANCEL_JOB, job_id)
451
452   def ArchiveJob(self, job_id):
453     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
454
455   def AutoArchiveJobs(self, age):
456     timeout = (DEF_RWTO - 1) / 2
457     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
458
459   def WaitForJobChangeOnce(self, job_id, fields,
460                            prev_job_info, prev_log_serial,
461                            timeout=WFJC_TIMEOUT):
462     """Waits for changes on a job.
463
464     @param job_id: Job ID
465     @type fields: list
466     @param fields: List of field names to be observed
467     @type prev_job_info: None or list
468     @param prev_job_info: Previously received job information
469     @type prev_log_serial: None or int/long
470     @param prev_log_serial: Highest log serial number previously received
471     @type timeout: int/float
472     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
473                     be capped to that value)
474
475     """
476     assert timeout >= 0, "Timeout can not be negative"
477     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
478                            (job_id, fields, prev_job_info,
479                             prev_log_serial,
480                             min(WFJC_TIMEOUT, timeout)))
481
482   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
483     while True:
484       result = self.WaitForJobChangeOnce(job_id, fields,
485                                          prev_job_info, prev_log_serial)
486       if result != constants.JOB_NOTCHANGED:
487         break
488     return result
489
490   def QueryJobs(self, job_ids, fields):
491     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
492
493   def QueryInstances(self, names, fields, use_locking):
494     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
495
496   def QueryNodes(self, names, fields, use_locking):
497     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
498
499   def QueryGroups(self, names, fields, use_locking):
500     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
501
502   def QueryExports(self, nodes, use_locking):
503     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
504
505   def QueryClusterInfo(self):
506     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
507
508   def QueryConfigValues(self, fields):
509     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
510
511   def QueryTags(self, kind, name):
512     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
513
514   def QueryLocks(self, fields, sync):
515     return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))