Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ bc57fa8d

History | View | Annotate | Download (16.8 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 83c046a2 Iustin Pop
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 8d5b316c Iustin Pop
This module implements the local unix socket protocol. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 231db3a5 Michael Hanselmann
import logging
37 c2a03789 Iustin Pop
38 fad50141 Michael Hanselmann
from ganeti import serializer
39 ceab32dd Iustin Pop
from ganeti import constants
40 6797ec29 Iustin Pop
from ganeti import errors
41 cb462b06 Michael Hanselmann
from ganeti import utils
42 28b71a76 Michael Hanselmann
from ganeti import objects
43 b87ee98f Michael Hanselmann
from ganeti import pathutils
44 c2a03789 Iustin Pop
45 c2a03789 Iustin Pop
46 fbb05686 Jose A. Lopes
KEY_METHOD = constants.LUXI_KEY_METHOD
47 fbb05686 Jose A. Lopes
KEY_ARGS = constants.LUXI_KEY_ARGS
48 fbb05686 Jose A. Lopes
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
49 fbb05686 Jose A. Lopes
KEY_RESULT = constants.LUXI_KEY_RESULT
50 fbb05686 Jose A. Lopes
KEY_VERSION = constants.LUXI_KEY_VERSION
51 fbb05686 Jose A. Lopes
52 fbb05686 Jose A. Lopes
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
53 fbb05686 Jose A. Lopes
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
54 fbb05686 Jose A. Lopes
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
55 a4417db4 Thomas Thrainer
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
56 fbb05686 Jose A. Lopes
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
57 fbb05686 Jose A. Lopes
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
58 fbb05686 Jose A. Lopes
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
59 fbb05686 Jose A. Lopes
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
60 fbb05686 Jose A. Lopes
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
61 fbb05686 Jose A. Lopes
REQ_QUERY = constants.LUXI_REQ_QUERY
62 fbb05686 Jose A. Lopes
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
63 fbb05686 Jose A. Lopes
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
64 fbb05686 Jose A. Lopes
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
65 fbb05686 Jose A. Lopes
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
66 fbb05686 Jose A. Lopes
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
67 fbb05686 Jose A. Lopes
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
68 fbb05686 Jose A. Lopes
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
69 fbb05686 Jose A. Lopes
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
70 fbb05686 Jose A. Lopes
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
71 fbb05686 Jose A. Lopes
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
72 fbb05686 Jose A. Lopes
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
73 fbb05686 Jose A. Lopes
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
74 fbb05686 Jose A. Lopes
REQ_ALL = constants.LUXI_REQ_ALL
75 fbb05686 Jose A. Lopes
76 fbb05686 Jose A. Lopes
DEF_CTMO = constants.LUXI_DEF_CTMO
77 fbb05686 Jose A. Lopes
DEF_RWTO = constants.LUXI_DEF_RWTO
78 fbb05686 Jose A. Lopes
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
79 793a8f7c Michael Hanselmann
80 c2a03789 Iustin Pop
81 7a8bda3f Michael Hanselmann
class ProtocolError(errors.LuxiError):
82 5a1c22fe Iustin Pop
  """Denotes an error in the LUXI protocol."""
83 c2a03789 Iustin Pop
84 c2a03789 Iustin Pop
85 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
86 5a1c22fe Iustin Pop
  """Connection closed error."""
87 c2a03789 Iustin Pop
88 c2a03789 Iustin Pop
89 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
90 5a1c22fe Iustin Pop
  """Operation timeout error."""
91 c2a03789 Iustin Pop
92 c2a03789 Iustin Pop
93 b77acb3e Iustin Pop
class RequestError(ProtocolError):
94 5a1c22fe Iustin Pop
  """Error on request.
95 b77acb3e Iustin Pop

96 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
97 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
98 b77acb3e Iustin Pop

99 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
100 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
101 b77acb3e Iustin Pop
    - query failed because required fields were missing
102 b77acb3e Iustin Pop

103 b77acb3e Iustin Pop
  """
104 b77acb3e Iustin Pop
105 3d8548c4 Michael Hanselmann
106 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
107 5a1c22fe Iustin Pop
  """The master cannot be reached.
108 03a8dbdc Iustin Pop

109 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
110 03a8dbdc Iustin Pop
  been removed.
111 03a8dbdc Iustin Pop

112 03a8dbdc Iustin Pop
  """
113 03a8dbdc Iustin Pop
114 b77acb3e Iustin Pop
115 5a1c22fe Iustin Pop
class PermissionError(ProtocolError):
116 5a1c22fe Iustin Pop
  """Permission denied while connecting to the master socket.
117 5a1c22fe Iustin Pop

118 5a1c22fe Iustin Pop
  This means the user doesn't have the proper rights.
119 5a1c22fe Iustin Pop

120 5a1c22fe Iustin Pop
  """
121 5a1c22fe Iustin Pop
122 5a1c22fe Iustin Pop
123 c2a03789 Iustin Pop
class Transport:
124 c2a03789 Iustin Pop
  """Low-level transport class.
125 c2a03789 Iustin Pop

126 c2a03789 Iustin Pop
  This is used on the client side.
127 c2a03789 Iustin Pop

128 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
129 c2a03789 Iustin Pop
  semantics to the Client. This means:
130 c2a03789 Iustin Pop
    - can send messages and receive messages
131 c2a03789 Iustin Pop
    - safe for multithreading
132 c2a03789 Iustin Pop

133 c2a03789 Iustin Pop
  """
134 c2a03789 Iustin Pop
135 25942a6c Guido Trotter
  def __init__(self, address, timeouts=None):
136 c2a03789 Iustin Pop
    """Constructor for the Client class.
137 c2a03789 Iustin Pop

138 c2a03789 Iustin Pop
    Arguments:
139 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
140 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
141 c2a03789 Iustin Pop

142 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
143 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
144 c2a03789 Iustin Pop

145 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
146 c2a03789 Iustin Pop

147 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
148 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
149 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
150 c2a03789 Iustin Pop
    timeout).
151 c2a03789 Iustin Pop

152 c2a03789 Iustin Pop
    """
153 c2a03789 Iustin Pop
    self.address = address
154 c2a03789 Iustin Pop
    if timeouts is None:
155 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
156 c2a03789 Iustin Pop
    else:
157 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
158 c2a03789 Iustin Pop
159 c2a03789 Iustin Pop
    self.socket = None
160 c2a03789 Iustin Pop
    self._buffer = ""
161 c2a03789 Iustin Pop
    self._msgs = collections.deque()
162 c2a03789 Iustin Pop
163 c2a03789 Iustin Pop
    try:
164 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
165 cb462b06 Michael Hanselmann
166 cb462b06 Michael Hanselmann
      # Try to connect
167 c2a03789 Iustin Pop
      try:
168 cb462b06 Michael Hanselmann
        utils.Retry(self._Connect, 1.0, self._ctimeout,
169 cb462b06 Michael Hanselmann
                    args=(self.socket, address, self._ctimeout))
170 cb462b06 Michael Hanselmann
      except utils.RetryTimeout:
171 cb462b06 Michael Hanselmann
        raise TimeoutError("Connect timed out")
172 cb462b06 Michael Hanselmann
173 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
174 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
175 c2a03789 Iustin Pop
      if self.socket is not None:
176 c2a03789 Iustin Pop
        self.socket.close()
177 c2a03789 Iustin Pop
      self.socket = None
178 c2a03789 Iustin Pop
      raise
179 c2a03789 Iustin Pop
180 cb462b06 Michael Hanselmann
  @staticmethod
181 cb462b06 Michael Hanselmann
  def _Connect(sock, address, timeout):
182 cb462b06 Michael Hanselmann
    sock.settimeout(timeout)
183 cb462b06 Michael Hanselmann
    try:
184 cb462b06 Michael Hanselmann
      sock.connect(address)
185 cb462b06 Michael Hanselmann
    except socket.timeout, err:
186 cb462b06 Michael Hanselmann
      raise TimeoutError("Connect timed out: %s" % str(err))
187 cb462b06 Michael Hanselmann
    except socket.error, err:
188 5a1c22fe Iustin Pop
      error_code = err.args[0]
189 5a1c22fe Iustin Pop
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
190 cb462b06 Michael Hanselmann
        raise NoMasterError(address)
191 5a1c22fe Iustin Pop
      elif error_code in (errno.EPERM, errno.EACCES):
192 5a1c22fe Iustin Pop
        raise PermissionError(address)
193 5a1c22fe Iustin Pop
      elif error_code == errno.EAGAIN:
194 cb462b06 Michael Hanselmann
        # Server's socket backlog is full at the moment
195 cb462b06 Michael Hanselmann
        raise utils.RetryAgain()
196 cb462b06 Michael Hanselmann
      raise
197 cb462b06 Michael Hanselmann
198 c2a03789 Iustin Pop
  def _CheckSocket(self):
199 c2a03789 Iustin Pop
    """Make sure we are connected.
200 c2a03789 Iustin Pop

201 c2a03789 Iustin Pop
    """
202 c2a03789 Iustin Pop
    if self.socket is None:
203 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
204 c2a03789 Iustin Pop
205 c2a03789 Iustin Pop
  def Send(self, msg):
206 c2a03789 Iustin Pop
    """Send a message.
207 c2a03789 Iustin Pop

208 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
209 c2a03789 Iustin Pop

210 c2a03789 Iustin Pop
    """
211 25942a6c Guido Trotter
    if constants.LUXI_EOM in msg:
212 797506fc Michael Hanselmann
      raise ProtocolError("Message terminator found in payload")
213 797506fc Michael Hanselmann
214 c2a03789 Iustin Pop
    self._CheckSocket()
215 c2a03789 Iustin Pop
    try:
216 6096ee13 Michael Hanselmann
      # TODO: sendall is not guaranteed to send everything
217 25942a6c Guido Trotter
      self.socket.sendall(msg + constants.LUXI_EOM)
218 c2a03789 Iustin Pop
    except socket.timeout, err:
219 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
220 c2a03789 Iustin Pop
221 c2a03789 Iustin Pop
  def Recv(self):
222 5bbd3f7f Michael Hanselmann
    """Try to receive a message from the socket.
223 c2a03789 Iustin Pop

224 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
225 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
226 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
227 c2a03789 Iustin Pop
    limit.
228 c2a03789 Iustin Pop

229 c2a03789 Iustin Pop
    """
230 c2a03789 Iustin Pop
    self._CheckSocket()
231 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
232 c2a03789 Iustin Pop
    while not self._msgs:
233 c2a03789 Iustin Pop
      if time.time() > etime:
234 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
235 6096ee13 Michael Hanselmann
      while True:
236 6096ee13 Michael Hanselmann
        try:
237 6096ee13 Michael Hanselmann
          data = self.socket.recv(4096)
238 28e3e216 Michael Hanselmann
        except socket.timeout, err:
239 28e3e216 Michael Hanselmann
          raise TimeoutError("Receive timeout: %s" % str(err))
240 6096ee13 Michael Hanselmann
        except socket.error, err:
241 6096ee13 Michael Hanselmann
          if err.args and err.args[0] == errno.EAGAIN:
242 6096ee13 Michael Hanselmann
            continue
243 6096ee13 Michael Hanselmann
          raise
244 6096ee13 Michael Hanselmann
        break
245 c2a03789 Iustin Pop
      if not data:
246 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
247 25942a6c Guido Trotter
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
248 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
249 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
250 c2a03789 Iustin Pop
    return self._msgs.popleft()
251 c2a03789 Iustin Pop
252 c2a03789 Iustin Pop
  def Call(self, msg):
253 c2a03789 Iustin Pop
    """Send a message and wait for the response.
254 c2a03789 Iustin Pop

255 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
256 c2a03789 Iustin Pop

257 c2a03789 Iustin Pop
    """
258 c2a03789 Iustin Pop
    self.Send(msg)
259 c2a03789 Iustin Pop
    return self.Recv()
260 c2a03789 Iustin Pop
261 c2a03789 Iustin Pop
  def Close(self):
262 c2a03789 Iustin Pop
    """Close the socket"""
263 c2a03789 Iustin Pop
    if self.socket is not None:
264 c2a03789 Iustin Pop
      self.socket.close()
265 c2a03789 Iustin Pop
      self.socket = None
266 c2a03789 Iustin Pop
267 c2a03789 Iustin Pop
268 231db3a5 Michael Hanselmann
def ParseRequest(msg):
269 231db3a5 Michael Hanselmann
  """Parses a LUXI request message.
270 231db3a5 Michael Hanselmann

271 231db3a5 Michael Hanselmann
  """
272 231db3a5 Michael Hanselmann
  try:
273 231db3a5 Michael Hanselmann
    request = serializer.LoadJson(msg)
274 231db3a5 Michael Hanselmann
  except ValueError, err:
275 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
276 231db3a5 Michael Hanselmann
277 231db3a5 Michael Hanselmann
  logging.debug("LUXI request: %s", request)
278 231db3a5 Michael Hanselmann
279 231db3a5 Michael Hanselmann
  if not isinstance(request, dict):
280 231db3a5 Michael Hanselmann
    logging.error("LUXI request not a dict: %r", msg)
281 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (not a dict)")
282 231db3a5 Michael Hanselmann
283 b459a848 Andrea Spadaccini
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
284 b459a848 Andrea Spadaccini
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
285 b459a848 Andrea Spadaccini
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
286 e7a25b08 Guido Trotter
287 231db3a5 Michael Hanselmann
  if method is None or args is None:
288 231db3a5 Michael Hanselmann
    logging.error("LUXI request missing method or arguments: %r", msg)
289 231db3a5 Michael Hanselmann
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
290 231db3a5 Michael Hanselmann
                         " in request): %r") % msg)
