Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 1a2eb2dc

History | View | Annotate | Download (16.2 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 83c046a2 Iustin Pop
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 8d5b316c Iustin Pop
This module implements the local unix socket protocol. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 231db3a5 Michael Hanselmann
import logging
37 c2a03789 Iustin Pop
38 fad50141 Michael Hanselmann
from ganeti import serializer
39 ceab32dd Iustin Pop
from ganeti import constants
40 6797ec29 Iustin Pop
from ganeti import errors
41 cb462b06 Michael Hanselmann
from ganeti import utils
42 28b71a76 Michael Hanselmann
from ganeti import objects
43 b87ee98f Michael Hanselmann
from ganeti import pathutils
44 c2a03789 Iustin Pop
45 c2a03789 Iustin Pop
46 231db3a5 Michael Hanselmann
KEY_METHOD = "method"
47 231db3a5 Michael Hanselmann
KEY_ARGS = "args"
48 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
49 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
50 e986f20c Michael Hanselmann
KEY_VERSION = "version"
51 3d8548c4 Michael Hanselmann
52 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
53 2971c913 Iustin Pop
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
54 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
55 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
56 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
57 f63ffb37 Michael Hanselmann
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
58 83c046a2 Iustin Pop
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
59 28b71a76 Michael Hanselmann
REQ_QUERY = "Query"
60 28b71a76 Michael Hanselmann
REQ_QUERY_FIELDS = "QueryFields"
61 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
62 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
63 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
64 a79ef2a5 Adeodato Simo
REQ_QUERY_GROUPS = "QueryGroups"
65 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
66 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
67 66baeccc Iustin Pop
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
68 7699c3af Iustin Pop
REQ_QUERY_TAGS = "QueryTags"
69 83c046a2 Iustin Pop
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
70 05e50653 Michael Hanselmann
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
71 c2a03789 Iustin Pop
72 e3a25810 Michael Hanselmann
#: List of all LUXI requests
73 e3a25810 Michael Hanselmann
REQ_ALL = frozenset([
74 e3a25810 Michael Hanselmann
  REQ_ARCHIVE_JOB,
75 83c046a2 Iustin Pop
  REQ_AUTO_ARCHIVE_JOBS,
76 e3a25810 Michael Hanselmann
  REQ_CANCEL_JOB,
77 f63ffb37 Michael Hanselmann
  REQ_CHANGE_JOB_PRIORITY,
78 e3a25810 Michael Hanselmann
  REQ_QUERY,
79 e3a25810 Michael Hanselmann
  REQ_QUERY_CLUSTER_INFO,
80 e3a25810 Michael Hanselmann
  REQ_QUERY_CONFIG_VALUES,
81 e3a25810 Michael Hanselmann
  REQ_QUERY_EXPORTS,
82 e3a25810 Michael Hanselmann
  REQ_QUERY_FIELDS,
83 e3a25810 Michael Hanselmann
  REQ_QUERY_GROUPS,
84 e3a25810 Michael Hanselmann
  REQ_QUERY_INSTANCES,
85 e3a25810 Michael Hanselmann
  REQ_QUERY_JOBS,
86 e3a25810 Michael Hanselmann
  REQ_QUERY_NODES,
87 e3a25810 Michael Hanselmann
  REQ_QUERY_TAGS,
88 83c046a2 Iustin Pop
  REQ_SET_DRAIN_FLAG,
89 e3a25810 Michael Hanselmann
  REQ_SET_WATCHER_PAUSE,
90 e3a25810 Michael Hanselmann
  REQ_SUBMIT_JOB,
91 e3a25810 Michael Hanselmann
  REQ_SUBMIT_MANY_JOBS,
92 e3a25810 Michael Hanselmann
  REQ_WAIT_FOR_JOB_CHANGE,
93 e3a25810 Michael Hanselmann
  ])
94 e3a25810 Michael Hanselmann
95 c2a03789 Iustin Pop
DEF_CTMO = 10
96 c2a03789 Iustin Pop
DEF_RWTO = 60
97 c2a03789 Iustin Pop
98 793a8f7c Michael Hanselmann
# WaitForJobChange timeout
99 793a8f7c Michael Hanselmann
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
100 793a8f7c Michael Hanselmann
101 c2a03789 Iustin Pop
102 7a8bda3f Michael Hanselmann
class ProtocolError(errors.LuxiError):
103 5a1c22fe Iustin Pop
  """Denotes an error in the LUXI protocol."""
104 c2a03789 Iustin Pop
105 c2a03789 Iustin Pop
106 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
107 5a1c22fe Iustin Pop
  """Connection closed error."""
108 c2a03789 Iustin Pop
109 c2a03789 Iustin Pop
110 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
111 5a1c22fe Iustin Pop
  """Operation timeout error."""
112 c2a03789 Iustin Pop
113 c2a03789 Iustin Pop
114 b77acb3e Iustin Pop
class RequestError(ProtocolError):
115 5a1c22fe Iustin Pop
  """Error on request.
116 b77acb3e Iustin Pop

117 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
118 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
119 b77acb3e Iustin Pop

120 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
121 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
122 b77acb3e Iustin Pop
    - query failed because required fields were missing
123 b77acb3e Iustin Pop

124 b77acb3e Iustin Pop
  """
125 b77acb3e Iustin Pop
126 3d8548c4 Michael Hanselmann
127 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
128 5a1c22fe Iustin Pop
  """The master cannot be reached.
129 03a8dbdc Iustin Pop

130 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
131 03a8dbdc Iustin Pop
  been removed.
132 03a8dbdc Iustin Pop

133 03a8dbdc Iustin Pop
  """
134 03a8dbdc Iustin Pop
135 b77acb3e Iustin Pop
136 5a1c22fe Iustin Pop
class PermissionError(ProtocolError):
137 5a1c22fe Iustin Pop
  """Permission denied while connecting to the master socket.
138 5a1c22fe Iustin Pop

139 5a1c22fe Iustin Pop
  This means the user doesn't have the proper rights.
140 5a1c22fe Iustin Pop

141 5a1c22fe Iustin Pop
  """
142 5a1c22fe Iustin Pop
143 5a1c22fe Iustin Pop
144 c2a03789 Iustin Pop
class Transport:
145 c2a03789 Iustin Pop
  """Low-level transport class.
146 c2a03789 Iustin Pop

147 c2a03789 Iustin Pop
  This is used on the client side.
148 c2a03789 Iustin Pop

149 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
150 c2a03789 Iustin Pop
  semantics to the Client. This means:
151 c2a03789 Iustin Pop
    - can send messages and receive messages
152 c2a03789 Iustin Pop
    - safe for multithreading
153 c2a03789 Iustin Pop

154 c2a03789 Iustin Pop
  """
155 c2a03789 Iustin Pop
156 25942a6c Guido Trotter
  def __init__(self, address, timeouts=None):
157 c2a03789 Iustin Pop
    """Constructor for the Client class.
158 c2a03789 Iustin Pop

159 c2a03789 Iustin Pop
    Arguments:
160 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
161 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
162 c2a03789 Iustin Pop

163 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
164 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
165 c2a03789 Iustin Pop

166 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
167 c2a03789 Iustin Pop

168 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
169 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
170 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
171 c2a03789 Iustin Pop
    timeout).
172 c2a03789 Iustin Pop

173 c2a03789 Iustin Pop
    """
174 c2a03789 Iustin Pop
    self.address = address
175 c2a03789 Iustin Pop
    if timeouts is None:
176 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
177 c2a03789 Iustin Pop
    else:
178 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
179 c2a03789 Iustin Pop
180 c2a03789 Iustin Pop
    self.socket = None
181 c2a03789 Iustin Pop
    self._buffer = ""
182 c2a03789 Iustin Pop
    self._msgs = collections.deque()
183 c2a03789 Iustin Pop
184 c2a03789 Iustin Pop
    try:
185 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
186 cb462b06 Michael Hanselmann
187 cb462b06 Michael Hanselmann
      # Try to connect
188 c2a03789 Iustin Pop
      try:
189 cb462b06 Michael Hanselmann
        utils.Retry(self._Connect, 1.0, self._ctimeout,
190 cb462b06 Michael Hanselmann
                    args=(self.socket, address, self._ctimeout))
191 cb462b06 Michael Hanselmann
      except utils.RetryTimeout:
192 cb462b06 Michael Hanselmann
        raise TimeoutError("Connect timed out")
193 cb462b06 Michael Hanselmann
194 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
195 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
196 c2a03789 Iustin Pop
      if self.socket is not None:
197 c2a03789 Iustin Pop
        self.socket.close()
198 c2a03789 Iustin Pop
      self.socket = None
199 c2a03789 Iustin Pop
      raise
200 c2a03789 Iustin Pop
201 cb462b06 Michael Hanselmann
  @staticmethod
202 cb462b06 Michael Hanselmann
  def _Connect(sock, address, timeout):
203 cb462b06 Michael Hanselmann
    sock.settimeout(timeout)
204 cb462b06 Michael Hanselmann
    try:
205 cb462b06 Michael Hanselmann
      sock.connect(address)
206 cb462b06 Michael Hanselmann
    except socket.timeout, err:
207 cb462b06 Michael Hanselmann
      raise TimeoutError("Connect timed out: %s" % str(err))
208 cb462b06 Michael Hanselmann
    except socket.error, err:
209 5a1c22fe Iustin Pop
      error_code = err.args[0]
210 5a1c22fe Iustin Pop
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
211 cb462b06 Michael Hanselmann
        raise NoMasterError(address)
212 5a1c22fe Iustin Pop
      elif error_code in (errno.EPERM, errno.EACCES):
213 5a1c22fe Iustin Pop
        raise PermissionError(address)
214 5a1c22fe Iustin Pop
      elif error_code == errno.EAGAIN:
215 cb462b06 Michael Hanselmann
        # Server's socket backlog is full at the moment
216 cb462b06 Michael Hanselmann
        raise utils.RetryAgain()
217 cb462b06 Michael Hanselmann
      raise
218 cb462b06 Michael Hanselmann
219 c2a03789 Iustin Pop
  def _CheckSocket(self):
220 c2a03789 Iustin Pop
    """Make sure we are connected.
221 c2a03789 Iustin Pop

222 c2a03789 Iustin Pop
    """
223 c2a03789 Iustin Pop
    if self.socket is None:
224 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
225 c2a03789 Iustin Pop
226 c2a03789 Iustin Pop
  def Send(self, msg):
227 c2a03789 Iustin Pop
    """Send a message.
228 c2a03789 Iustin Pop

229 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
230 c2a03789 Iustin Pop

231 c2a03789 Iustin Pop
    """
232 25942a6c Guido Trotter
    if constants.LUXI_EOM in msg:
233 797506fc Michael Hanselmann
      raise ProtocolError("Message terminator found in payload")
234 797506fc Michael Hanselmann
235 c2a03789 Iustin Pop
    self._CheckSocket()
236 c2a03789 Iustin Pop
    try:
237 6096ee13 Michael Hanselmann
      # TODO: sendall is not guaranteed to send everything
238 25942a6c Guido Trotter
      self.socket.sendall(msg + constants.LUXI_EOM)
239 c2a03789 Iustin Pop
    except socket.timeout, err:
240 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
241 c2a03789 Iustin Pop
242 c2a03789 Iustin Pop
  def Recv(self):
243 5bbd3f7f Michael Hanselmann
    """Try to receive a message from the socket.
244 c2a03789 Iustin Pop

245 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
246 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
247 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
248 c2a03789 Iustin Pop
    limit.
249 c2a03789 Iustin Pop

250 c2a03789 Iustin Pop
    """
251 c2a03789 Iustin Pop
    self._CheckSocket()
252 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
253 c2a03789 Iustin Pop
    while not self._msgs:
254 c2a03789 Iustin Pop
      if time.time() > etime:
255 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
256 6096ee13 Michael Hanselmann
      while True:
257 6096ee13 Michael Hanselmann
        try:
258 6096ee13 Michael Hanselmann
          data = self.socket.recv(4096)
259 28e3e216 Michael Hanselmann
        except socket.timeout, err:
260 28e3e216 Michael Hanselmann
          raise TimeoutError("Receive timeout: %s" % str(err))
261 6096ee13 Michael Hanselmann
        except socket.error, err:
262 6096ee13 Michael Hanselmann
          if err.args and err.args[0] == errno.EAGAIN:
263 6096ee13 Michael Hanselmann
            continue
264 6096ee13 Michael Hanselmann
          raise
265 6096ee13 Michael Hanselmann
        break
266 c2a03789 Iustin Pop
      if not data:
267 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
268 25942a6c Guido Trotter
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
269 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
270 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
271 c2a03789 Iustin Pop
    return self._msgs.popleft()
272 c2a03789 Iustin Pop
273 c2a03789 Iustin Pop
  def Call(self, msg):
274 c2a03789 Iustin Pop
    """Send a message and wait for the response.
275 c2a03789 Iustin Pop

276 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
277 c2a03789 Iustin Pop

278 c2a03789 Iustin Pop
    """
279 c2a03789 Iustin Pop
    self.Send(msg)
280 c2a03789 Iustin Pop
    return self.Recv()
281 c2a03789 Iustin Pop
282 c2a03789 Iustin Pop
  def Close(self):
283 c2a03789 Iustin Pop
    """Close the socket"""
284 c2a03789 Iustin Pop
    if self.socket is not None:
285 c2a03789 Iustin Pop
      self.socket.close()
286 c2a03789 Iustin Pop
      self.socket = None
287 c2a03789 Iustin Pop
288 c2a03789 Iustin Pop
289 231db3a5 Michael Hanselmann
def ParseRequest(msg):
290 231db3a5 Michael Hanselmann
  """Parses a LUXI request message.
291 231db3a5 Michael Hanselmann

292 231db3a5 Michael Hanselmann
  """
293 231db3a5 Michael Hanselmann
  try:
294 231db3a5 Michael Hanselmann
    request = serializer.LoadJson(msg)
295 231db3a5 Michael Hanselmann
  except ValueError, err:
296 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
297 231db3a5 Michael Hanselmann
298 231db3a5 Michael Hanselmann
  logging.debug("LUXI request: %s", request)
299 231db3a5 Michael Hanselmann
300 231db3a5 Michael Hanselmann
  if not isinstance(request, dict):
301 231db3a5 Michael Hanselmann
    logging.error("LUXI request not a dict: %r", msg)
302 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (not a dict)")
303 231db3a5 Michael Hanselmann
304 b459a848 Andrea Spadaccini
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
305 b459a848 Andrea Spadaccini
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
306 b459a848 Andrea Spadaccini
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
307 e7a25b08 Guido Trotter
308 231db3a5 Michael Hanselmann
  if method is None or args is None:
309 231db3a5 Michael Hanselmann
    logging.error("LUXI request missing method or arguments: %r", msg)
310 231db3a5 Michael Hanselmann
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
311 231db3a5 Michael Hanselmann
                         " in request): %r") % msg)
