Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ d971402f

History | View | Annotate | Download (16 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 83c046a2 Iustin Pop
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 8d5b316c Iustin Pop
This module implements the local unix socket protocol. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 231db3a5 Michael Hanselmann
import logging
37 c2a03789 Iustin Pop
38 fad50141 Michael Hanselmann
from ganeti import serializer
39 ceab32dd Iustin Pop
from ganeti import constants
40 6797ec29 Iustin Pop
from ganeti import errors
41 cb462b06 Michael Hanselmann
from ganeti import utils
42 28b71a76 Michael Hanselmann
from ganeti import objects
43 b87ee98f Michael Hanselmann
from ganeti import pathutils
44 c2a03789 Iustin Pop
45 c2a03789 Iustin Pop
46 231db3a5 Michael Hanselmann
KEY_METHOD = "method"
47 231db3a5 Michael Hanselmann
KEY_ARGS = "args"
48 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
49 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
50 e986f20c Michael Hanselmann
KEY_VERSION = "version"
51 3d8548c4 Michael Hanselmann
52 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
53 2971c913 Iustin Pop
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
56 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
57 83c046a2 Iustin Pop
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
58 28b71a76 Michael Hanselmann
REQ_QUERY = "Query"
59 28b71a76 Michael Hanselmann
REQ_QUERY_FIELDS = "QueryFields"
60 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
61 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
62 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
63 a79ef2a5 Adeodato Simo
REQ_QUERY_GROUPS = "QueryGroups"
64 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
65 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
66 66baeccc Iustin Pop
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
67 7699c3af Iustin Pop
REQ_QUERY_TAGS = "QueryTags"
68 83c046a2 Iustin Pop
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
69 05e50653 Michael Hanselmann
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
70 c2a03789 Iustin Pop
71 e3a25810 Michael Hanselmann
#: List of all LUXI requests
72 e3a25810 Michael Hanselmann
REQ_ALL = frozenset([
73 e3a25810 Michael Hanselmann
  REQ_ARCHIVE_JOB,
74 83c046a2 Iustin Pop
  REQ_AUTO_ARCHIVE_JOBS,
75 e3a25810 Michael Hanselmann
  REQ_CANCEL_JOB,
76 e3a25810 Michael Hanselmann
  REQ_QUERY,
77 e3a25810 Michael Hanselmann
  REQ_QUERY_CLUSTER_INFO,
78 e3a25810 Michael Hanselmann
  REQ_QUERY_CONFIG_VALUES,
79 e3a25810 Michael Hanselmann
  REQ_QUERY_EXPORTS,
80 e3a25810 Michael Hanselmann
  REQ_QUERY_FIELDS,
81 e3a25810 Michael Hanselmann
  REQ_QUERY_GROUPS,
82 e3a25810 Michael Hanselmann
  REQ_QUERY_INSTANCES,
83 e3a25810 Michael Hanselmann
  REQ_QUERY_JOBS,
84 e3a25810 Michael Hanselmann
  REQ_QUERY_NODES,
85 e3a25810 Michael Hanselmann
  REQ_QUERY_TAGS,
86 83c046a2 Iustin Pop
  REQ_SET_DRAIN_FLAG,
87 e3a25810 Michael Hanselmann
  REQ_SET_WATCHER_PAUSE,
88 e3a25810 Michael Hanselmann
  REQ_SUBMIT_JOB,
89 e3a25810 Michael Hanselmann
  REQ_SUBMIT_MANY_JOBS,
90 e3a25810 Michael Hanselmann
  REQ_WAIT_FOR_JOB_CHANGE,
91 e3a25810 Michael Hanselmann
  ])
92 e3a25810 Michael Hanselmann
93 c2a03789 Iustin Pop
DEF_CTMO = 10
94 c2a03789 Iustin Pop
DEF_RWTO = 60
95 c2a03789 Iustin Pop
96 793a8f7c Michael Hanselmann
# WaitForJobChange timeout
97 793a8f7c Michael Hanselmann
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
98 793a8f7c Michael Hanselmann
99 c2a03789 Iustin Pop
100 7a8bda3f Michael Hanselmann
class ProtocolError(errors.LuxiError):
101 5a1c22fe Iustin Pop
  """Denotes an error in the LUXI protocol."""
102 c2a03789 Iustin Pop
103 c2a03789 Iustin Pop
104 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
105 5a1c22fe Iustin Pop
  """Connection closed error."""
106 c2a03789 Iustin Pop
107 c2a03789 Iustin Pop
108 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
109 5a1c22fe Iustin Pop
  """Operation timeout error."""
110 c2a03789 Iustin Pop
111 c2a03789 Iustin Pop
112 b77acb3e Iustin Pop
class RequestError(ProtocolError):
113 5a1c22fe Iustin Pop
  """Error on request.
114 b77acb3e Iustin Pop

115 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
116 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
117 b77acb3e Iustin Pop

118 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
119 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
120 b77acb3e Iustin Pop
    - query failed because required fields were missing
121 b77acb3e Iustin Pop

122 b77acb3e Iustin Pop
  """
123 b77acb3e Iustin Pop
124 3d8548c4 Michael Hanselmann
125 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
126 5a1c22fe Iustin Pop
  """The master cannot be reached.
127 03a8dbdc Iustin Pop

128 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
129 03a8dbdc Iustin Pop
  been removed.
130 03a8dbdc Iustin Pop

131 03a8dbdc Iustin Pop
  """
132 03a8dbdc Iustin Pop
133 b77acb3e Iustin Pop
134 5a1c22fe Iustin Pop
class PermissionError(ProtocolError):
135 5a1c22fe Iustin Pop
  """Permission denied while connecting to the master socket.
136 5a1c22fe Iustin Pop

137 5a1c22fe Iustin Pop
  This means the user doesn't have the proper rights.
138 5a1c22fe Iustin Pop

139 5a1c22fe Iustin Pop
  """
140 5a1c22fe Iustin Pop
141 5a1c22fe Iustin Pop
142 c2a03789 Iustin Pop
class Transport:
143 c2a03789 Iustin Pop
  """Low-level transport class.
144 c2a03789 Iustin Pop

145 c2a03789 Iustin Pop
  This is used on the client side.
146 c2a03789 Iustin Pop

147 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
148 c2a03789 Iustin Pop
  semantics to the Client. This means:
149 c2a03789 Iustin Pop
    - can send messages and receive messages
150 c2a03789 Iustin Pop
    - safe for multithreading
151 c2a03789 Iustin Pop

152 c2a03789 Iustin Pop
  """
153 c2a03789 Iustin Pop
154 25942a6c Guido Trotter
  def __init__(self, address, timeouts=None):
155 c2a03789 Iustin Pop
    """Constructor for the Client class.
156 c2a03789 Iustin Pop

157 c2a03789 Iustin Pop
    Arguments:
158 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
159 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
160 c2a03789 Iustin Pop

161 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
162 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
163 c2a03789 Iustin Pop

164 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
165 c2a03789 Iustin Pop

166 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
167 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
168 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
169 c2a03789 Iustin Pop
    timeout).
170 c2a03789 Iustin Pop

171 c2a03789 Iustin Pop
    """
172 c2a03789 Iustin Pop
    self.address = address
173 c2a03789 Iustin Pop
    if timeouts is None:
174 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
175 c2a03789 Iustin Pop
    else:
176 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
177 c2a03789 Iustin Pop
178 c2a03789 Iustin Pop
    self.socket = None
179 c2a03789 Iustin Pop
    self._buffer = ""
180 c2a03789 Iustin Pop
    self._msgs = collections.deque()
181 c2a03789 Iustin Pop
182 c2a03789 Iustin Pop
    try:
183 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
184 cb462b06 Michael Hanselmann
185 cb462b06 Michael Hanselmann
      # Try to connect
186 c2a03789 Iustin Pop
      try:
187 cb462b06 Michael Hanselmann
        utils.Retry(self._Connect, 1.0, self._ctimeout,
188 cb462b06 Michael Hanselmann
                    args=(self.socket, address, self._ctimeout))
189 cb462b06 Michael Hanselmann
      except utils.RetryTimeout:
190 cb462b06 Michael Hanselmann
        raise TimeoutError("Connect timed out")
191 cb462b06 Michael Hanselmann
192 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
193 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
194 c2a03789 Iustin Pop
      if self.socket is not None:
195 c2a03789 Iustin Pop
        self.socket.close()
196 c2a03789 Iustin Pop
      self.socket = None
197 c2a03789 Iustin Pop
      raise
198 c2a03789 Iustin Pop
199 cb462b06 Michael Hanselmann
  @staticmethod
200 cb462b06 Michael Hanselmann
  def _Connect(sock, address, timeout):
201 cb462b06 Michael Hanselmann
    sock.settimeout(timeout)
202 cb462b06 Michael Hanselmann
    try:
203 cb462b06 Michael Hanselmann
      sock.connect(address)
204 cb462b06 Michael Hanselmann
    except socket.timeout, err:
205 cb462b06 Michael Hanselmann
      raise TimeoutError("Connect timed out: %s" % str(err))
206 cb462b06 Michael Hanselmann
    except socket.error, err:
207 5a1c22fe Iustin Pop
      error_code = err.args[0]
208 5a1c22fe Iustin Pop
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
209 cb462b06 Michael Hanselmann
        raise NoMasterError(address)
210 5a1c22fe Iustin Pop
      elif error_code in (errno.EPERM, errno.EACCES):
211 5a1c22fe Iustin Pop
        raise PermissionError(address)
212 5a1c22fe Iustin Pop
      elif error_code == errno.EAGAIN:
213 cb462b06 Michael Hanselmann
        # Server's socket backlog is full at the moment
214 cb462b06 Michael Hanselmann
        raise utils.RetryAgain()
215 cb462b06 Michael Hanselmann
      raise
216 cb462b06 Michael Hanselmann
217 c2a03789 Iustin Pop
  def _CheckSocket(self):
218 c2a03789 Iustin Pop
    """Make sure we are connected.
219 c2a03789 Iustin Pop

220 c2a03789 Iustin Pop
    """
221 c2a03789 Iustin Pop
    if self.socket is None:
222 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
223 c2a03789 Iustin Pop
224 c2a03789 Iustin Pop
  def Send(self, msg):
225 c2a03789 Iustin Pop
    """Send a message.
226 c2a03789 Iustin Pop

227 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
228 c2a03789 Iustin Pop

229 c2a03789 Iustin Pop
    """
230 25942a6c Guido Trotter
    if constants.LUXI_EOM in msg:
231 797506fc Michael Hanselmann
      raise ProtocolError("Message terminator found in payload")
232 797506fc Michael Hanselmann
233 c2a03789 Iustin Pop
    self._CheckSocket()
234 c2a03789 Iustin Pop
    try:
235 6096ee13 Michael Hanselmann
      # TODO: sendall is not guaranteed to send everything
236 25942a6c Guido Trotter
      self.socket.sendall(msg + constants.LUXI_EOM)
237 c2a03789 Iustin Pop
    except socket.timeout, err:
238 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
239 c2a03789 Iustin Pop
240 c2a03789 Iustin Pop
  def Recv(self):
241 5bbd3f7f Michael Hanselmann
    """Try to receive a message from the socket.
242 c2a03789 Iustin Pop

243 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
244 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
245 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
246 c2a03789 Iustin Pop
    limit.
247 c2a03789 Iustin Pop

248 c2a03789 Iustin Pop
    """
249 c2a03789 Iustin Pop
    self._CheckSocket()
250 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
251 c2a03789 Iustin Pop
    while not self._msgs:
252 c2a03789 Iustin Pop
      if time.time() > etime:
253 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
254 6096ee13 Michael Hanselmann
      while True:
255 6096ee13 Michael Hanselmann
        try:
256 6096ee13 Michael Hanselmann
          data = self.socket.recv(4096)
257 28e3e216 Michael Hanselmann
        except socket.timeout, err:
258 28e3e216 Michael Hanselmann
          raise TimeoutError("Receive timeout: %s" % str(err))
259 6096ee13 Michael Hanselmann
        except socket.error, err:
260 6096ee13 Michael Hanselmann
          if err.args and err.args[0] == errno.EAGAIN:
261 6096ee13 Michael Hanselmann
            continue
262 6096ee13 Michael Hanselmann
          raise
263 6096ee13 Michael Hanselmann
        break
264 c2a03789 Iustin Pop
      if not data:
265 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
266 25942a6c Guido Trotter
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
267 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
268 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
269 c2a03789 Iustin Pop
    return self._msgs.popleft()
270 c2a03789 Iustin Pop
271 c2a03789 Iustin Pop
  def Call(self, msg):
272 c2a03789 Iustin Pop
    """Send a message and wait for the response.
273 c2a03789 Iustin Pop

274 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
275 c2a03789 Iustin Pop

276 c2a03789 Iustin Pop
    """
277 c2a03789 Iustin Pop
    self.Send(msg)
278 c2a03789 Iustin Pop
    return self.Recv()
279 c2a03789 Iustin Pop
280 c2a03789 Iustin Pop
  def Close(self):
281 c2a03789 Iustin Pop
    """Close the socket"""
282 c2a03789 Iustin Pop
    if self.socket is not None:
283 c2a03789 Iustin Pop
      self.socket.close()
284 c2a03789 Iustin Pop
      self.socket = None
285 c2a03789 Iustin Pop
286 c2a03789 Iustin Pop
287 231db3a5 Michael Hanselmann
def ParseRequest(msg):
288 231db3a5 Michael Hanselmann
  """Parses a LUXI request message.
289 231db3a5 Michael Hanselmann

290 231db3a5 Michael Hanselmann
  """
291 231db3a5 Michael Hanselmann
  try:
292 231db3a5 Michael Hanselmann
    request = serializer.LoadJson(msg)
293 231db3a5 Michael Hanselmann
  except ValueError, err:
294 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
295 231db3a5 Michael Hanselmann
296 231db3a5 Michael Hanselmann
  logging.debug("LUXI request: %s", request)
297 231db3a5 Michael Hanselmann
298 231db3a5 Michael Hanselmann
  if not isinstance(request, dict):
299 231db3a5 Michael Hanselmann
    logging.error("LUXI request not a dict: %r", msg)
300 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (not a dict)")
301 231db3a5 Michael Hanselmann
302 b459a848 Andrea Spadaccini
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
303 b459a848 Andrea Spadaccini
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
304 b459a848 Andrea Spadaccini
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
305 e7a25b08 Guido Trotter
306 231db3a5 Michael Hanselmann
  if method is None or args is None:
307 231db3a5 Michael Hanselmann
    logging.error("LUXI request missing method or arguments: %r", msg)
308 231db3a5 Michael Hanselmann
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
309 231db3a5 Michael Hanselmann
                         " in request): %r") % msg)