291 231db3a5 Michael Hanselmann
292 e986f20c Michael Hanselmann
  return (method, args, version)
293 231db3a5 Michael Hanselmann
294 231db3a5 Michael Hanselmann
295 231db3a5 Michael Hanselmann
def ParseResponse(msg):
296 231db3a5 Michael Hanselmann
  """Parses a LUXI response message.
297 231db3a5 Michael Hanselmann

298 231db3a5 Michael Hanselmann
  """
299 231db3a5 Michael Hanselmann
  # Parse the result
300 231db3a5 Michael Hanselmann
  try:
301 231db3a5 Michael Hanselmann
    data = serializer.LoadJson(msg)
302 d143f2c6 Iustin Pop
  except KeyboardInterrupt:
303 d143f2c6 Iustin Pop
    raise
304 231db3a5 Michael Hanselmann
  except Exception, err:
305 231db3a5 Michael Hanselmann
    raise ProtocolError("Error while deserializing response: %s" % str(err))
306 231db3a5 Michael Hanselmann
307 231db3a5 Michael Hanselmann
  # Validate response
308 231db3a5 Michael Hanselmann
  if not (isinstance(data, dict) and
309 231db3a5 Michael Hanselmann
          KEY_SUCCESS in data and
310 231db3a5 Michael Hanselmann
          KEY_RESULT in data):
