Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 346c3037

History | View | Annotate | Download (16.7 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 83c046a2 Iustin Pop
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 8d5b316c Iustin Pop
This module implements the local unix socket protocol. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 231db3a5 Michael Hanselmann
import logging
37 c2a03789 Iustin Pop
38 b8028dcf Michael Hanselmann
from ganeti import compat
39 fad50141 Michael Hanselmann
from ganeti import serializer
40 ceab32dd Iustin Pop
from ganeti import constants
41 6797ec29 Iustin Pop
from ganeti import errors
42 cb462b06 Michael Hanselmann
from ganeti import utils
43 28b71a76 Michael Hanselmann
from ganeti import objects
44 b87ee98f Michael Hanselmann
from ganeti import pathutils
45 c2a03789 Iustin Pop
46 c2a03789 Iustin Pop
47 231db3a5 Michael Hanselmann
KEY_METHOD = "method"
48 231db3a5 Michael Hanselmann
KEY_ARGS = "args"
49 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
50 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
51 e986f20c Michael Hanselmann
KEY_VERSION = "version"
52 3d8548c4 Michael Hanselmann
53 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
54 346c3037 Klaus Aehlig
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = "SubmitJobToDrainedQueue"
55 2971c913 Iustin Pop
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
56 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
57 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
58 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
59 f63ffb37 Michael Hanselmann
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
60 83c046a2 Iustin Pop
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
61 28b71a76 Michael Hanselmann
REQ_QUERY = "Query"
62 28b71a76 Michael Hanselmann
REQ_QUERY_FIELDS = "QueryFields"
63 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
64 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
65 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
66 a79ef2a5 Adeodato Simo
REQ_QUERY_GROUPS = "QueryGroups"
67 306bed0e Apollon Oikonomopoulos
REQ_QUERY_NETWORKS = "QueryNetworks"
68 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
69 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
70 66baeccc Iustin Pop
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
71 7699c3af Iustin Pop
REQ_QUERY_TAGS = "QueryTags"
72 83c046a2 Iustin Pop
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
73 05e50653 Michael Hanselmann
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
74 c2a03789 Iustin Pop
75 e3a25810 Michael Hanselmann
#: List of all LUXI requests
76 b8028dcf Michael Hanselmann
REQ_ALL = compat.UniqueFrozenset([
77 e3a25810 Michael Hanselmann
  REQ_ARCHIVE_JOB,
78 83c046a2 Iustin Pop
  REQ_AUTO_ARCHIVE_JOBS,
79 e3a25810 Michael Hanselmann
  REQ_CANCEL_JOB,
80 f63ffb37 Michael Hanselmann
  REQ_CHANGE_JOB_PRIORITY,
81 e3a25810 Michael Hanselmann
  REQ_QUERY,
82 e3a25810 Michael Hanselmann
  REQ_QUERY_CLUSTER_INFO,
83 e3a25810 Michael Hanselmann
  REQ_QUERY_CONFIG_VALUES,
84 e3a25810 Michael Hanselmann
  REQ_QUERY_EXPORTS,
85 e3a25810 Michael Hanselmann
  REQ_QUERY_FIELDS,
86 e3a25810 Michael Hanselmann
  REQ_QUERY_GROUPS,
87 e3a25810 Michael Hanselmann
  REQ_QUERY_INSTANCES,
88 e3a25810 Michael Hanselmann
  REQ_QUERY_JOBS,
89 e3a25810 Michael Hanselmann
  REQ_QUERY_NODES,
90 795d035d Klaus Aehlig
  REQ_QUERY_NETWORKS,
91 e3a25810 Michael Hanselmann
  REQ_QUERY_TAGS,
92 83c046a2 Iustin Pop
  REQ_SET_DRAIN_FLAG,
93 e3a25810 Michael Hanselmann
  REQ_SET_WATCHER_PAUSE,
94 e3a25810 Michael Hanselmann
  REQ_SUBMIT_JOB,
95 346c3037 Klaus Aehlig
  REQ_SUBMIT_JOB_TO_DRAINED_QUEUE,
96 e3a25810 Michael Hanselmann
  REQ_SUBMIT_MANY_JOBS,
97 e3a25810 Michael Hanselmann
  REQ_WAIT_FOR_JOB_CHANGE,
98 e3a25810 Michael Hanselmann
  ])
99 e3a25810 Michael Hanselmann
100 c2a03789 Iustin Pop
DEF_CTMO = 10
101 c2a03789 Iustin Pop
DEF_RWTO = 60
102 c2a03789 Iustin Pop
103 793a8f7c Michael Hanselmann
# WaitForJobChange timeout
104 793a8f7c Michael Hanselmann
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
105 793a8f7c Michael Hanselmann
106 c2a03789 Iustin Pop
107 7a8bda3f Michael Hanselmann
class ProtocolError(errors.LuxiError):
108 5a1c22fe Iustin Pop
  """Denotes an error in the LUXI protocol."""
109 c2a03789 Iustin Pop
110 c2a03789 Iustin Pop
111 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
112 5a1c22fe Iustin Pop
  """Connection closed error."""
113 c2a03789 Iustin Pop
114 c2a03789 Iustin Pop
115 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
116 5a1c22fe Iustin Pop
  """Operation timeout error."""
117 c2a03789 Iustin Pop
118 c2a03789 Iustin Pop
119 b77acb3e Iustin Pop
class RequestError(ProtocolError):
120 5a1c22fe Iustin Pop
  """Error on request.
121 b77acb3e Iustin Pop

122 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
123 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
124 b77acb3e Iustin Pop

125 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
126 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
127 b77acb3e Iustin Pop
    - query failed because required fields were missing
128 b77acb3e Iustin Pop

129 b77acb3e Iustin Pop
  """
130 b77acb3e Iustin Pop
131 3d8548c4 Michael Hanselmann
132 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
133 5a1c22fe Iustin Pop
  """The master cannot be reached.
134 03a8dbdc Iustin Pop

135 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
136 03a8dbdc Iustin Pop
  been removed.
137 03a8dbdc Iustin Pop

138 03a8dbdc Iustin Pop
  """
139 03a8dbdc Iustin Pop
140 b77acb3e Iustin Pop
141 5a1c22fe Iustin Pop
class PermissionError(ProtocolError):
142 5a1c22fe Iustin Pop
  """Permission denied while connecting to the master socket.
143 5a1c22fe Iustin Pop

144 5a1c22fe Iustin Pop
  This means the user doesn't have the proper rights.
145 5a1c22fe Iustin Pop

146 5a1c22fe Iustin Pop
  """
147 5a1c22fe Iustin Pop
148 5a1c22fe Iustin Pop
149 c2a03789 Iustin Pop
class Transport:
150 c2a03789 Iustin Pop
  """Low-level transport class.
151 c2a03789 Iustin Pop

152 c2a03789 Iustin Pop
  This is used on the client side.
153 c2a03789 Iustin Pop

154 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
155 c2a03789 Iustin Pop
  semantics to the Client. This means:
156 c2a03789 Iustin Pop
    - can send messages and receive messages
157 c2a03789 Iustin Pop
    - safe for multithreading
158 c2a03789 Iustin Pop

159 c2a03789 Iustin Pop
  """
160 c2a03789 Iustin Pop
161 25942a6c Guido Trotter
  def __init__(self, address, timeouts=None):
162 c2a03789 Iustin Pop
    """Constructor for the Client class.
163 c2a03789 Iustin Pop

164 c2a03789 Iustin Pop
    Arguments:
165 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
166 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
167 c2a03789 Iustin Pop

168 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
169 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
170 c2a03789 Iustin Pop

171 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
172 c2a03789 Iustin Pop

173 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
174 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
175 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
176 c2a03789 Iustin Pop
    timeout).
177 c2a03789 Iustin Pop

178 c2a03789 Iustin Pop
    """
179 c2a03789 Iustin Pop
    self.address = address
180 c2a03789 Iustin Pop
    if timeouts is None:
181 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
182 c2a03789 Iustin Pop
    else:
183 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
184 c2a03789 Iustin Pop
185 c2a03789 Iustin Pop
    self.socket = None
186 c2a03789 Iustin Pop
    self._buffer = ""
187 c2a03789 Iustin Pop
    self._msgs = collections.deque()
188 c2a03789 Iustin Pop
189 c2a03789 Iustin Pop
    try:
190 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
191 cb462b06 Michael Hanselmann
192 cb462b06 Michael Hanselmann
      # Try to connect
193 c2a03789 Iustin Pop
      try:
194 cb462b06 Michael Hanselmann
        utils.Retry(self._Connect, 1.0, self._ctimeout,
195 cb462b06 Michael Hanselmann
                    args=(self.socket, address, self._ctimeout))
196 cb462b06 Michael Hanselmann
      except utils.RetryTimeout:
197 cb462b06 Michael Hanselmann
        raise TimeoutError("Connect timed out")
198 cb462b06 Michael Hanselmann
199 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
200 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
201 c2a03789 Iustin Pop
      if self.socket is not None:
202 c2a03789 Iustin Pop
        self.socket.close()
203 c2a03789 Iustin Pop
      self.socket = None
204 c2a03789 Iustin Pop
      raise
205 c2a03789 Iustin Pop
206 cb462b06 Michael Hanselmann
  @staticmethod
207 cb462b06 Michael Hanselmann
  def _Connect(sock, address, timeout):
208 cb462b06 Michael Hanselmann
    sock.settimeout(timeout)
209 cb462b06 Michael Hanselmann
    try:
210 cb462b06 Michael Hanselmann
      sock.connect(address)
211 cb462b06 Michael Hanselmann
    except socket.timeout, err:
212 cb462b06 Michael Hanselmann
      raise TimeoutError("Connect timed out: %s" % str(err))
213 cb462b06 Michael Hanselmann
    except socket.error, err:
214 5a1c22fe Iustin Pop
      error_code = err.args[0]
215 5a1c22fe Iustin Pop
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
216 cb462b06 Michael Hanselmann
        raise NoMasterError(address)
217 5a1c22fe Iustin Pop
      elif error_code in (errno.EPERM, errno.EACCES):
218 5a1c22fe Iustin Pop
        raise PermissionError(address)
219 5a1c22fe Iustin Pop
      elif error_code == errno.EAGAIN:
220 cb462b06 Michael Hanselmann
        # Server's socket backlog is full at the moment
221 cb462b06 Michael Hanselmann
        raise utils.RetryAgain()
222 cb462b06 Michael Hanselmann
      raise
223 cb462b06 Michael Hanselmann
224 c2a03789 Iustin Pop
  def _CheckSocket(self):
225 c2a03789 Iustin Pop
    """Make sure we are connected.
226 c2a03789 Iustin Pop

227 c2a03789 Iustin Pop
    """
228 c2a03789 Iustin Pop
    if self.socket is None:
229 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
230 c2a03789 Iustin Pop
231 c2a03789 Iustin Pop
  def Send(self, msg):
232 c2a03789 Iustin Pop
    """Send a message.
233 c2a03789 Iustin Pop

234 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
235 c2a03789 Iustin Pop

236 c2a03789 Iustin Pop
    """
237 25942a6c Guido Trotter
    if constants.LUXI_EOM in msg:
238 797506fc Michael Hanselmann
      raise ProtocolError("Message terminator found in payload")
239 797506fc Michael Hanselmann
240 c2a03789 Iustin Pop
    self._CheckSocket()
241 c2a03789 Iustin Pop
    try:
242 6096ee13 Michael Hanselmann
      # TODO: sendall is not guaranteed to send everything
243 25942a6c Guido Trotter
      self.socket.sendall(msg + constants.LUXI_EOM)
244 c2a03789 Iustin Pop
    except socket.timeout, err:
245 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
246 c2a03789 Iustin Pop
247 c2a03789 Iustin Pop
  def Recv(self):
248 5bbd3f7f Michael Hanselmann
    """Try to receive a message from the socket.
249 c2a03789 Iustin Pop

250 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
251 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
252 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
253 c2a03789 Iustin Pop
    limit.
254 c2a03789 Iustin Pop

255 c2a03789 Iustin Pop
    """
256 c2a03789 Iustin Pop
    self._CheckSocket()
257 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
258 c2a03789 Iustin Pop
    while not self._msgs:
259 c2a03789 Iustin Pop
      if time.time() > etime:
260 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
261 6096ee13 Michael Hanselmann
      while True:
262 6096ee13 Michael Hanselmann
        try:
263 6096ee13 Michael Hanselmann
          data = self.socket.recv(4096)
264 28e3e216 Michael Hanselmann
        except socket.timeout, err:
265 28e3e216 Michael Hanselmann
          raise TimeoutError("Receive timeout: %s" % str(err))
266 6096ee13 Michael Hanselmann
        except socket.error, err:
267 6096ee13 Michael Hanselmann
          if err.args and err.args[0] == errno.EAGAIN:
268 6096ee13 Michael Hanselmann
            continue
269 6096ee13 Michael Hanselmann
          raise
270 6096ee13 Michael Hanselmann
        break
271 c2a03789 Iustin Pop
      if not data:
272 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
273 25942a6c Guido Trotter
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
274 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
275 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
276 c2a03789 Iustin Pop
    return self._msgs.popleft()
277 c2a03789 Iustin Pop
278 c2a03789 Iustin Pop
  def Call(self, msg):
279 c2a03789 Iustin Pop
    """Send a message and wait for the response.
280 c2a03789 Iustin Pop

281 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
282 c2a03789 Iustin Pop

283 c2a03789 Iustin Pop
    """
284 c2a03789 Iustin Pop
    self.Send(msg)
285 c2a03789 Iustin Pop
    return self.Recv()
286 c2a03789 Iustin Pop
287 c2a03789 Iustin Pop
  def Close(self):
288 c2a03789 Iustin Pop
    """Close the socket"""
289 c2a03789 Iustin Pop
    if self.socket is not None:
290 c2a03789 Iustin Pop
      self.socket.close()
291 c2a03789 Iustin Pop
      self.socket = None
292 c2a03789 Iustin Pop
293 c2a03789 Iustin Pop
294 231db3a5 Michael Hanselmann
def ParseRequest(msg):
295 231db3a5 Michael Hanselmann
  """Parses a LUXI request message.
296 231db3a5 Michael Hanselmann

297 231db3a5 Michael Hanselmann
  """
298 231db3a5 Michael Hanselmann
  try:
299 231db3a5 Michael Hanselmann
    request = serializer.LoadJson(msg)
300 231db3a5 Michael Hanselmann
  except ValueError, err:
301 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
302 231db3a5 Michael Hanselmann
303 231db3a5 Michael Hanselmann
  logging.debug("LUXI request: %s", request)
304 231db3a5 Michael Hanselmann
305 231db3a5 Michael Hanselmann
  if not isinstance(request, dict):
306 231db3a5 Michael Hanselmann
    logging.error("LUXI request not a dict: %r", msg)
307 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (not a dict)")
308 231db3a5 Michael Hanselmann
309 b459a848 Andrea Spadaccini
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
310 b459a848 Andrea Spadaccini
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
311 b459a848 Andrea Spadaccini
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
312 e7a25b08 Guido Trotter
313 231db3a5 Michael Hanselmann
  if method is None or args is None:
314 231db3a5 Michael Hanselmann
    logging.error("LUXI request missing method or arguments: %r", msg)
315 231db3a5 Michael Hanselmann
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
316 231db3a5 Michael Hanselmann
                         " in request): %r") % msg)
