Rename OpRemoveInstance and LURemoveInstance
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37 import warnings
38
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44
45
46 KEY_METHOD = "method"
47 KEY_ARGS = "args"
48 KEY_SUCCESS = "success"
49 KEY_RESULT = "result"
50 KEY_VERSION = "version"
51
52 REQ_SUBMIT_JOB = "SubmitJob"
53 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55 REQ_CANCEL_JOB = "CancelJob"
56 REQ_ARCHIVE_JOB = "ArchiveJob"
57 REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
58 REQ_QUERY = "Query"
59 REQ_QUERY_FIELDS = "QueryFields"
60 REQ_QUERY_JOBS = "QueryJobs"
61 REQ_QUERY_INSTANCES = "QueryInstances"
62 REQ_QUERY_NODES = "QueryNodes"
63 REQ_QUERY_GROUPS = "QueryGroups"
64 REQ_QUERY_EXPORTS = "QueryExports"
65 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
66 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
67 REQ_QUERY_TAGS = "QueryTags"
68 REQ_QUERY_LOCKS = "QueryLocks"
69 REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
70 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
71
72 DEF_CTMO = 10
73 DEF_RWTO = 60
74
75 # WaitForJobChange timeout
76 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
77
78
79 class ProtocolError(errors.LuxiError):
80   """Denotes an error in the LUXI protocol."""
81
82
83 class ConnectionClosedError(ProtocolError):
84   """Connection closed error."""
85
86
87 class TimeoutError(ProtocolError):
88   """Operation timeout error."""
89
90
91 class RequestError(ProtocolError):
92   """Error on request.
93
94   This signifies an error in the request format or request handling,
95   but not (e.g.) an error in starting up an instance.
96
97   Some common conditions that can trigger this exception:
98     - job submission failed because the job data was wrong
99     - query failed because required fields were missing
100
101   """
102
103
104 class NoMasterError(ProtocolError):
105   """The master cannot be reached.
106
107   This means that the master daemon is not running or the socket has
108   been removed.
109
110   """
111
112
113 class PermissionError(ProtocolError):
114   """Permission denied while connecting to the master socket.
115
116   This means the user doesn't have the proper rights.
117
118   """
119
120
121 class Transport:
122   """Low-level transport class.
123
124   This is used on the client side.
125
126   This could be replace by any other class that provides the same
127   semantics to the Client. This means:
128     - can send messages and receive messages
129     - safe for multithreading
130
131   """
132
133   def __init__(self, address, timeouts=None):
134     """Constructor for the Client class.
135
136     Arguments:
137       - address: a valid address the the used transport class
138       - timeout: a list of timeouts, to be used on connect and read/write
139
140     There are two timeouts used since we might want to wait for a long
141     time for a response, but the connect timeout should be lower.
142
143     If not passed, we use a default of 10 and respectively 60 seconds.
144
145     Note that on reading data, since the timeout applies to an
146     invidual receive, it might be that the total duration is longer
147     than timeout value passed (we make a hard limit at twice the read
148     timeout).
149
150     """
151     self.address = address
152     if timeouts is None:
153       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
154     else:
155       self._ctimeout, self._rwtimeout = timeouts
156
157     self.socket = None
158     self._buffer = ""
159     self._msgs = collections.deque()
160
161     try:
162       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
163
164       # Try to connect
165       try:
166         utils.Retry(self._Connect, 1.0, self._ctimeout,
167                     args=(self.socket, address, self._ctimeout))
168       except utils.RetryTimeout:
169         raise TimeoutError("Connect timed out")
170
171       self.socket.settimeout(self._rwtimeout)
172     except (socket.error, NoMasterError):
173       if self.socket is not None:
174         self.socket.close()
175       self.socket = None
176       raise
177
178   @staticmethod
179   def _Connect(sock, address, timeout):
180     sock.settimeout(timeout)
181     try:
182       sock.connect(address)
183     except socket.timeout, err:
184       raise TimeoutError("Connect timed out: %s" % str(err))
185     except socket.error, err:
186       error_code = err.args[0]
187       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
188         raise NoMasterError(address)
189       elif error_code in (errno.EPERM, errno.EACCES):
190         raise PermissionError(address)
191       elif error_code == errno.EAGAIN:
192         # Server's socket backlog is full at the moment
193         raise utils.RetryAgain()
194       raise
195
196   def _CheckSocket(self):
197     """Make sure we are connected.
198
199     """
200     if self.socket is None:
201       raise ProtocolError("Connection is closed")
202
203   def Send(self, msg):
204     """Send a message.
205
206     This just sends a message and doesn't wait for the response.
207
208     """
209     if constants.LUXI_EOM in msg:
210       raise ProtocolError("Message terminator found in payload")
211
212     self._CheckSocket()
213     try:
214       # TODO: sendall is not guaranteed to send everything
215       self.socket.sendall(msg + constants.LUXI_EOM)
216     except socket.timeout, err:
217       raise TimeoutError("Sending timeout: %s" % str(err))
218
219   def Recv(self):
220     """Try to receive a message from the socket.
221
222     In case we already have messages queued, we just return from the
223     queue. Otherwise, we try to read data with a _rwtimeout network
224     timeout, and making sure we don't go over 2x_rwtimeout as a global
225     limit.
226
227     """
228     self._CheckSocket()
229     etime = time.time() + self._rwtimeout
230     while not self._msgs:
231       if time.time() > etime:
232         raise TimeoutError("Extended receive timeout")
233       while True:
234         try:
235           data = self.socket.recv(4096)
236         except socket.timeout, err:
237           raise TimeoutError("Receive timeout: %s" % str(err))
238         except socket.error, err:
239           if err.args and err.args[0] == errno.EAGAIN:
240             continue
241           raise
242         break
243       if not data:
244         raise ConnectionClosedError("Connection closed while reading")
245       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
246       self._buffer = new_msgs.pop()
247       self._msgs.extend(new_msgs)
248     return self._msgs.popleft()
249
250   def Call(self, msg):
251     """Send a message and wait for the response.
252
253     This is just a wrapper over Send and Recv.
254
255     """
256     self.Send(msg)
257     return self.Recv()
258
259   def Close(self):
260     """Close the socket"""
261     if self.socket is not None:
262       self.socket.close()
263       self.socket = None
264
265
266 def ParseRequest(msg):
267   """Parses a LUXI request message.
268
269   """
270   try:
271     request = serializer.LoadJson(msg)
272   except ValueError, err:
273     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
274
275   logging.debug("LUXI request: %s", request)
276
277   if not isinstance(request, dict):
278     logging.error("LUXI request not a dict: %r", msg)
279     raise ProtocolError("Invalid LUXI request (not a dict)")
280
281   method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
282   args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
283   version = request.get(KEY_VERSION, None) # pylint: disable-msg=E1103
284
285   if method is None or args is None:
286     logging.error("LUXI request missing method or arguments: %r", msg)
287     raise ProtocolError(("Invalid LUXI request (no method or arguments"
288                          " in request): %r") % msg)
289
290   return (method, args, version)
291
292
293 def ParseResponse(msg):
294   """Parses a LUXI response message.
295
296   """
297   # Parse the result
298   try:
299     data = serializer.LoadJson(msg)
300   except Exception, err:
301     raise ProtocolError("Error while deserializing response: %s" % str(err))
302
303   # Validate response
304   if not (isinstance(data, dict) and
305           KEY_SUCCESS in data and
306           KEY_RESULT in data):
307     raise ProtocolError("Invalid response from server: %r" % data)
308
309   return (data[KEY_SUCCESS], data[KEY_RESULT],
310           data.get(KEY_VERSION, None)) # pylint: disable-msg=E1103
311
312
313 def FormatResponse(success, result, version=None):
314   """Formats a LUXI response message.
315
316   """
317   response = {
318     KEY_SUCCESS: success,
319     KEY_RESULT: result,
320     }
321
322   if version is not None:
323     response[KEY_VERSION] = version
324
325   logging.debug("LUXI response: %s", response)
326
327   return serializer.DumpJson(response)
328
329
330 def FormatRequest(method, args, version=None):
331   """Formats a LUXI request message.
332
333   """
334   # Build request
335   request = {
336     KEY_METHOD: method,
337     KEY_ARGS: args,
338     }
339
340   if version is not None:
341     request[KEY_VERSION] = version
342
343   # Serialize the request
344   return serializer.DumpJson(request, indent=False)
345
346
347 def CallLuxiMethod(transport_cb, method, args, version=None):
348   """Send a LUXI request via a transport and return the response.
349
350   """
351   assert callable(transport_cb)
352
353   request_msg = FormatRequest(method, args, version=version)
354
355   # Send request and wait for response
356   response_msg = transport_cb(request_msg)
357
358   (success, result, resp_version) = ParseResponse(response_msg)
359
360   # Verify version if there was one in the response
361   if resp_version is not None and resp_version != version:
362     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
363                            (version, resp_version))
364
365   if success:
366     return result
367
368   errors.MaybeRaise(result)
369   raise RequestError(result)
370
371
372 class Client(object):
373   """High-level client implementation.
374
375   This uses a backing Transport-like class on top of which it
376   implements data serialization/deserialization.
377
378   """
379   def __init__(self, address=None, timeouts=None, transport=Transport):
380     """Constructor for the Client class.
381
382     Arguments:
383       - address: a valid address the the used transport class
384       - timeout: a list of timeouts, to be used on connect and read/write
385       - transport: a Transport-like class
386
387
388     If timeout is not passed, the default timeouts of the transport
389     class are used.
390
391     """
392     if address is None:
393       address = constants.MASTER_SOCKET
394     self.address = address
395     self.timeouts = timeouts
396     self.transport_class = transport
397     self.transport = None
398     self._InitTransport()
399
400   def _InitTransport(self):
401     """(Re)initialize the transport if needed.
402
403     """
404     if self.transport is None:
405       self.transport = self.transport_class(self.address,
406                                             timeouts=self.timeouts)
407
408   def _CloseTransport(self):
409     """Close the transport, ignoring errors.
410
411     """
412     if self.transport is None:
413       return
414     try:
415       old_transp = self.transport
416       self.transport = None
417       old_transp.Close()
418     except Exception: # pylint: disable-msg=W0703
419       pass
420
421   def _SendMethodCall(self, data):
422     # Send request and wait for response
423     try:
424       self._InitTransport()
425       return self.transport.Call(data)
426     except Exception:
427       self._CloseTransport()
428       raise
429
430   def Close(self):
431     """Close the underlying connection.
432
433     """
434     self._CloseTransport()
435
436   def CallMethod(self, method, args):
437     """Send a generic request and return the response.
438
439     """
440     return CallLuxiMethod(self._SendMethodCall, method, args,
441                           version=constants.LUXI_VERSION)
442
443   def SetQueueDrainFlag(self, drain_flag):
444     return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
445
446   def SetWatcherPause(self, until):
447     return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
448
449   def SubmitJob(self, ops):
450     ops_state = map(lambda op: op.__getstate__(), ops)
451     return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
452
453   def SubmitManyJobs(self, jobs):
454     jobs_state = []
455     for ops in jobs:
456       jobs_state.append([op.__getstate__() for op in ops])
457     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
458
459   def CancelJob(self, job_id):
460     return self.CallMethod(REQ_CANCEL_JOB, job_id)
461
462   def ArchiveJob(self, job_id):
463     return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
464
465   def AutoArchiveJobs(self, age):
466     timeout = (DEF_RWTO - 1) / 2
467     return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
468
469   def WaitForJobChangeOnce(self, job_id, fields,
470                            prev_job_info, prev_log_serial,
471                            timeout=WFJC_TIMEOUT):
472     """Waits for changes on a job.
473
474     @param job_id: Job ID
475     @type fields: list
476     @param fields: List of field names to be observed
477     @type prev_job_info: None or list
478     @param prev_job_info: Previously received job information
479     @type prev_log_serial: None or int/long
480     @param prev_log_serial: Highest log serial number previously received
481     @type timeout: int/float
482     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
483                     be capped to that value)
484
485     """
486     assert timeout >= 0, "Timeout can not be negative"
487     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
488                            (job_id, fields, prev_job_info,
489                             prev_log_serial,
490                             min(WFJC_TIMEOUT, timeout)))
491
492   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
493     while True:
494       result = self.WaitForJobChangeOnce(job_id, fields,
495                                          prev_job_info, prev_log_serial)
496       if result != constants.JOB_NOTCHANGED:
497         break
498     return result
499
500   def Query(self, what, fields, filter_):
501     """Query for resources/items.
502
503     @param what: One of L{constants.QR_OP_LUXI}
504     @type fields: List of strings
505     @param fields: List of requested fields
506     @type filter_: None or list
507     @param filter_: Query filter
508     @rtype: L{objects.QueryResponse}
509
510     """
511     req = objects.QueryRequest(what=what, fields=fields, filter=filter_)
512     result = self.CallMethod(REQ_QUERY, req.ToDict())
513     return objects.QueryResponse.FromDict(result)
514
515   def QueryFields(self, what, fields):
516     """Query for available fields.
517
518     @param what: One of L{constants.QR_OP_LUXI}
519     @type fields: None or list of strings
520     @param fields: List of requested fields
521     @rtype: L{objects.QueryFieldsResponse}
522
523     """
524     req = objects.QueryFieldsRequest(what=what, fields=fields)
525     result = self.CallMethod(REQ_QUERY_FIELDS, req.ToDict())
526     return objects.QueryFieldsResponse.FromDict(result)
527
528   def QueryJobs(self, job_ids, fields):
529     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
530
531   def QueryInstances(self, names, fields, use_locking):
532     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
533
534   def QueryNodes(self, names, fields, use_locking):
535     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
536
537   def QueryGroups(self, names, fields, use_locking):
538     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
539
540   def QueryExports(self, nodes, use_locking):
541     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
542
543   def QueryClusterInfo(self):
544     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
545
546   def QueryConfigValues(self, fields):
547     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
548
549   def QueryTags(self, kind, name):
550     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
551
552   def QueryLocks(self, fields, sync):
553     warnings.warn("This LUXI call is deprecated and will be removed, use"
554                   " Query(\"%s\", ...) instead" % constants.QR_LOCK)
555     return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))