311 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid response from server: %r" % data)
312 231db3a5 Michael Hanselmann
313 2317945a Guido Trotter
  return (data[KEY_SUCCESS], data[KEY_RESULT],
314 b459a848 Andrea Spadaccini
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
315 231db3a5 Michael Hanselmann
316 231db3a5 Michael Hanselmann
317 e986f20c Michael Hanselmann
def FormatResponse(success, result, version=None):
318 231db3a5 Michael Hanselmann
  """Formats a LUXI response message.
319 231db3a5 Michael Hanselmann

320 231db3a5 Michael Hanselmann
  """
321 231db3a5 Michael Hanselmann
  response = {
322 231db3a5 Michael Hanselmann
    KEY_SUCCESS: success,
323 231db3a5 Michael Hanselmann
    KEY_RESULT: result,
324 231db3a5 Michael Hanselmann
    }
325 231db3a5 Michael Hanselmann
326 e986f20c Michael Hanselmann
  if version is not None:
327 e986f20c Michael Hanselmann
    response[KEY_VERSION] = version
328 e986f20c Michael Hanselmann
329 231db3a5 Michael Hanselmann
  logging.debug("LUXI response: %s", response)
330 231db3a5 Michael Hanselmann
331 231db3a5 Michael Hanselmann
  return serializer.DumpJson(response)
332 231db3a5 Michael Hanselmann
333 231db3a5 Michael Hanselmann
334 e986f20c Michael Hanselmann
def FormatRequest(method, args, version=None):
335 231db3a5 Michael Hanselmann
  """Formats a LUXI request message.
336 231db3a5 Michael Hanselmann

337 231db3a5 Michael Hanselmann
  """
338 231db3a5 Michael Hanselmann
  # Build request
339 231db3a5 Michael Hanselmann
  request = {
340 231db3a5 Michael Hanselmann
    KEY_METHOD: method,
341 231db3a5 Michael Hanselmann
    KEY_ARGS: args,
342 231db3a5 Michael Hanselmann
    }
343 231db3a5 Michael Hanselmann
344 e986f20c Michael Hanselmann
  if version is not None:
345 e986f20c Michael Hanselmann
    request[KEY_VERSION] = version
346 e986f20c Michael Hanselmann
347 231db3a5 Michael Hanselmann
  # Serialize the request
348 a182a3ed Michael Hanselmann
  return serializer.DumpJson(request)
349 231db3a5 Michael Hanselmann
350 231db3a5 Michael Hanselmann
351 e986f20c Michael Hanselmann
def CallLuxiMethod(transport_cb, method, args, version=None):
352 231db3a5 Michael Hanselmann
  """Send a LUXI request via a transport and return the response.
353 231db3a5 Michael Hanselmann

354 231db3a5 Michael Hanselmann
  """
355 231db3a5 Michael Hanselmann
  assert callable(transport_cb)
356 231db3a5 Michael Hanselmann
357 e986f20c Michael Hanselmann
  request_msg = FormatRequest(method, args, version=version)
358 231db3a5 Michael Hanselmann
359 231db3a5 Michael Hanselmann
  # Send request and wait for response
360 231db3a5 Michael Hanselmann
  response_msg = transport_cb(request_msg)
361 231db3a5 Michael Hanselmann
362 e986f20c Michael Hanselmann
  (success, result, resp_version) = ParseResponse(response_msg)
363 e986f20c Michael Hanselmann
364 e986f20c Michael Hanselmann
  # Verify version if there was one in the response
365 e986f20c Michael Hanselmann
  if resp_version is not None and resp_version != version:
366 e986f20c Michael Hanselmann
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
367 e986f20c Michael Hanselmann
                           (version, resp_version))