317 231db3a5 Michael Hanselmann
318 e986f20c Michael Hanselmann
  return (method, args, version)
319 231db3a5 Michael Hanselmann
320 231db3a5 Michael Hanselmann
321 231db3a5 Michael Hanselmann
def ParseResponse(msg):
322 231db3a5 Michael Hanselmann
  """Parses a LUXI response message.
323 231db3a5 Michael Hanselmann

324 231db3a5 Michael Hanselmann
  """
325 231db3a5 Michael Hanselmann
  # Parse the result
326 231db3a5 Michael Hanselmann
  try:
327 231db3a5 Michael Hanselmann
    data = serializer.LoadJson(msg)
328 d143f2c6 Iustin Pop
  except KeyboardInterrupt:
329 d143f2c6 Iustin Pop
    raise
330 231db3a5 Michael Hanselmann
  except Exception, err:
331 231db3a5 Michael Hanselmann
    raise ProtocolError("Error while deserializing response: %s" % str(err))
332 231db3a5 Michael Hanselmann
333 231db3a5 Michael Hanselmann
  # Validate response
334 231db3a5 Michael Hanselmann
  if not (isinstance(data, dict) and
335 231db3a5 Michael Hanselmann
          KEY_SUCCESS in data and
336 231db3a5 Michael Hanselmann
          KEY_RESULT in data):