312 231db3a5 Michael Hanselmann
313 e986f20c Michael Hanselmann
  return (method, args, version)
314 231db3a5 Michael Hanselmann
315 231db3a5 Michael Hanselmann
316 231db3a5 Michael Hanselmann
def ParseResponse(msg):
317 231db3a5 Michael Hanselmann
  """Parses a LUXI response message.
318 231db3a5 Michael Hanselmann

319 231db3a5 Michael Hanselmann
  """
320 231db3a5 Michael Hanselmann
  # Parse the result
321 231db3a5 Michael Hanselmann
  try:
322 231db3a5 Michael Hanselmann
    data = serializer.LoadJson(msg)
323 d143f2c6 Iustin Pop
  except KeyboardInterrupt:
324 d143f2c6 Iustin Pop
    raise
325 231db3a5 Michael Hanselmann
  except Exception, err:
326 231db3a5 Michael Hanselmann
    raise ProtocolError("Error while deserializing response: %s" % str(err))
327 231db3a5 Michael Hanselmann
328 231db3a5 Michael Hanselmann
  # Validate response
329 231db3a5 Michael Hanselmann
  if not (isinstance(data, dict) and
330 231db3a5 Michael Hanselmann
          KEY_SUCCESS in data and
331 231db3a5 Michael Hanselmann
          KEY_RESULT in data):
