cmdlib changes to support nic.network as uuid
[ganeti-local] / lib / luxi.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module for the unix socket protocol
23
24 This module implements the local unix socket protocol. You only need
25 this module and the opcodes module in the client program in order to
26 communicate with the master.
27
28 The module is also used by the master daemon.
29
30 """
31
32 import socket
33 import collections
34 import time
35 import errno
36 import logging
37
38 from ganeti import compat
39 from ganeti import serializer
40 from ganeti import constants
41 from ganeti import errors
42 from ganeti import utils
43 from ganeti import objects
44 from ganeti import pathutils
45
46
47 KEY_METHOD = "method"
48 KEY_ARGS = "args"
49 KEY_SUCCESS = "success"
50 KEY_RESULT = "result"
51 KEY_VERSION = "version"
52
53 REQ_SUBMIT_JOB = "SubmitJob"
54 REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
55 REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
56 REQ_CANCEL_JOB = "CancelJob"
57 REQ_ARCHIVE_JOB = "ArchiveJob"
58 REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
59 REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
60 REQ_QUERY = "Query"
61 REQ_QUERY_FIELDS = "QueryFields"
62 REQ_QUERY_JOBS = "QueryJobs"
63 REQ_QUERY_INSTANCES = "QueryInstances"
64 REQ_QUERY_NODES = "QueryNodes"
65 REQ_QUERY_GROUPS = "QueryGroups"
66 REQ_QUERY_NETWORKS = "QueryNetworks"
67 REQ_QUERY_EXPORTS = "QueryExports"
68 REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
69 REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
70 REQ_QUERY_TAGS = "QueryTags"
71 REQ_SET_DRAIN_FLAG = "SetDrainFlag"
72 REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
73
74 #: List of all LUXI requests
75 REQ_ALL = compat.UniqueFrozenset([
76   REQ_ARCHIVE_JOB,
77   REQ_AUTO_ARCHIVE_JOBS,
78   REQ_CANCEL_JOB,
79   REQ_CHANGE_JOB_PRIORITY,
80   REQ_QUERY,
81   REQ_QUERY_CLUSTER_INFO,
82   REQ_QUERY_CONFIG_VALUES,
83   REQ_QUERY_EXPORTS,
84   REQ_QUERY_FIELDS,
85   REQ_QUERY_GROUPS,
86   REQ_QUERY_INSTANCES,
87   REQ_QUERY_JOBS,
88   REQ_QUERY_NODES,
89   REQ_QUERY_TAGS,
90   REQ_SET_DRAIN_FLAG,
91   REQ_SET_WATCHER_PAUSE,
92   REQ_SUBMIT_JOB,
93   REQ_SUBMIT_MANY_JOBS,
94   REQ_WAIT_FOR_JOB_CHANGE,
95   ])
96
97 DEF_CTMO = 10
98 DEF_RWTO = 60
99
100 # WaitForJobChange timeout
101 WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
102
103
104 class ProtocolError(errors.LuxiError):
105   """Denotes an error in the LUXI protocol."""
106
107
108 class ConnectionClosedError(ProtocolError):
109   """Connection closed error."""
110
111
112 class TimeoutError(ProtocolError):
113   """Operation timeout error."""
114
115
116 class RequestError(ProtocolError):
117   """Error on request.
118
119   This signifies an error in the request format or request handling,
120   but not (e.g.) an error in starting up an instance.
121
122   Some common conditions that can trigger this exception:
123     - job submission failed because the job data was wrong
124     - query failed because required fields were missing
125
126   """
127
128
129 class NoMasterError(ProtocolError):
130   """The master cannot be reached.
131
132   This means that the master daemon is not running or the socket has
133   been removed.
134
135   """
136
137
138 class PermissionError(ProtocolError):
139   """Permission denied while connecting to the master socket.
140
141   This means the user doesn't have the proper rights.
142
143   """
144
145
146 class Transport:
147   """Low-level transport class.
148
149   This is used on the client side.
150
151   This could be replace by any other class that provides the same
152   semantics to the Client. This means:
153     - can send messages and receive messages
154     - safe for multithreading
155
156   """
157
158   def __init__(self, address, timeouts=None):
159     """Constructor for the Client class.
160
161     Arguments:
162       - address: a valid address the the used transport class
163       - timeout: a list of timeouts, to be used on connect and read/write
164
165     There are two timeouts used since we might want to wait for a long
166     time for a response, but the connect timeout should be lower.
167
168     If not passed, we use a default of 10 and respectively 60 seconds.
169
170     Note that on reading data, since the timeout applies to an
171     invidual receive, it might be that the total duration is longer
172     than timeout value passed (we make a hard limit at twice the read
173     timeout).
174
175     """
176     self.address = address
177     if timeouts is None:
178       self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
179     else:
180       self._ctimeout, self._rwtimeout = timeouts
181
182     self.socket = None
183     self._buffer = ""
184     self._msgs = collections.deque()
185
186     try:
187       self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
188
189       # Try to connect
190       try:
191         utils.Retry(self._Connect, 1.0, self._ctimeout,
192                     args=(self.socket, address, self._ctimeout))
193       except utils.RetryTimeout:
194         raise TimeoutError("Connect timed out")
195
196       self.socket.settimeout(self._rwtimeout)
197     except (socket.error, NoMasterError):
198       if self.socket is not None:
199         self.socket.close()
200       self.socket = None
201       raise
202
203   @staticmethod
204   def _Connect(sock, address, timeout):
205     sock.settimeout(timeout)
206     try:
207       sock.connect(address)
208     except socket.timeout, err:
209       raise TimeoutError("Connect timed out: %s" % str(err))
210     except socket.error, err:
211       error_code = err.args[0]
212       if error_code in (errno.ENOENT, errno.ECONNREFUSED):
213         raise NoMasterError(address)
214       elif error_code in (errno.EPERM, errno.EACCES):
215         raise PermissionError(address)
216       elif error_code == errno.EAGAIN:
217         # Server's socket backlog is full at the moment
218         raise utils.RetryAgain()
219       raise
220
221   def _CheckSocket(self):
222     """Make sure we are connected.
223
224     """
225     if self.socket is None:
226       raise ProtocolError("Connection is closed")
227
228   def Send(self, msg):
229     """Send a message.
230
231     This just sends a message and doesn't wait for the response.
232
233     """
234     if constants.LUXI_EOM in msg:
235       raise ProtocolError("Message terminator found in payload")
236
237     self._CheckSocket()
238     try:
239       # TODO: sendall is not guaranteed to send everything
240       self.socket.sendall(msg + constants.LUXI_EOM)
241     except socket.timeout, err:
242       raise TimeoutError("Sending timeout: %s" % str(err))
243
244   def Recv(self):
245     """Try to receive a message from the socket.
246
247     In case we already have messages queued, we just return from the
248     queue. Otherwise, we try to read data with a _rwtimeout network
249     timeout, and making sure we don't go over 2x_rwtimeout as a global
250     limit.
251
252     """
253     self._CheckSocket()
254     etime = time.time() + self._rwtimeout
255     while not self._msgs:
256       if time.time() > etime:
257         raise TimeoutError("Extended receive timeout")
258       while True:
259         try:
260           data = self.socket.recv(4096)
261         except socket.timeout, err:
262           raise TimeoutError("Receive timeout: %s" % str(err))
263         except socket.error, err:
264           if err.args and err.args[0] == errno.EAGAIN:
265             continue
266           raise
267         break
268       if not data:
269         raise ConnectionClosedError("Connection closed while reading")
270       new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
271       self._buffer = new_msgs.pop()
272       self._msgs.extend(new_msgs)
273     return self._msgs.popleft()
274
275   def Call(self, msg):
276     """Send a message and wait for the response.
277
278     This is just a wrapper over Send and Recv.
279
280     """
281     self.Send(msg)
282     return self.Recv()
283
284   def Close(self):
285     """Close the socket"""
286     if self.socket is not None:
287       self.socket.close()
288       self.socket = None
289
290
291 def ParseRequest(msg):
292   """Parses a LUXI request message.
293
294   """
295   try:
296     request = serializer.LoadJson(msg)
297   except ValueError, err:
298     raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
299
300   logging.debug("LUXI request: %s", request)
301
302   if not isinstance(request, dict):
303     logging.error("LUXI request not a dict: %r", msg)
304     raise ProtocolError("Invalid LUXI request (not a dict)")
305
306   method = request.get(KEY_METHOD, None) # pylint: disable=E1103
307   args = request.get(KEY_ARGS, None) # pylint: disable=E1103
308   version = request.get(KEY_VERSION, None) # pylint: disable=E1103
309
310   if method is None or args is None:
311     logging.error("LUXI request missing method or arguments: %r", msg)
312     raise ProtocolError(("Invalid LUXI request (no method or arguments"
313                          " in request): %r") % msg)
314
315   return (method, args, version)
316
317
318 def ParseResponse(msg):
319   """Parses a LUXI response message.
320
321   """
322   # Parse the result
323   try:
324     data = serializer.LoadJson(msg)
325   except KeyboardInterrupt:
326     raise
327   except Exception, err:
328     raise ProtocolError("Error while deserializing response: %s" % str(err))
329
330   # Validate response
331   if not (isinstance(data, dict) and
332           KEY_SUCCESS in data and
333           KEY_RESULT in data):
334     raise ProtocolError("Invalid response from server: %r" % data)
335
336   return (data[KEY_SUCCESS], data[KEY_RESULT],
337           data.get(KEY_VERSION, None)) # pylint: disable=E1103
338
339
340 def FormatResponse(success, result, version=None):
341   """Formats a LUXI response message.
342
343   """
344   response = {
345     KEY_SUCCESS: success,
346     KEY_RESULT: result,
347     }
348
349   if version is not None:
350     response[KEY_VERSION] = version
351
352   logging.debug("LUXI response: %s", response)
353
354   return serializer.DumpJson(response)
355
356
357 def FormatRequest(method, args, version=None):
358   """Formats a LUXI request message.
359
360   """
361   # Build request
362   request = {
363     KEY_METHOD: method,
364     KEY_ARGS: args,
365     }
366
367   if version is not None:
368     request[KEY_VERSION] = version
369
370   # Serialize the request
371   return serializer.DumpJson(request)
372
373
374 def CallLuxiMethod(transport_cb, method, args, version=None):
375   """Send a LUXI request via a transport and return the response.
376
377   """
378   assert callable(transport_cb)
379
380   request_msg = FormatRequest(method, args, version=version)
381
382   # Send request and wait for response
383   response_msg = transport_cb(request_msg)
384
385   (success, result, resp_version) = ParseResponse(response_msg)
386
387   # Verify version if there was one in the response
388   if resp_version is not None and resp_version != version:
389     raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
390                            (version, resp_version))
391
392   if success:
393     return result
394
395   errors.MaybeRaise(result)
396   raise RequestError(result)
397
398
399 class Client(object):
400   """High-level client implementation.
401
402   This uses a backing Transport-like class on top of which it
403   implements data serialization/deserialization.
404
405   """
406   def __init__(self, address=None, timeouts=None, transport=Transport):
407     """Constructor for the Client class.
408
409     Arguments:
410       - address: a valid address the the used transport class
411       - timeout: a list of timeouts, to be used on connect and read/write
412       - transport: a Transport-like class
413
414
415     If timeout is not passed, the default timeouts of the transport
416     class are used.
417
418     """
419     if address is None:
420       address = pathutils.MASTER_SOCKET
421     self.address = address
422     self.timeouts = timeouts
423     self.transport_class = transport
424     self.transport = None
425     self._InitTransport()
426
427   def _InitTransport(self):
428     """(Re)initialize the transport if needed.
429
430     """
431     if self.transport is None:
432       self.transport = self.transport_class(self.address,
433                                             timeouts=self.timeouts)
434
435   def _CloseTransport(self):
436     """Close the transport, ignoring errors.
437
438     """
439     if self.transport is None:
440       return
441     try:
442       old_transp = self.transport
443       self.transport = None
444       old_transp.Close()
445     except Exception: # pylint: disable=W0703
446       pass
447
448   def _SendMethodCall(self, data):
449     # Send request and wait for response
450     try:
451       self._InitTransport()
452       return self.transport.Call(data)
453     except Exception:
454       self._CloseTransport()
455       raise
456
457   def Close(self):
458     """Close the underlying connection.
459
460     """
461     self._CloseTransport()
462
463   def CallMethod(self, method, args):
464     """Send a generic request and return the response.
465
466     """
467     if not isinstance(args, (list, tuple)):
468       raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
469                                    " expected list, got %s" % type(args))
470     return CallLuxiMethod(self._SendMethodCall, method, args,
471                           version=constants.LUXI_VERSION)
472
473   def SetQueueDrainFlag(self, drain_flag):
474     return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
475
476   def SetWatcherPause(self, until):
477     return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
478
479   def SubmitJob(self, ops):
480     ops_state = map(lambda op: op.__getstate__(), ops)
481     return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
482
483   def SubmitManyJobs(self, jobs):
484     jobs_state = []
485     for ops in jobs:
486       jobs_state.append([op.__getstate__() for op in ops])
487     return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
488
489   def CancelJob(self, job_id):
490     return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
491
492   def ArchiveJob(self, job_id):
493     return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
494
495   def ChangeJobPriority(self, job_id, priority):
496     return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
497
498   def AutoArchiveJobs(self, age):
499     timeout = (DEF_RWTO - 1) / 2
500     return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
501
502   def WaitForJobChangeOnce(self, job_id, fields,
503                            prev_job_info, prev_log_serial,
504                            timeout=WFJC_TIMEOUT):
505     """Waits for changes on a job.
506
507     @param job_id: Job ID
508     @type fields: list
509     @param fields: List of field names to be observed
510     @type prev_job_info: None or list
511     @param prev_job_info: Previously received job information
512     @type prev_log_serial: None or int/long
513     @param prev_log_serial: Highest log serial number previously received
514     @type timeout: int/float
515     @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
516                     be capped to that value)
517
518     """
519     assert timeout >= 0, "Timeout can not be negative"
520     return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
521                            (job_id, fields, prev_job_info,
522                             prev_log_serial,
523                             min(WFJC_TIMEOUT, timeout)))
524
525   def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
526     while True:
527       result = self.WaitForJobChangeOnce(job_id, fields,
528                                          prev_job_info, prev_log_serial)
529       if result != constants.JOB_NOTCHANGED:
530         break
531     return result
532
533   def Query(self, what, fields, qfilter):
534     """Query for resources/items.
535
536     @param what: One of L{constants.QR_VIA_LUXI}
537     @type fields: List of strings
538     @param fields: List of requested fields
539     @type qfilter: None or list
540     @param qfilter: Query filter
541     @rtype: L{objects.QueryResponse}
542
543     """
544     result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
545     return objects.QueryResponse.FromDict(result)
546
547   def QueryFields(self, what, fields):
548     """Query for available fields.
549
550     @param what: One of L{constants.QR_VIA_LUXI}
551     @type fields: None or list of strings
552     @param fields: List of requested fields
553     @rtype: L{objects.QueryFieldsResponse}
554
555     """
556     result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
557     return objects.QueryFieldsResponse.FromDict(result)
558
559   def QueryJobs(self, job_ids, fields):
560     return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
561
562   def QueryInstances(self, names, fields, use_locking):
563     return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
564
565   def QueryNodes(self, names, fields, use_locking):
566     return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
567
568   def QueryGroups(self, names, fields, use_locking):
569     return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
570
571   def QueryNetworks(self, names, fields, use_locking):
572     return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
573
574   def QueryExports(self, nodes, use_locking):
575     return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
576
577   def QueryClusterInfo(self):
578     return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
579
580   def QueryConfigValues(self, fields):
581     return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
582
583   def QueryTags(self, kind, name):
584     return self.CallMethod(REQ_QUERY_TAGS, (kind, name))