337 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid response from server: %r" % data)
338 231db3a5 Michael Hanselmann
339 2317945a Guido Trotter
  return (data[KEY_SUCCESS], data[KEY_RESULT],
340 b459a848 Andrea Spadaccini
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
341 231db3a5 Michael Hanselmann
342 231db3a5 Michael Hanselmann
343 e986f20c Michael Hanselmann
def FormatResponse(success, result, version=None):
344 231db3a5 Michael Hanselmann
  """Formats a LUXI response message.
345 231db3a5 Michael Hanselmann

346 231db3a5 Michael Hanselmann
  """
347 231db3a5 Michael Hanselmann
  response = {
348 231db3a5 Michael Hanselmann
    KEY_SUCCESS: success,
349 231db3a5 Michael Hanselmann
    KEY_RESULT: result,
350 231db3a5 Michael Hanselmann
    }
351 231db3a5 Michael Hanselmann
352 e986f20c Michael Hanselmann
  if version is not None:
353 e986f20c Michael Hanselmann
    response[KEY_VERSION] = version
354 e986f20c Michael Hanselmann
355 231db3a5 Michael Hanselmann
  logging.debug("LUXI response: %s", response)
356 231db3a5 Michael Hanselmann
357 231db3a5 Michael Hanselmann
  return serializer.DumpJson(response)
358 231db3a5 Michael Hanselmann
359 231db3a5 Michael Hanselmann
360 e986f20c Michael Hanselmann
def FormatRequest(method, args, version=None):
361 231db3a5 Michael Hanselmann
  """Formats a LUXI request message.
362 231db3a5 Michael Hanselmann

363 231db3a5 Michael Hanselmann
  """
364 231db3a5 Michael Hanselmann
  # Build request
365 231db3a5 Michael Hanselmann
  request = {
366 231db3a5 Michael Hanselmann
    KEY_METHOD: method,
367 231db3a5 Michael Hanselmann
    KEY_ARGS: args,
368 231db3a5 Michael Hanselmann
    }
369 231db3a5 Michael Hanselmann
370 e986f20c Michael Hanselmann
  if version is not None:
371 e986f20c Michael Hanselmann
    request[KEY_VERSION] = version
372 e986f20c Michael Hanselmann
373 231db3a5 Michael Hanselmann
  # Serialize the request
374 a182a3ed Michael Hanselmann
  return serializer.DumpJson(request)
375 231db3a5 Michael Hanselmann
376 231db3a5 Michael Hanselmann
377 e986f20c Michael Hanselmann
def CallLuxiMethod(transport_cb, method, args, version=None):
378 231db3a5 Michael Hanselmann
  """Send a LUXI request via a transport and return the response.
379 231db3a5 Michael Hanselmann

380 231db3a5 Michael Hanselmann
  """
381 231db3a5 Michael Hanselmann
  assert callable(transport_cb)
382 231db3a5 Michael Hanselmann
383 e986f20c Michael Hanselmann
  request_msg = FormatRequest(method, args, version=version)
384 231db3a5 Michael Hanselmann
385 231db3a5 Michael Hanselmann
  # Send request and wait for response
386 231db3a5 Michael Hanselmann
  response_msg = transport_cb(request_msg)
387 231db3a5 Michael Hanselmann
388 e986f20c Michael Hanselmann
  (success, result, resp_version) = ParseResponse(response_msg)
389 e986f20c Michael Hanselmann
390 e986f20c Michael Hanselmann
  # Verify version if there was one in the response
391 e986f20c Michael Hanselmann
  if resp_version is not None and resp_version != version:
392 e986f20c Michael Hanselmann
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
393 e986f20c Michael Hanselmann
                           (version, resp_version))