332 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid response from server: %r" % data)
333 231db3a5 Michael Hanselmann
334 2317945a Guido Trotter
  return (data[KEY_SUCCESS], data[KEY_RESULT],
335 b459a848 Andrea Spadaccini
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
336 231db3a5 Michael Hanselmann
337 231db3a5 Michael Hanselmann
338 e986f20c Michael Hanselmann
def FormatResponse(success, result, version=None):
339 231db3a5 Michael Hanselmann
  """Formats a LUXI response message.
340 231db3a5 Michael Hanselmann

341 231db3a5 Michael Hanselmann
  """
342 231db3a5 Michael Hanselmann
  response = {
343 231db3a5 Michael Hanselmann
    KEY_SUCCESS: success,
344 231db3a5 Michael Hanselmann
    KEY_RESULT: result,
345 231db3a5 Michael Hanselmann
    }
346 231db3a5 Michael Hanselmann
347 e986f20c Michael Hanselmann
  if version is not None:
348 e986f20c Michael Hanselmann
    response[KEY_VERSION] = version
349 e986f20c Michael Hanselmann
350 231db3a5 Michael Hanselmann
  logging.debug("LUXI response: %s", response)
351 231db3a5 Michael Hanselmann
352 231db3a5 Michael Hanselmann
  return serializer.DumpJson(response)
353 231db3a5 Michael Hanselmann
354 231db3a5 Michael Hanselmann
355 e986f20c Michael Hanselmann
def FormatRequest(method, args, version=None):
356 231db3a5 Michael Hanselmann
  """Formats a LUXI request message.
357 231db3a5 Michael Hanselmann

358 231db3a5 Michael Hanselmann
  """
359 231db3a5 Michael Hanselmann
  # Build request
360 231db3a5 Michael Hanselmann
  request = {
361 231db3a5 Michael Hanselmann
    KEY_METHOD: method,
362 231db3a5 Michael Hanselmann
    KEY_ARGS: args,
363 231db3a5 Michael Hanselmann
    }
364 231db3a5 Michael Hanselmann
365 e986f20c Michael Hanselmann
  if version is not None:
366 e986f20c Michael Hanselmann
    request[KEY_VERSION] = version
367 e986f20c Michael Hanselmann
368 231db3a5 Michael Hanselmann
  # Serialize the request
369 a182a3ed Michael Hanselmann
  return serializer.DumpJson(request)
370 231db3a5 Michael Hanselmann
371 231db3a5 Michael Hanselmann
372 e986f20c Michael Hanselmann
def CallLuxiMethod(transport_cb, method, args, version=None):
373 231db3a5 Michael Hanselmann
  """Send a LUXI request via a transport and return the response.
374 231db3a5 Michael Hanselmann

375 231db3a5 Michael Hanselmann
  """
376 231db3a5 Michael Hanselmann
  assert callable(transport_cb)
377 231db3a5 Michael Hanselmann
378 e986f20c Michael Hanselmann
  request_msg = FormatRequest(method, args, version=version)
379 231db3a5 Michael Hanselmann
380 231db3a5 Michael Hanselmann
  # Send request and wait for response
381 231db3a5 Michael Hanselmann
  response_msg = transport_cb(request_msg)
382 231db3a5 Michael Hanselmann
383 e986f20c Michael Hanselmann
  (success, result, resp_version) = ParseResponse(response_msg)
384 e986f20c Michael Hanselmann
385 e986f20c Michael Hanselmann
  # Verify version if there was one in the response
386 e986f20c Michael Hanselmann
  if resp_version is not None and resp_version != version:
387 e986f20c Michael Hanselmann
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
388 e986f20c Michael Hanselmann
                           (version, resp_version))