368 231db3a5 Michael Hanselmann
369 231db3a5 Michael Hanselmann
  if success:
370 231db3a5 Michael Hanselmann
    return result
371 231db3a5 Michael Hanselmann
372 231db3a5 Michael Hanselmann
  errors.MaybeRaise(result)
373 231db3a5 Michael Hanselmann
  raise RequestError(result)
374 231db3a5 Michael Hanselmann
375 231db3a5 Michael Hanselmann
376 c2a03789 Iustin Pop
class Client(object):
377 c2a03789 Iustin Pop
  """High-level client implementation.
378 c2a03789 Iustin Pop

379 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
380 c2a03789 Iustin Pop
  implements data serialization/deserialization.
381 c2a03789 Iustin Pop

382 c2a03789 Iustin Pop
  """
383 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
384 c2a03789 Iustin Pop
    """Constructor for the Client class.
385 c2a03789 Iustin Pop

386 c2a03789 Iustin Pop
    Arguments:
387 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
388 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
389 c2a03789 Iustin Pop
      - transport: a Transport-like class
390 c2a03789 Iustin Pop

391 c2a03789 Iustin Pop

392 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
393 c2a03789 Iustin Pop
    class are used.
394 c2a03789 Iustin Pop

395 c2a03789 Iustin Pop
    """