394 231db3a5 Michael Hanselmann
395 231db3a5 Michael Hanselmann
  if success:
396 231db3a5 Michael Hanselmann
    return result
397 231db3a5 Michael Hanselmann
398 231db3a5 Michael Hanselmann
  errors.MaybeRaise(result)
399 231db3a5 Michael Hanselmann
  raise RequestError(result)
400 231db3a5 Michael Hanselmann
401 231db3a5 Michael Hanselmann
402 c2a03789 Iustin Pop
class Client(object):
403 c2a03789 Iustin Pop
  """High-level client implementation.
404 c2a03789 Iustin Pop

405 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
406 c2a03789 Iustin Pop
  implements data serialization/deserialization.
407 c2a03789 Iustin Pop

408 c2a03789 Iustin Pop
  """
409 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
410 c2a03789 Iustin Pop
    """Constructor for the Client class.
411 c2a03789 Iustin Pop

412 c2a03789 Iustin Pop
    Arguments:
413 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
414 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
415 c2a03789 Iustin Pop
      - transport: a Transport-like class
416 c2a03789 Iustin Pop

417 c2a03789 Iustin Pop

418 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
419 c2a03789 Iustin Pop
    class are used.
420 c2a03789 Iustin Pop

421 c2a03789 Iustin Pop
    """