310 231db3a5 Michael Hanselmann
311 e986f20c Michael Hanselmann
  return (method, args, version)
312 231db3a5 Michael Hanselmann
313 231db3a5 Michael Hanselmann
314 231db3a5 Michael Hanselmann
def ParseResponse(msg):
315 231db3a5 Michael Hanselmann
  """Parses a LUXI response message.
316 231db3a5 Michael Hanselmann

317 231db3a5 Michael Hanselmann
  """
318 231db3a5 Michael Hanselmann
  # Parse the result
319 231db3a5 Michael Hanselmann
  try:
320 231db3a5 Michael Hanselmann
    data = serializer.LoadJson(msg)
321 d143f2c6 Iustin Pop
  except KeyboardInterrupt:
322 d143f2c6 Iustin Pop
    raise
323 231db3a5 Michael Hanselmann
  except Exception, err:
324 231db3a5 Michael Hanselmann
    raise ProtocolError("Error while deserializing response: %s" % str(err))
325 231db3a5 Michael Hanselmann
326 231db3a5 Michael Hanselmann
  # Validate response
327 231db3a5 Michael Hanselmann
  if not (isinstance(data, dict) and
328 231db3a5 Michael Hanselmann
          KEY_SUCCESS in data and
329 231db3a5 Michael Hanselmann
          KEY_RESULT in data):