389 231db3a5 Michael Hanselmann
390 231db3a5 Michael Hanselmann
  if success:
391 231db3a5 Michael Hanselmann
    return result
392 231db3a5 Michael Hanselmann
393 231db3a5 Michael Hanselmann
  errors.MaybeRaise(result)
394 231db3a5 Michael Hanselmann
  raise RequestError(result)
395 231db3a5 Michael Hanselmann
396 231db3a5 Michael Hanselmann
397 c2a03789 Iustin Pop
class Client(object):
398 c2a03789 Iustin Pop
  """High-level client implementation.
399 c2a03789 Iustin Pop

400 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
401 c2a03789 Iustin Pop
  implements data serialization/deserialization.
402 c2a03789 Iustin Pop

403 c2a03789 Iustin Pop
  """
404 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
405 c2a03789 Iustin Pop
    """Constructor for the Client class.
406 c2a03789 Iustin Pop

407 c2a03789 Iustin Pop
    Arguments:
408 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
409 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
410 c2a03789 Iustin Pop
      - transport: a Transport-like class
411 c2a03789 Iustin Pop

412 c2a03789 Iustin Pop

413 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
414 c2a03789 Iustin Pop
    class are used.
415 c2a03789 Iustin Pop

416 c2a03789 Iustin Pop
    """