422 ceab32dd Iustin Pop
    if address is None:
423 b87ee98f Michael Hanselmann
      address = pathutils.MASTER_SOCKET
424 8d5b316c Iustin Pop
    self.address = address
425 8d5b316c Iustin Pop
    self.timeouts = timeouts
426 8d5b316c Iustin Pop
    self.transport_class = transport
427 8d5b316c Iustin Pop
    self.transport = None
428 8d5b316c Iustin Pop
    self._InitTransport()
429 8d5b316c Iustin Pop
430 8d5b316c Iustin Pop
  def _InitTransport(self):
431 8d5b316c Iustin Pop
    """(Re)initialize the transport if needed.
432 8d5b316c Iustin Pop

433 8d5b316c Iustin Pop
    """
434 8d5b316c Iustin Pop
    if self.transport is None:
435 8d5b316c Iustin Pop
      self.transport = self.transport_class(self.address,
436 8d5b316c Iustin Pop
                                            timeouts=self.timeouts)
437 8d5b316c Iustin Pop
438 8d5b316c Iustin Pop
  def _CloseTransport(self):
439 8d5b316c Iustin Pop
    """Close the transport, ignoring errors.
440 8d5b316c Iustin Pop

441 8d5b316c Iustin Pop
    """
442 8d5b316c Iustin Pop
    if self.transport is None:
443 8d5b316c Iustin Pop
      return
444 8d5b316c Iustin Pop
    try:
445 8d5b316c Iustin Pop
      old_transp = self.transport
446 8d5b316c Iustin Pop
      self.transport = None
447 8d5b316c Iustin Pop
      old_transp.Close()
448 b459a848 Andrea Spadaccini
    except Exception: # pylint: disable=W0703
449 8d5b316c Iustin Pop
      pass
450 c2a03789 Iustin Pop
451 231db3a5 Michael Hanselmann
  def _SendMethodCall(self, data):
452 3d8548c4 Michael Hanselmann
    # Send request and wait for response
453 8d5b316c Iustin Pop
    try:
454 8d5b316c Iustin Pop
      self._InitTransport()
455 231db3a5 Michael Hanselmann
      return self.transport.Call(data)
456 8d5b316c Iustin Pop
    except Exception:
457 8d5b316c Iustin Pop
      self._CloseTransport()
458 8d5b316c Iustin Pop
      raise
459 8d5b316c Iustin Pop
460 2a917701 Michael Hanselmann
  def Close(self):
461 2a917701 Michael Hanselmann
    """Close the underlying connection.
462 2a917701 Michael Hanselmann

463 2a917701 Michael Hanselmann
    """
464 2a917701 Michael Hanselmann
    self._CloseTransport()