330 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid response from server: %r" % data)
331 231db3a5 Michael Hanselmann
332 2317945a Guido Trotter
  return (data[KEY_SUCCESS], data[KEY_RESULT],
333 b459a848 Andrea Spadaccini
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
334 231db3a5 Michael Hanselmann
335 231db3a5 Michael Hanselmann
336 e986f20c Michael Hanselmann
def FormatResponse(success, result, version=None):
337 231db3a5 Michael Hanselmann
  """Formats a LUXI response message.
338 231db3a5 Michael Hanselmann

339 231db3a5 Michael Hanselmann
  """
340 231db3a5 Michael Hanselmann
  response = {
341 231db3a5 Michael Hanselmann
    KEY_SUCCESS: success,
342 231db3a5 Michael Hanselmann
    KEY_RESULT: result,
343 231db3a5 Michael Hanselmann
    }
344 231db3a5 Michael Hanselmann
345 e986f20c Michael Hanselmann
  if version is not None:
346 e986f20c Michael Hanselmann
    response[KEY_VERSION] = version
347 e986f20c Michael Hanselmann
348 231db3a5 Michael Hanselmann
  logging.debug("LUXI response: %s", response)
349 231db3a5 Michael Hanselmann
350 231db3a5 Michael Hanselmann
  return serializer.DumpJson(response)
351 231db3a5 Michael Hanselmann
352 231db3a5 Michael Hanselmann
353 e986f20c Michael Hanselmann
def FormatRequest(method, args, version=None):
354 231db3a5 Michael Hanselmann
  """Formats a LUXI request message.
355 231db3a5 Michael Hanselmann

356 231db3a5 Michael Hanselmann
  """
357 231db3a5 Michael Hanselmann
  # Build request
358 231db3a5 Michael Hanselmann
  request = {
359 231db3a5 Michael Hanselmann
    KEY_METHOD: method,
360 231db3a5 Michael Hanselmann
    KEY_ARGS: args,
361 231db3a5 Michael Hanselmann
    }
362 231db3a5 Michael Hanselmann
363 e986f20c Michael Hanselmann
  if version is not None:
364 e986f20c Michael Hanselmann
    request[KEY_VERSION] = version
365 e986f20c Michael Hanselmann
366 231db3a5 Michael Hanselmann
  # Serialize the request
367 a182a3ed Michael Hanselmann
  return serializer.DumpJson(request)
368 231db3a5 Michael Hanselmann
369 231db3a5 Michael Hanselmann
370 e986f20c Michael Hanselmann
def CallLuxiMethod(transport_cb, method, args, version=None):
371 231db3a5 Michael Hanselmann
  """Send a LUXI request via a transport and return the response.
372 231db3a5 Michael Hanselmann

373 231db3a5 Michael Hanselmann
  """
374 231db3a5 Michael Hanselmann
  assert callable(transport_cb)
375 231db3a5 Michael Hanselmann
376 e986f20c Michael Hanselmann
  request_msg = FormatRequest(method, args, version=version)
377 231db3a5 Michael Hanselmann
378 231db3a5 Michael Hanselmann
  # Send request and wait for response
379 231db3a5 Michael Hanselmann
  response_msg = transport_cb(request_msg)
380 231db3a5 Michael Hanselmann
381 e986f20c Michael Hanselmann
  (success, result, resp_version) = ParseResponse(response_msg)
382 e986f20c Michael Hanselmann
383 e986f20c Michael Hanselmann
  # Verify version if there was one in the response
384 e986f20c Michael Hanselmann
  if resp_version is not None and resp_version != version:
385 e986f20c Michael Hanselmann
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
386 e986f20c Michael Hanselmann
                           (version, resp_version))