417 ceab32dd Iustin Pop
    if address is None:
418 b87ee98f Michael Hanselmann
      address = pathutils.MASTER_SOCKET
419 8d5b316c Iustin Pop
    self.address = address
420 8d5b316c Iustin Pop
    self.timeouts = timeouts
421 8d5b316c Iustin Pop
    self.transport_class = transport
422 8d5b316c Iustin Pop
    self.transport = None
423 8d5b316c Iustin Pop
    self._InitTransport()
424 8d5b316c Iustin Pop
425 8d5b316c Iustin Pop
  def _InitTransport(self):
426 8d5b316c Iustin Pop
    """(Re)initialize the transport if needed.
427 8d5b316c Iustin Pop

428 8d5b316c Iustin Pop
    """
429 8d5b316c Iustin Pop
    if self.transport is None:
430 8d5b316c Iustin Pop
      self.transport = self.transport_class(self.address,
431 8d5b316c Iustin Pop
                                            timeouts=self.timeouts)
432 8d5b316c Iustin Pop
433 8d5b316c Iustin Pop
  def _CloseTransport(self):
434 8d5b316c Iustin Pop
    """Close the transport, ignoring errors.
435 8d5b316c Iustin Pop

436 8d5b316c Iustin Pop
    """
437 8d5b316c Iustin Pop
    if self.transport is None:
438 8d5b316c Iustin Pop
      return
439 8d5b316c Iustin Pop
    try:
440 8d5b316c Iustin Pop
      old_transp = self.transport
441 8d5b316c Iustin Pop
      self.transport = None
442 8d5b316c Iustin Pop
      old_transp.Close()
443 b459a848 Andrea Spadaccini
    except Exception: # pylint: disable=W0703
444 8d5b316c Iustin Pop
      pass
445 c2a03789 Iustin Pop
446 231db3a5 Michael Hanselmann
  def _SendMethodCall(self, data):
447 3d8548c4 Michael Hanselmann
    # Send request and wait for response
448 8d5b316c Iustin Pop
    try:
449 8d5b316c Iustin Pop
      self._InitTransport()
450 231db3a5 Michael Hanselmann
      return self.transport.Call(data)
451 8d5b316c Iustin Pop
    except Exception:
452 8d5b316c Iustin Pop
      self._CloseTransport()
453 8d5b316c Iustin Pop
      raise
454 8d5b316c Iustin Pop
455 2a917701 Michael Hanselmann
  def Close(self):
456 2a917701 Michael Hanselmann
    """Close the underlying connection.
457 2a917701 Michael Hanselmann

458 2a917701 Michael Hanselmann
    """
459 2a917701 Michael Hanselmann
    self._CloseTransport()
460 2a917701 Michael Hanselmann
461 231db3a5 Michael Hanselmann
  def CallMethod(self, method, args):
462 231db3a5 Michael Hanselmann
    """Send a generic request and return the response.
463 3d8548c4 Michael Hanselmann

464 231db3a5 Michael Hanselmann
    """
465 a629ecb9 Iustin Pop
    if not isinstance(args, (list, tuple)):
466 a629ecb9 Iustin Pop
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
467 a629ecb9 Iustin Pop
                                   " expected list, got %s" % type(args))
468 e986f20c Michael Hanselmann
    return CallLuxiMethod(self._SendMethodCall, method, args,
469 e986f20c Michael Hanselmann
                          version=constants.LUXI_VERSION)
470 c2a03789 Iustin Pop
471 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
472 83c046a2 Iustin Pop
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
473 3ccafd0e Iustin Pop
474 05e50653 Michael Hanselmann
  def SetWatcherPause(self, until):
475 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
476 05e50653 Michael Hanselmann
477 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
478 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
479 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
480 0bbe448c Michael Hanselmann
481 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
482 2971c913 Iustin Pop
    jobs_state = []
483 2971c913 Iustin Pop
    for ops in jobs:
484 2971c913 Iustin Pop
      jobs_state.append([op.__getstate__() for op in ops])
485 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
486 2971c913 Iustin Pop
487 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
488 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
489 0bbe448c Michael Hanselmann
490 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
491 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
492 0bbe448c Michael Hanselmann
493 f63ffb37 Michael Hanselmann
  def ChangeJobPriority(self, job_id, priority):
494 f63ffb37 Michael Hanselmann
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
495 f63ffb37 Michael Hanselmann
496 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
497 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
498 83c046a2 Iustin Pop
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
499 07cd723a Iustin Pop
500 f4484122 Michael Hanselmann
  def WaitForJobChangeOnce(self, job_id, fields,
501 793a8f7c Michael Hanselmann
                           prev_job_info, prev_log_serial,
502 793a8f7c Michael Hanselmann
                           timeout=WFJC_TIMEOUT):