396 ceab32dd Iustin Pop
    if address is None:
397 b87ee98f Michael Hanselmann
      address = pathutils.MASTER_SOCKET
398 8d5b316c Iustin Pop
    self.address = address
399 8d5b316c Iustin Pop
    self.timeouts = timeouts
400 8d5b316c Iustin Pop
    self.transport_class = transport
401 8d5b316c Iustin Pop
    self.transport = None
402 8d5b316c Iustin Pop
    self._InitTransport()
403 8d5b316c Iustin Pop
404 8d5b316c Iustin Pop
  def _InitTransport(self):
405 8d5b316c Iustin Pop
    """(Re)initialize the transport if needed.
406 8d5b316c Iustin Pop

407 8d5b316c Iustin Pop
    """
408 8d5b316c Iustin Pop
    if self.transport is None:
409 8d5b316c Iustin Pop
      self.transport = self.transport_class(self.address,
410 8d5b316c Iustin Pop
                                            timeouts=self.timeouts)
411 8d5b316c Iustin Pop
412 8d5b316c Iustin Pop
  def _CloseTransport(self):
413 8d5b316c Iustin Pop
    """Close the transport, ignoring errors.
414 8d5b316c Iustin Pop

415 8d5b316c Iustin Pop
    """
416 8d5b316c Iustin Pop
    if self.transport is None:
417 8d5b316c Iustin Pop
      return
418 8d5b316c Iustin Pop
    try:
419 8d5b316c Iustin Pop
      old_transp = self.transport
420 8d5b316c Iustin Pop
      self.transport = None
421 8d5b316c Iustin Pop
      old_transp.Close()
422 b459a848 Andrea Spadaccini
    except Exception: # pylint: disable=W0703
423 8d5b316c Iustin Pop
      pass
424 c2a03789 Iustin Pop
425 231db3a5 Michael Hanselmann
  def _SendMethodCall(self, data):
426 3d8548c4 Michael Hanselmann
    # Send request and wait for response
427 8d5b316c Iustin Pop
    try:
428 8d5b316c Iustin Pop
      self._InitTransport()
429 231db3a5 Michael Hanselmann
      return self.transport.Call(data)
430 8d5b316c Iustin Pop
    except Exception:
431 8d5b316c Iustin Pop
      self._CloseTransport()
432 8d5b316c Iustin Pop
      raise
433 8d5b316c Iustin Pop
434 2a917701 Michael Hanselmann
  def Close(self):
435 2a917701 Michael Hanselmann
    """Close the underlying connection.
436 2a917701 Michael Hanselmann

437 2a917701 Michael Hanselmann
    """
438 2a917701 Michael Hanselmann
    self._CloseTransport()
439 2a917701 Michael Hanselmann
440 231db3a5 Michael Hanselmann
  def CallMethod(self, method, args):
441 231db3a5 Michael Hanselmann
    """Send a generic request and return the response.
442 3d8548c4 Michael Hanselmann

443 231db3a5 Michael Hanselmann
    """
444 a629ecb9 Iustin Pop
    if not isinstance(args, (list, tuple)):
445 a629ecb9 Iustin Pop
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
446 a629ecb9 Iustin Pop
                                   " expected list, got %s" % type(args))
447 e986f20c Michael Hanselmann
    return CallLuxiMethod(self._SendMethodCall, method, args,
448 e986f20c Michael Hanselmann
                          version=constants.LUXI_VERSION)
449 c2a03789 Iustin Pop
450 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
451 83c046a2 Iustin Pop
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
452 3ccafd0e Iustin Pop
453 05e50653 Michael Hanselmann
  def SetWatcherPause(self, until):
454 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
455 05e50653 Michael Hanselmann
456 d9d1e541 Klaus Aehlig
  def PickupJob(self, job):
457 d9d1e541 Klaus Aehlig
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
458 d9d1e541 Klaus Aehlig
459 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
460 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
461 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
462 0bbe448c Michael Hanselmann
463 346c3037 Klaus Aehlig
  def SubmitJobToDrainedQueue(self, ops):
464 346c3037 Klaus Aehlig
    ops_state = map(lambda op: op.__getstate__(), ops)
465 346c3037 Klaus Aehlig
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
466 346c3037 Klaus Aehlig
467 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
468 2971c913 Iustin Pop
    jobs_state = []
469 2971c913 Iustin Pop
    for ops in jobs:
470 2971c913 Iustin Pop
      jobs_state.append([op.__getstate__() for op in ops])
471 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
472 2971c913 Iustin Pop
473 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
474 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
475 0bbe448c Michael Hanselmann
476 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
477 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
478 0bbe448c Michael Hanselmann
479 f63ffb37 Michael Hanselmann
  def ChangeJobPriority(self, job_id, priority):
480 f63ffb37 Michael Hanselmann
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
481 f63ffb37 Michael Hanselmann
482 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
483 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
484 83c046a2 Iustin Pop
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
485 07cd723a Iustin Pop
486 f4484122 Michael Hanselmann
  def WaitForJobChangeOnce(self, job_id, fields,
487 793a8f7c Michael Hanselmann
                           prev_job_info, prev_log_serial,
488 793a8f7c Michael Hanselmann
                           timeout=WFJC_TIMEOUT):