387 231db3a5 Michael Hanselmann
388 231db3a5 Michael Hanselmann
  if success:
389 231db3a5 Michael Hanselmann
    return result
390 231db3a5 Michael Hanselmann
391 231db3a5 Michael Hanselmann
  errors.MaybeRaise(result)
392 231db3a5 Michael Hanselmann
  raise RequestError(result)
393 231db3a5 Michael Hanselmann
394 231db3a5 Michael Hanselmann
395 c2a03789 Iustin Pop
class Client(object):
396 c2a03789 Iustin Pop
  """High-level client implementation.
397 c2a03789 Iustin Pop

398 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
399 c2a03789 Iustin Pop
  implements data serialization/deserialization.
400 c2a03789 Iustin Pop

401 c2a03789 Iustin Pop
  """
402 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
403 c2a03789 Iustin Pop
    """Constructor for the Client class.
404 c2a03789 Iustin Pop

405 c2a03789 Iustin Pop
    Arguments:
406 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
407 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
408 c2a03789 Iustin Pop
      - transport: a Transport-like class
409 c2a03789 Iustin Pop

410 c2a03789 Iustin Pop

411 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
412 c2a03789 Iustin Pop
    class are used.
413 c2a03789 Iustin Pop

414 c2a03789 Iustin Pop
    """