465 2a917701 Michael Hanselmann
466 231db3a5 Michael Hanselmann
  def CallMethod(self, method, args):
467 231db3a5 Michael Hanselmann
    """Send a generic request and return the response.
468 3d8548c4 Michael Hanselmann

469 231db3a5 Michael Hanselmann
    """
470 a629ecb9 Iustin Pop
    if not isinstance(args, (list, tuple)):
471 a629ecb9 Iustin Pop
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
472 a629ecb9 Iustin Pop
                                   " expected list, got %s" % type(args))
473 e986f20c Michael Hanselmann
    return CallLuxiMethod(self._SendMethodCall, method, args,
474 e986f20c Michael Hanselmann
                          version=constants.LUXI_VERSION)
475 c2a03789 Iustin Pop
476 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
477 83c046a2 Iustin Pop
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
478 3ccafd0e Iustin Pop
479 05e50653 Michael Hanselmann
  def SetWatcherPause(self, until):
480 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
481 05e50653 Michael Hanselmann
482 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
483 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
484 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
485 0bbe448c Michael Hanselmann
486 346c3037 Klaus Aehlig
  def SubmitJobToDrainedQueue(self, ops):
487 346c3037 Klaus Aehlig
    ops_state = map(lambda op: op.__getstate__(), ops)
488 346c3037 Klaus Aehlig
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
489 346c3037 Klaus Aehlig
490 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
491 2971c913 Iustin Pop
    jobs_state = []
492 2971c913 Iustin Pop
    for ops in jobs:
493 2971c913 Iustin Pop
      jobs_state.append([op.__getstate__() for op in ops])
494 734a2a7c René Nussbaumer
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
495 2971c913 Iustin Pop
496 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
497 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
498 0bbe448c Michael Hanselmann
499 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
500 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
501 0bbe448c Michael Hanselmann
502 f63ffb37 Michael Hanselmann
  def ChangeJobPriority(self, job_id, priority):
503 f63ffb37 Michael Hanselmann
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
504 f63ffb37 Michael Hanselmann
505 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
506 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
507 83c046a2 Iustin Pop
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
508 07cd723a Iustin Pop
509 f4484122 Michael Hanselmann
  def WaitForJobChangeOnce(self, job_id, fields,
510 793a8f7c Michael Hanselmann
                           prev_job_info, prev_log_serial,
511 793a8f7c Michael Hanselmann
                           timeout=WFJC_TIMEOUT):
512 793a8f7c Michael Hanselmann
    """Waits for changes on a job.
513 793a8f7c Michael Hanselmann

514 793a8f7c Michael Hanselmann
    @param job_id: Job ID
515 793a8f7c Michael Hanselmann
    @type fields: list
516 793a8f7c Michael Hanselmann
    @param fields: List of field names to be observed
517 793a8f7c Michael Hanselmann
    @type prev_job_info: None or list
518 793a8f7c Michael Hanselmann
    @param prev_job_info: Previously received job information
519 793a8f7c Michael Hanselmann
    @type prev_log_serial: None or int/long
520 793a8f7c Michael Hanselmann
    @param prev_log_serial: Highest log serial number previously received
521 793a8f7c Michael Hanselmann
    @type timeout: int/float
522 793a8f7c Michael Hanselmann
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
523 793a8f7c Michael Hanselmann
                    be capped to that value)
524 793a8f7c Michael Hanselmann

525 793a8f7c Michael Hanselmann
    """