489 793a8f7c Michael Hanselmann
    """Waits for changes on a job.
490 793a8f7c Michael Hanselmann

491 793a8f7c Michael Hanselmann
    @param job_id: Job ID
492 793a8f7c Michael Hanselmann
    @type fields: list
493 793a8f7c Michael Hanselmann
    @param fields: List of field names to be observed
494 793a8f7c Michael Hanselmann
    @type prev_job_info: None or list
495 793a8f7c Michael Hanselmann
    @param prev_job_info: Previously received job information
496 793a8f7c Michael Hanselmann
    @type prev_log_serial: None or int/long
497 793a8f7c Michael Hanselmann
    @param prev_log_serial: Highest log serial number previously received
498 793a8f7c Michael Hanselmann
    @type timeout: int/float
499 793a8f7c Michael Hanselmann
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
500 793a8f7c Michael Hanselmann
                    be capped to that value)
501 793a8f7c Michael Hanselmann

502 793a8f7c Michael Hanselmann
    """
503 793a8f7c Michael Hanselmann
    assert timeout >= 0, "Timeout can not be negative"
504 f4484122 Michael Hanselmann
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
505 f4484122 Michael Hanselmann
                           (job_id, fields, prev_job_info,
506 793a8f7c Michael Hanselmann
                            prev_log_serial,
507 793a8f7c Michael Hanselmann
                            min(WFJC_TIMEOUT, timeout)))
508 f4484122 Michael Hanselmann
509 f4484122 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
510 5c735209 Iustin Pop
    while True:
511 f4484122 Michael Hanselmann
      result = self.WaitForJobChangeOnce(job_id, fields,
512 f4484122 Michael Hanselmann
                                         prev_job_info, prev_log_serial)
513 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
514 5c735209 Iustin Pop
        break
515 5c735209 Iustin Pop
    return result
516 dfe57c22 Michael Hanselmann
517 2e5c33db Iustin Pop
  def Query(self, what, fields, qfilter):
518 28b71a76 Michael Hanselmann
    """Query for resources/items.
519 28b71a76 Michael Hanselmann

520 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
521 28b71a76 Michael Hanselmann
    @type fields: List of strings
522 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
523 2e5c33db Iustin Pop
    @type qfilter: None or list
524 2e5c33db Iustin Pop
    @param qfilter: Query filter
525 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryResponse}
526 28b71a76 Michael Hanselmann

527 28b71a76 Michael Hanselmann
    """
528 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
529 28b71a76 Michael Hanselmann
    return objects.QueryResponse.FromDict(result)
530 28b71a76 Michael Hanselmann
531 28b71a76 Michael Hanselmann
  def QueryFields(self, what, fields):
532 28b71a76 Michael Hanselmann
    """Query for available fields.
533 28b71a76 Michael Hanselmann

534 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
535 28b71a76 Michael Hanselmann
    @type fields: None or list of strings
536 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
537 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryFieldsResponse}
538 28b71a76 Michael Hanselmann

539 28b71a76 Michael Hanselmann
    """
540 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
541 28b71a76 Michael Hanselmann
    return objects.QueryFieldsResponse.FromDict(result)
542 28b71a76 Michael Hanselmann
543 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
544 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
545 3d8548c4 Michael Hanselmann
546 ec79568d Iustin Pop
  def QueryInstances(self, names, fields, use_locking):
547 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
548 ee6c7b94 Michael Hanselmann
549 ec79568d Iustin Pop
  def QueryNodes(self, names, fields, use_locking):
550 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
551 02f7fe54 Michael Hanselmann
552 a79ef2a5 Adeodato Simo
  def QueryGroups(self, names, fields, use_locking):
553 a79ef2a5 Adeodato Simo
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
554 a79ef2a5 Adeodato Simo
555 306bed0e Apollon Oikonomopoulos
  def QueryNetworks(self, names, fields, use_locking):
556 306bed0e Apollon Oikonomopoulos
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
557 306bed0e Apollon Oikonomopoulos
558 ec79568d Iustin Pop
  def QueryExports(self, nodes, use_locking):
559 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
560 32f93223 Michael Hanselmann
561 66baeccc Iustin Pop
  def QueryClusterInfo(self):
562 66baeccc Iustin Pop
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
563 66baeccc Iustin Pop
564 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
565 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
566 ae5849b5 Michael Hanselmann
567 7699c3af Iustin Pop
  def QueryTags(self, kind, name):
568 7699c3af Iustin Pop
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))