503 793a8f7c Michael Hanselmann
    """Waits for changes on a job.
504 793a8f7c Michael Hanselmann

505 793a8f7c Michael Hanselmann
    @param job_id: Job ID
506 793a8f7c Michael Hanselmann
    @type fields: list
507 793a8f7c Michael Hanselmann
    @param fields: List of field names to be observed
508 793a8f7c Michael Hanselmann
    @type prev_job_info: None or list
509 793a8f7c Michael Hanselmann
    @param prev_job_info: Previously received job information
510 793a8f7c Michael Hanselmann
    @type prev_log_serial: None or int/long
511 793a8f7c Michael Hanselmann
    @param prev_log_serial: Highest log serial number previously received
512 793a8f7c Michael Hanselmann
    @type timeout: int/float
513 793a8f7c Michael Hanselmann
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
514 793a8f7c Michael Hanselmann
                    be capped to that value)
515 793a8f7c Michael Hanselmann

516 793a8f7c Michael Hanselmann
    """
517 793a8f7c Michael Hanselmann
    assert timeout >= 0, "Timeout can not be negative"
518 f4484122 Michael Hanselmann
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
519 f4484122 Michael Hanselmann
                           (job_id, fields, prev_job_info,
520 793a8f7c Michael Hanselmann
                            prev_log_serial,
521 793a8f7c Michael Hanselmann
                            min(WFJC_TIMEOUT, timeout)))
522 f4484122 Michael Hanselmann
523 f4484122 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
524 5c735209 Iustin Pop
    while True:
525 f4484122 Michael Hanselmann
      result = self.WaitForJobChangeOnce(job_id, fields,
526 f4484122 Michael Hanselmann
                                         prev_job_info, prev_log_serial)
527 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
528 5c735209 Iustin Pop
        break
529 5c735209 Iustin Pop
    return result
530 dfe57c22 Michael Hanselmann
531 2e5c33db Iustin Pop
  def Query(self, what, fields, qfilter):
532 28b71a76 Michael Hanselmann
    """Query for resources/items.
533 28b71a76 Michael Hanselmann

534 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
535 28b71a76 Michael Hanselmann
    @type fields: List of strings
536 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
537 2e5c33db Iustin Pop
    @type qfilter: None or list
538 2e5c33db Iustin Pop
    @param qfilter: Query filter
539 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryResponse}
540 28b71a76 Michael Hanselmann

541 28b71a76 Michael Hanselmann
    """
542 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
543 28b71a76 Michael Hanselmann
    return objects.QueryResponse.FromDict(result)
544 28b71a76 Michael Hanselmann
545 28b71a76 Michael Hanselmann
  def QueryFields(self, what, fields):
546 28b71a76 Michael Hanselmann
    """Query for available fields.
547 28b71a76 Michael Hanselmann

548 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
549 28b71a76 Michael Hanselmann
    @type fields: None or list of strings
550 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
551 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryFieldsResponse}
552 28b71a76 Michael Hanselmann

553 28b71a76 Michael Hanselmann
    """
554 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
555 28b71a76 Michael Hanselmann
    return objects.QueryFieldsResponse.FromDict(result)
556 28b71a76 Michael Hanselmann
557 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
558 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
559 3d8548c4 Michael Hanselmann
560 ec79568d Iustin Pop
  def QueryInstances(self, names, fields, use_locking):
561 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
562 ee6c7b94 Michael Hanselmann
563 ec79568d Iustin Pop
  def QueryNodes(self, names, fields, use_locking):
564 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
565 02f7fe54 Michael Hanselmann
566 a79ef2a5 Adeodato Simo
  def QueryGroups(self, names, fields, use_locking):
567 a79ef2a5 Adeodato Simo
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
568 a79ef2a5 Adeodato Simo
569 ec79568d Iustin Pop
  def QueryExports(self, nodes, use_locking):
570 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
571 32f93223 Michael Hanselmann
572 66baeccc Iustin Pop
  def QueryClusterInfo(self):
573 66baeccc Iustin Pop
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
574 66baeccc Iustin Pop
575 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
576 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
577 ae5849b5 Michael Hanselmann
578 7699c3af Iustin Pop
  def QueryTags(self, kind, name):
579 7699c3af Iustin Pop
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))