526 793a8f7c Michael Hanselmann
    assert timeout >= 0, "Timeout can not be negative"
527 f4484122 Michael Hanselmann
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
528 f4484122 Michael Hanselmann
                           (job_id, fields, prev_job_info,
529 793a8f7c Michael Hanselmann
                            prev_log_serial,
530 793a8f7c Michael Hanselmann
                            min(WFJC_TIMEOUT, timeout)))
531 f4484122 Michael Hanselmann
532 f4484122 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
533 5c735209 Iustin Pop
    while True:
534 f4484122 Michael Hanselmann
      result = self.WaitForJobChangeOnce(job_id, fields,
535 f4484122 Michael Hanselmann
                                         prev_job_info, prev_log_serial)
536 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
537 5c735209 Iustin Pop
        break
538 5c735209 Iustin Pop
    return result
539 dfe57c22 Michael Hanselmann
540 2e5c33db Iustin Pop
  def Query(self, what, fields, qfilter):
541 28b71a76 Michael Hanselmann
    """Query for resources/items.
542 28b71a76 Michael Hanselmann

543 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
544 28b71a76 Michael Hanselmann
    @type fields: List of strings
545 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
546 2e5c33db Iustin Pop
    @type qfilter: None or list
547 2e5c33db Iustin Pop
    @param qfilter: Query filter
548 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryResponse}
549 28b71a76 Michael Hanselmann

550 28b71a76 Michael Hanselmann
    """
551 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
552 28b71a76 Michael Hanselmann
    return objects.QueryResponse.FromDict(result)
553 28b71a76 Michael Hanselmann
554 28b71a76 Michael Hanselmann
  def QueryFields(self, what, fields):
555 28b71a76 Michael Hanselmann
    """Query for available fields.
556 28b71a76 Michael Hanselmann

557 abd66bf8 Michael Hanselmann
    @param what: One of L{constants.QR_VIA_LUXI}
558 28b71a76 Michael Hanselmann
    @type fields: None or list of strings
559 28b71a76 Michael Hanselmann
    @param fields: List of requested fields
560 28b71a76 Michael Hanselmann
    @rtype: L{objects.QueryFieldsResponse}
561 28b71a76 Michael Hanselmann

562 28b71a76 Michael Hanselmann
    """
563 a629ecb9 Iustin Pop
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
564 28b71a76 Michael Hanselmann
    return objects.QueryFieldsResponse.FromDict(result)
565 28b71a76 Michael Hanselmann
566 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
567 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
568 3d8548c4 Michael Hanselmann
569 ec79568d Iustin Pop
  def QueryInstances(self, names, fields, use_locking):
570 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
571 ee6c7b94 Michael Hanselmann
572 ec79568d Iustin Pop
  def QueryNodes(self, names, fields, use_locking):
573 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
574 02f7fe54 Michael Hanselmann
575 a79ef2a5 Adeodato Simo
  def QueryGroups(self, names, fields, use_locking):
576 a79ef2a5 Adeodato Simo
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
577 a79ef2a5 Adeodato Simo
578 306bed0e Apollon Oikonomopoulos
  def QueryNetworks(self, names, fields, use_locking):
579 306bed0e Apollon Oikonomopoulos
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
580 306bed0e Apollon Oikonomopoulos
581 ec79568d Iustin Pop
  def QueryExports(self, nodes, use_locking):
582 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
583 32f93223 Michael Hanselmann
584 66baeccc Iustin Pop
  def QueryClusterInfo(self):
585 66baeccc Iustin Pop
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
586 66baeccc Iustin Pop
587 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
588 a629ecb9 Iustin Pop
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
589 ae5849b5 Michael Hanselmann
590 7699c3af Iustin Pop
  def QueryTags(self, kind, name):
591 7699c3af Iustin Pop
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))