415 ceab32dd Iustin Pop
    if address is None:
416 b87ee98f Michael Hanselmann
      address = pathutils.MASTER_SOCKET
417 8d5b316c Iustin Pop
    self.address = address
418 8d5b316c Iustin Pop
    self.timeouts = timeouts
419 8d5b316c Iustin Pop
    self.transport_class = transport
420 8d5b316c Iustin Pop
    self.transport = None
421 8d5b316c Iustin Pop
    self._InitTransport()
422 8d5b316c Iustin Pop
423 8d5b316c Iustin Pop
  def _InitTransport(self):
424 8d5b316c Iustin Pop
    """(Re)initialize the transport if needed.
425 8d5b316c Iustin Pop

426 8d5b316c Iustin Pop
    """
427 8d5b316c Iustin Pop
    if self.transport is None:
428 8d5b316c Iustin Pop
      self.transport = self.transport_class(self.address,
429 8d5b316c Iustin Pop
                                            timeouts=self.timeouts)
430 8d5b316c Iustin Pop
431 8d5b316c Iustin Pop
  def _CloseTransport(self):
432 8d5b316c Iustin Pop
    """Close the transport, ignoring errors.
433 8d5b316c Iustin Pop

434 8d5b316c Iustin Pop
    """
435 8d5b316c Iustin Pop
    if self.transport is None:
436 8d5b316c Iustin Pop
      return
437 8d5b316c Iustin Pop
    try:
438 8d5b316c Iustin Pop
      old_transp = self.transport
439 8d5b316c Iustin Pop
      self.transport = None
440 8d5b316c Iustin Pop
      old_transp.Close()
441 b459a848 Andrea Spadaccini
    except Exception: # pylint: disable=W0703
442 8d5b316c Iustin Pop
      pass
443 c2a03789 Iustin Pop
444 231db3a5 Michael Hanselmann
  def _SendMethodCall(self, data):
445 3d8548c4 Michael Hanselmann
    # Send request and wait for response
446 8d5b316c Iustin Pop
    try:
447 8d5b316c Iustin Pop
      self._InitTransport()
448 231db3a5 Michael Hanselmann
      return self.transport.Call(data)
449 8d5b316c Iustin Pop
    except Exception:
450 8d5b316c Iustin Pop
      self._CloseTransport()
451 8d5b316c Iustin Pop
      raise
452 8d5b316c Iustin Pop
453 2a917701 Michael Hanselmann
  def Close(self):
454 2a917701 Michael Hanselmann
    """Close the underlying connection.
455 2a917701 Michael Hanselmann

456 2a917701 Michael Hanselmann
    """
457 2a917701 Michael Hanselmann
    self._CloseTransport()
458 2a917701 Michael Hanselmann
459 231db3a5 Michael Hanselmann
  def CallMethod(self, method, args):
460 231db3a5 Michael Hanselmann
    """Send a generic request and return the response.
461 3d8548c4 Michael Hanselmann

462 231db3a5 Michael Hanselmann
    """
463 a629ecb9 Iustin Pop
    if not isinstance(args, (list, tuple)):
464 a629ecb9 Iustin Pop
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
465 a629ecb9 Iustin Pop
                                   " expected list, got %s" % type(args))
466 e986f20c Michael Hanselmann
    return CallLuxiMethod(self._SendMethodCall, method, args,
467 e986f20c Michael Hanselmann
                          version=constants.LUXI_VERSION)
468 c2a03789 Iustin Pop
469 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
470 83c046a2 Iustin Pop
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
471 3ccafd0e Iustin Pop
472 05e50653 Michael Hanselmann
  def SetWatcherPause(self, until):
473 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
474 05e50653 Michael Hanselmann
475 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
476 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
477 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
478 0bbe448c Michael Hanselmann
479 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
480 2971c913 Iustin Pop
    jobs_state = []
481 2971c913 Iustin Pop
    for ops in jobs:
482 2971c913 Iustin Pop
      jobs_state.append([op.__getstate__() for op in ops])
483 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
484 2971c913 Iustin Pop
485 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
486 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
487 0bbe448c Michael Hanselmann
488 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
489 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
490 0bbe448c Michael Hanselmann
491 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
492 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
493 83c046a2 Iustin Pop
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
494 07cd723a Iustin Pop
495 f4484122 Michael Hanselmann
  def WaitForJobChangeOnce(self, job_id, fields,
496 793a8f7c Michael Hanselmann
                           prev_job_info, prev_log_serial,
497 793a8f7c Michael Hanselmann
                           timeout=WFJC_TIMEOUT):
498 793a8f7c Michael Hanselmann
    """Waits for changes on a job.
499 793a8f7c Michael Hanselmann

500 793a8f7c Michael Hanselmann
    @param job_id: Job ID
501 793a8f7c Michael Hanselmann
    @type fields: list
502 793a8f7c Michael Hanselmann
    @param fields: List of field names to be observed
503 793a8f7c Michael Hanselmann
    @type prev_job_info: None or list
504 793a8f7c Michael Hanselmann
    @param prev_job_info: Previously received job information
505 793a8f7c Michael Hanselmann
    @type prev_log_serial: None or int/long
506 793a8f7c Michael Hanselmann
    @param prev_log_serial: Highest log serial number previously received
507 793a8f7c Michael Hanselmann
    @type timeout: int/float
508 793a8f7c Michael Hanselmann
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
509 793a8f7c Michael Hanselmann
                    be capped to that value)
510 793a8f7c Michael Hanselmann

511 793a8f7c Michael Hanselmann
    """
512 793a8f7c Michael Hanselmann
    assert timeout >= 0, "Timeout can not be negative"
513 f4484122 Michael Hanselmann
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
514 f4484122 Michael Hanselmann
                           (job_id, fields, prev_job_info,
515 793a8f7c Michael Hanselmann
                            prev_log_serial,
516 793a8f7c Michael Hanselmann
                            min(WFJC_TIMEOUT, timeout)))
517 f4484122 Michael Hanselmann
518 f4484122 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
519 5c735209 Iustin Pop
    while True:
520 f4484122 Michael Hanselmann
      result = self.WaitForJobChangeOnce(job_id, fields,
521 f4484122 Michael Hanselmann
                                         prev_job_info, prev_log_serial)
522 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
523 5c735209 Iustin Pop
        break
524 5c735209 Iustin Pop
    return result
525 dfe57c22 Michael Hanselmann
526 2e5c33db Iustin Pop
  def Query(self, what, fields, qfilter):
527 28b71a76 Michael Hanselmann
    """Query for resources/items.
528 28b71a76 Michael Hanselmann

529 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
530 28b71a76 Michael Hanselmann
    @type fields: List of strings
531 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
532 2e5c33db Iustin Pop
    @type qfilter: None or list
533 2e5c33db Iustin Pop
    @param qfilter: Query filter
534 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryResponse}
535 28b71a76 Michael Hanselmann

536 28b71a76 Michael Hanselmann
    """
537 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
538 28b71a76 Michael Hanselmann
    return objects.QueryResponse.FromDict(result)
539 28b71a76 Michael Hanselmann
540 28b71a76 Michael Hanselmann
  def QueryFields(self, what, fields):
541 28b71a76 Michael Hanselmann
    """Query for available fields.
542 28b71a76 Michael Hanselmann

543 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
544 28b71a76 Michael Hanselmann
    @type fields: None or list of strings
545 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
546 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryFieldsResponse}
547 28b71a76 Michael Hanselmann

548 28b71a76 Michael Hanselmann
    """
549 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
550 28b71a76 Michael Hanselmann
    return objects.QueryFieldsResponse.FromDict(result)
551 28b71a76 Michael Hanselmann
552 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
553 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
554 3d8548c4 Michael Hanselmann
555 ec79568d Iustin Pop
  def QueryInstances(self, names, fields, use_locking):
556 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
557 ee6c7b94 Michael Hanselmann
558 ec79568d Iustin Pop
  def QueryNodes(self, names, fields, use_locking):
559 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
560 02f7fe54 Michael Hanselmann
561 a79ef2a5 Adeodato Simo
  def QueryGroups(self, names, fields, use_locking):
562 a79ef2a5 Adeodato Simo
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
563 a79ef2a5 Adeodato Simo
564 ec79568d Iustin Pop
  def QueryExports(self, nodes, use_locking):
565 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
566 32f93223 Michael Hanselmann
567 66baeccc Iustin Pop
  def QueryClusterInfo(self):
568 66baeccc Iustin Pop
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
569 66baeccc Iustin Pop
570 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
571 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
572 ae5849b5 Michael Hanselmann
573 7699c3af Iustin Pop
  def QueryTags(self, kind, name):
574 7699c3af Iustin Pop
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))