Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 84a12e40

History | View | Annotate | Download (13.3 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 c2a03789 Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 8d5b316c Iustin Pop
This module implements the local unix socket protocol. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 231db3a5 Michael Hanselmann
import logging
37 c2a03789 Iustin Pop
38 fad50141 Michael Hanselmann
from ganeti import serializer
39 ceab32dd Iustin Pop
from ganeti import constants
40 6797ec29 Iustin Pop
from ganeti import errors
41 cb462b06 Michael Hanselmann
from ganeti import utils
42 c2a03789 Iustin Pop
43 c2a03789 Iustin Pop
44 231db3a5 Michael Hanselmann
KEY_METHOD = "method"
45 231db3a5 Michael Hanselmann
KEY_ARGS = "args"
46 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
47 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
48 3d8548c4 Michael Hanselmann
49 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
50 2971c913 Iustin Pop
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
53 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
54 07cd723a Iustin Pop
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
56 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
57 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
58 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
59 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60 66baeccc Iustin Pop
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61 7699c3af Iustin Pop
REQ_QUERY_TAGS = "QueryTags"
62 3ccafd0e Iustin Pop
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
63 05e50653 Michael Hanselmann
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
64 c2a03789 Iustin Pop
65 c2a03789 Iustin Pop
DEF_CTMO = 10
66 c2a03789 Iustin Pop
DEF_RWTO = 60
67 c2a03789 Iustin Pop
68 793a8f7c Michael Hanselmann
# WaitForJobChange timeout
69 793a8f7c Michael Hanselmann
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
70 793a8f7c Michael Hanselmann
71 c2a03789 Iustin Pop
72 797506fc Michael Hanselmann
class ProtocolError(errors.GenericError):
73 797506fc Michael Hanselmann
  """Denotes an error in the LUXI protocol"""
74 c2a03789 Iustin Pop
75 c2a03789 Iustin Pop
76 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
77 c2a03789 Iustin Pop
  """Connection closed error"""
78 c2a03789 Iustin Pop
79 c2a03789 Iustin Pop
80 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
81 c2a03789 Iustin Pop
  """Operation timeout error"""
82 c2a03789 Iustin Pop
83 c2a03789 Iustin Pop
84 b77acb3e Iustin Pop
class RequestError(ProtocolError):
85 b77acb3e Iustin Pop
  """Error on request
86 b77acb3e Iustin Pop

87 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
88 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
89 b77acb3e Iustin Pop

90 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
91 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
92 b77acb3e Iustin Pop
    - query failed because required fields were missing
93 b77acb3e Iustin Pop

94 b77acb3e Iustin Pop
  """
95 b77acb3e Iustin Pop
96 3d8548c4 Michael Hanselmann
97 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
98 03a8dbdc Iustin Pop
  """The master cannot be reached
99 03a8dbdc Iustin Pop

100 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
101 03a8dbdc Iustin Pop
  been removed.
102 03a8dbdc Iustin Pop

103 03a8dbdc Iustin Pop
  """
104 03a8dbdc Iustin Pop
105 b77acb3e Iustin Pop
106 c2a03789 Iustin Pop
class Transport:
107 c2a03789 Iustin Pop
  """Low-level transport class.
108 c2a03789 Iustin Pop

109 c2a03789 Iustin Pop
  This is used on the client side.
110 c2a03789 Iustin Pop

111 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
112 c2a03789 Iustin Pop
  semantics to the Client. This means:
113 c2a03789 Iustin Pop
    - can send messages and receive messages
114 c2a03789 Iustin Pop
    - safe for multithreading
115 c2a03789 Iustin Pop

116 c2a03789 Iustin Pop
  """
117 c2a03789 Iustin Pop
118 25942a6c Guido Trotter
  def __init__(self, address, timeouts=None):
119 c2a03789 Iustin Pop
    """Constructor for the Client class.
120 c2a03789 Iustin Pop

121 c2a03789 Iustin Pop
    Arguments:
122 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
123 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
124 c2a03789 Iustin Pop

125 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
126 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
127 c2a03789 Iustin Pop

128 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
129 c2a03789 Iustin Pop

130 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
131 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
132 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
133 c2a03789 Iustin Pop
    timeout).
134 c2a03789 Iustin Pop

135 c2a03789 Iustin Pop
    """
136 c2a03789 Iustin Pop
    self.address = address
137 c2a03789 Iustin Pop
    if timeouts is None:
138 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
139 c2a03789 Iustin Pop
    else:
140 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
141 c2a03789 Iustin Pop
142 c2a03789 Iustin Pop
    self.socket = None
143 c2a03789 Iustin Pop
    self._buffer = ""
144 c2a03789 Iustin Pop
    self._msgs = collections.deque()
145 c2a03789 Iustin Pop
146 c2a03789 Iustin Pop
    try:
147 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
148 cb462b06 Michael Hanselmann
149 cb462b06 Michael Hanselmann
      # Try to connect
150 c2a03789 Iustin Pop
      try:
151 cb462b06 Michael Hanselmann
        utils.Retry(self._Connect, 1.0, self._ctimeout,
152 cb462b06 Michael Hanselmann
                    args=(self.socket, address, self._ctimeout))
153 cb462b06 Michael Hanselmann
      except utils.RetryTimeout:
154 cb462b06 Michael Hanselmann
        raise TimeoutError("Connect timed out")
155 cb462b06 Michael Hanselmann
156 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
157 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
158 c2a03789 Iustin Pop
      if self.socket is not None:
159 c2a03789 Iustin Pop
        self.socket.close()
160 c2a03789 Iustin Pop
      self.socket = None
161 c2a03789 Iustin Pop
      raise
162 c2a03789 Iustin Pop
163 cb462b06 Michael Hanselmann
  @staticmethod
164 cb462b06 Michael Hanselmann
  def _Connect(sock, address, timeout):
165 cb462b06 Michael Hanselmann
    sock.settimeout(timeout)
166 cb462b06 Michael Hanselmann
    try:
167 cb462b06 Michael Hanselmann
      sock.connect(address)
168 cb462b06 Michael Hanselmann
    except socket.timeout, err:
169 cb462b06 Michael Hanselmann
      raise TimeoutError("Connect timed out: %s" % str(err))
170 cb462b06 Michael Hanselmann
    except socket.error, err:
171 cb462b06 Michael Hanselmann
      if err.args[0] in (errno.ENOENT, errno.ECONNREFUSED):
172 cb462b06 Michael Hanselmann
        raise NoMasterError(address)
173 cb462b06 Michael Hanselmann
      if err.args[0] == errno.EAGAIN:
174 cb462b06 Michael Hanselmann
        # Server's socket backlog is full at the moment
175 cb462b06 Michael Hanselmann
        raise utils.RetryAgain()
176 cb462b06 Michael Hanselmann
      raise
177 cb462b06 Michael Hanselmann
178 c2a03789 Iustin Pop
  def _CheckSocket(self):
179 c2a03789 Iustin Pop
    """Make sure we are connected.
180 c2a03789 Iustin Pop

181 c2a03789 Iustin Pop
    """
182 c2a03789 Iustin Pop
    if self.socket is None:
183 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
184 c2a03789 Iustin Pop
185 c2a03789 Iustin Pop
  def Send(self, msg):
186 c2a03789 Iustin Pop
    """Send a message.
187 c2a03789 Iustin Pop

188 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
189 c2a03789 Iustin Pop

190 c2a03789 Iustin Pop
    """
191 25942a6c Guido Trotter
    if constants.LUXI_EOM in msg:
192 797506fc Michael Hanselmann
      raise ProtocolError("Message terminator found in payload")
193 797506fc Michael Hanselmann
194 c2a03789 Iustin Pop
    self._CheckSocket()
195 c2a03789 Iustin Pop
    try:
196 6096ee13 Michael Hanselmann
      # TODO: sendall is not guaranteed to send everything
197 25942a6c Guido Trotter
      self.socket.sendall(msg + constants.LUXI_EOM)
198 c2a03789 Iustin Pop
    except socket.timeout, err:
199 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
200 c2a03789 Iustin Pop
201 c2a03789 Iustin Pop
  def Recv(self):
202 5bbd3f7f Michael Hanselmann
    """Try to receive a message from the socket.
203 c2a03789 Iustin Pop

204 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
205 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
206 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
207 c2a03789 Iustin Pop
    limit.
208 c2a03789 Iustin Pop

209 c2a03789 Iustin Pop
    """
210 c2a03789 Iustin Pop
    self._CheckSocket()
211 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
212 c2a03789 Iustin Pop
    while not self._msgs:
213 c2a03789 Iustin Pop
      if time.time() > etime:
214 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
215 6096ee13 Michael Hanselmann
      while True:
216 6096ee13 Michael Hanselmann
        try:
217 6096ee13 Michael Hanselmann
          data = self.socket.recv(4096)
218 6096ee13 Michael Hanselmann
        except socket.error, err:
219 6096ee13 Michael Hanselmann
          if err.args and err.args[0] == errno.EAGAIN:
220 6096ee13 Michael Hanselmann
            continue
221 6096ee13 Michael Hanselmann
          raise
222 6096ee13 Michael Hanselmann
        except socket.timeout, err:
223 6096ee13 Michael Hanselmann
          raise TimeoutError("Receive timeout: %s" % str(err))
224 6096ee13 Michael Hanselmann
        break
225 c2a03789 Iustin Pop
      if not data:
226 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
227 25942a6c Guido Trotter
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
228 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
229 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
230 c2a03789 Iustin Pop
    return self._msgs.popleft()
231 c2a03789 Iustin Pop
232 c2a03789 Iustin Pop
  def Call(self, msg):
233 c2a03789 Iustin Pop
    """Send a message and wait for the response.
234 c2a03789 Iustin Pop

235 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
236 c2a03789 Iustin Pop

237 c2a03789 Iustin Pop
    """
238 c2a03789 Iustin Pop
    self.Send(msg)
239 c2a03789 Iustin Pop
    return self.Recv()
240 c2a03789 Iustin Pop
241 c2a03789 Iustin Pop
  def Close(self):
242 c2a03789 Iustin Pop
    """Close the socket"""
243 c2a03789 Iustin Pop
    if self.socket is not None:
244 c2a03789 Iustin Pop
      self.socket.close()
245 c2a03789 Iustin Pop
      self.socket = None
246 c2a03789 Iustin Pop
247 c2a03789 Iustin Pop
248 231db3a5 Michael Hanselmann
def ParseRequest(msg):
249 231db3a5 Michael Hanselmann
  """Parses a LUXI request message.
250 231db3a5 Michael Hanselmann

251 231db3a5 Michael Hanselmann
  """
252 231db3a5 Michael Hanselmann
  try:
253 231db3a5 Michael Hanselmann
    request = serializer.LoadJson(msg)
254 231db3a5 Michael Hanselmann
  except ValueError, err:
255 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
256 231db3a5 Michael Hanselmann
257 231db3a5 Michael Hanselmann
  logging.debug("LUXI request: %s", request)
258 231db3a5 Michael Hanselmann
259 231db3a5 Michael Hanselmann
  if not isinstance(request, dict):
260 231db3a5 Michael Hanselmann
    logging.error("LUXI request not a dict: %r", msg)
261 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (not a dict)")
262 231db3a5 Michael Hanselmann
263 e7a25b08 Guido Trotter
  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
264 e7a25b08 Guido Trotter
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
265 e7a25b08 Guido Trotter
266 231db3a5 Michael Hanselmann
  if method is None or args is None:
267 231db3a5 Michael Hanselmann
    logging.error("LUXI request missing method or arguments: %r", msg)
268 231db3a5 Michael Hanselmann
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
269 231db3a5 Michael Hanselmann
                         " in request): %r") % msg)
270 231db3a5 Michael Hanselmann
271 231db3a5 Michael Hanselmann
  return (method, args)
272 231db3a5 Michael Hanselmann
273 231db3a5 Michael Hanselmann
274 231db3a5 Michael Hanselmann
def ParseResponse(msg):
275 231db3a5 Michael Hanselmann
  """Parses a LUXI response message.
276 231db3a5 Michael Hanselmann

277 231db3a5 Michael Hanselmann
  """
278 231db3a5 Michael Hanselmann
  # Parse the result
279 231db3a5 Michael Hanselmann
  try:
280 231db3a5 Michael Hanselmann
    data = serializer.LoadJson(msg)
281 231db3a5 Michael Hanselmann
  except Exception, err:
282 231db3a5 Michael Hanselmann
    raise ProtocolError("Error while deserializing response: %s" % str(err))
283 231db3a5 Michael Hanselmann
284 231db3a5 Michael Hanselmann
  # Validate response
285 231db3a5 Michael Hanselmann
  if not (isinstance(data, dict) and
286 231db3a5 Michael Hanselmann
          KEY_SUCCESS in data and
287 231db3a5 Michael Hanselmann
          KEY_RESULT in data):
288 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid response from server: %r" % data)
289 231db3a5 Michael Hanselmann
290 231db3a5 Michael Hanselmann
  return (data[KEY_SUCCESS], data[KEY_RESULT])
291 231db3a5 Michael Hanselmann
292 231db3a5 Michael Hanselmann
293 231db3a5 Michael Hanselmann
def FormatResponse(success, result):
294 231db3a5 Michael Hanselmann
  """Formats a LUXI response message.
295 231db3a5 Michael Hanselmann

296 231db3a5 Michael Hanselmann
  """
297 231db3a5 Michael Hanselmann
  response = {
298 231db3a5 Michael Hanselmann
    KEY_SUCCESS: success,
299 231db3a5 Michael Hanselmann
    KEY_RESULT: result,
300 231db3a5 Michael Hanselmann
    }
301 231db3a5 Michael Hanselmann
302 231db3a5 Michael Hanselmann
  logging.debug("LUXI response: %s", response)
303 231db3a5 Michael Hanselmann
304 231db3a5 Michael Hanselmann
  return serializer.DumpJson(response)
305 231db3a5 Michael Hanselmann
306 231db3a5 Michael Hanselmann
307 231db3a5 Michael Hanselmann
def FormatRequest(method, args):
308 231db3a5 Michael Hanselmann
  """Formats a LUXI request message.
309 231db3a5 Michael Hanselmann

310 231db3a5 Michael Hanselmann
  """
311 231db3a5 Michael Hanselmann
  # Build request
312 231db3a5 Michael Hanselmann
  request = {
313 231db3a5 Michael Hanselmann
    KEY_METHOD: method,
314 231db3a5 Michael Hanselmann
    KEY_ARGS: args,
315 231db3a5 Michael Hanselmann
    }
316 231db3a5 Michael Hanselmann
317 231db3a5 Michael Hanselmann
  # Serialize the request
318 231db3a5 Michael Hanselmann
  return serializer.DumpJson(request, indent=False)
319 231db3a5 Michael Hanselmann
320 231db3a5 Michael Hanselmann
321 231db3a5 Michael Hanselmann
def CallLuxiMethod(transport_cb, method, args):
322 231db3a5 Michael Hanselmann
  """Send a LUXI request via a transport and return the response.
323 231db3a5 Michael Hanselmann

324 231db3a5 Michael Hanselmann
  """
325 231db3a5 Michael Hanselmann
  assert callable(transport_cb)
326 231db3a5 Michael Hanselmann
327 231db3a5 Michael Hanselmann
  request_msg = FormatRequest(method, args)
328 231db3a5 Michael Hanselmann
329 231db3a5 Michael Hanselmann
  # Send request and wait for response
330 231db3a5 Michael Hanselmann
  response_msg = transport_cb(request_msg)
331 231db3a5 Michael Hanselmann
332 231db3a5 Michael Hanselmann
  (success, result) = ParseResponse(response_msg)
333 231db3a5 Michael Hanselmann
334 231db3a5 Michael Hanselmann
  if success:
335 231db3a5 Michael Hanselmann
    return result
336 231db3a5 Michael Hanselmann
337 231db3a5 Michael Hanselmann
  errors.MaybeRaise(result)
338 231db3a5 Michael Hanselmann
  raise RequestError(result)
339 231db3a5 Michael Hanselmann
340 231db3a5 Michael Hanselmann
341 c2a03789 Iustin Pop
class Client(object):
342 c2a03789 Iustin Pop
  """High-level client implementation.
343 c2a03789 Iustin Pop

344 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
345 c2a03789 Iustin Pop
  implements data serialization/deserialization.
346 c2a03789 Iustin Pop

347 c2a03789 Iustin Pop
  """
348 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
349 c2a03789 Iustin Pop
    """Constructor for the Client class.
350 c2a03789 Iustin Pop

351 c2a03789 Iustin Pop
    Arguments:
352 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
353 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
354 c2a03789 Iustin Pop
      - transport: a Transport-like class
355 c2a03789 Iustin Pop

356 c2a03789 Iustin Pop

357 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
358 c2a03789 Iustin Pop
    class are used.
359 c2a03789 Iustin Pop

360 c2a03789 Iustin Pop
    """
361 ceab32dd Iustin Pop
    if address is None:
362 ceab32dd Iustin Pop
      address = constants.MASTER_SOCKET
363 8d5b316c Iustin Pop
    self.address = address
364 8d5b316c Iustin Pop
    self.timeouts = timeouts
365 8d5b316c Iustin Pop
    self.transport_class = transport
366 8d5b316c Iustin Pop
    self.transport = None
367 8d5b316c Iustin Pop
    self._InitTransport()
368 8d5b316c Iustin Pop
369 8d5b316c Iustin Pop
  def _InitTransport(self):
370 8d5b316c Iustin Pop
    """(Re)initialize the transport if needed.
371 8d5b316c Iustin Pop

372 8d5b316c Iustin Pop
    """
373 8d5b316c Iustin Pop
    if self.transport is None:
374 8d5b316c Iustin Pop
      self.transport = self.transport_class(self.address,
375 8d5b316c Iustin Pop
                                            timeouts=self.timeouts)
376 8d5b316c Iustin Pop
377 8d5b316c Iustin Pop
  def _CloseTransport(self):
378 8d5b316c Iustin Pop
    """Close the transport, ignoring errors.
379 8d5b316c Iustin Pop

380 8d5b316c Iustin Pop
    """
381 8d5b316c Iustin Pop
    if self.transport is None:
382 8d5b316c Iustin Pop
      return
383 8d5b316c Iustin Pop
    try:
384 8d5b316c Iustin Pop
      old_transp = self.transport
385 8d5b316c Iustin Pop
      self.transport = None
386 8d5b316c Iustin Pop
      old_transp.Close()
387 7260cfbe Iustin Pop
    except Exception: # pylint: disable-msg=W0703
388 8d5b316c Iustin Pop
      pass
389 c2a03789 Iustin Pop
390 231db3a5 Michael Hanselmann
  def _SendMethodCall(self, data):
391 3d8548c4 Michael Hanselmann
    # Send request and wait for response
392 8d5b316c Iustin Pop
    try:
393 8d5b316c Iustin Pop
      self._InitTransport()
394 231db3a5 Michael Hanselmann
      return self.transport.Call(data)
395 8d5b316c Iustin Pop
    except Exception:
396 8d5b316c Iustin Pop
      self._CloseTransport()
397 8d5b316c Iustin Pop
      raise
398 8d5b316c Iustin Pop
399 231db3a5 Michael Hanselmann
  def CallMethod(self, method, args):
400 231db3a5 Michael Hanselmann
    """Send a generic request and return the response.
401 3d8548c4 Michael Hanselmann

402 231db3a5 Michael Hanselmann
    """
403 231db3a5 Michael Hanselmann
    return CallLuxiMethod(self._SendMethodCall, method, args)
404 c2a03789 Iustin Pop
405 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
406 3ccafd0e Iustin Pop
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
407 3ccafd0e Iustin Pop
408 05e50653 Michael Hanselmann
  def SetWatcherPause(self, until):
409 05e50653 Michael Hanselmann
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
410 05e50653 Michael Hanselmann
411 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
412 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
413 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
414 0bbe448c Michael Hanselmann
415 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
416 2971c913 Iustin Pop
    jobs_state = []
417 2971c913 Iustin Pop
    for ops in jobs:
418 2971c913 Iustin Pop
      jobs_state.append([op.__getstate__() for op in ops])
419 2971c913 Iustin Pop
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
420 2971c913 Iustin Pop
421 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
422 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
423 0bbe448c Michael Hanselmann
424 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
425 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
426 0bbe448c Michael Hanselmann
427 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
428 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
429 f8ad5591 Michael Hanselmann
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
430 07cd723a Iustin Pop
431 f4484122 Michael Hanselmann
  def WaitForJobChangeOnce(self, job_id, fields,
432 793a8f7c Michael Hanselmann
                           prev_job_info, prev_log_serial,
433 793a8f7c Michael Hanselmann
                           timeout=WFJC_TIMEOUT):
434 793a8f7c Michael Hanselmann
    """Waits for changes on a job.
435 793a8f7c Michael Hanselmann

436 793a8f7c Michael Hanselmann
    @param job_id: Job ID
437 793a8f7c Michael Hanselmann
    @type fields: list
438 793a8f7c Michael Hanselmann
    @param fields: List of field names to be observed
439 793a8f7c Michael Hanselmann
    @type prev_job_info: None or list
440 793a8f7c Michael Hanselmann
    @param prev_job_info: Previously received job information
441 793a8f7c Michael Hanselmann
    @type prev_log_serial: None or int/long
442 793a8f7c Michael Hanselmann
    @param prev_log_serial: Highest log serial number previously received
443 793a8f7c Michael Hanselmann
    @type timeout: int/float
444 793a8f7c Michael Hanselmann
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
445 793a8f7c Michael Hanselmann
                    be capped to that value)
446 793a8f7c Michael Hanselmann

447 793a8f7c Michael Hanselmann
    """
448 793a8f7c Michael Hanselmann
    assert timeout >= 0, "Timeout can not be negative"
449 f4484122 Michael Hanselmann
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
450 f4484122 Michael Hanselmann
                           (job_id, fields, prev_job_info,
451 793a8f7c Michael Hanselmann
                            prev_log_serial,
452 793a8f7c Michael Hanselmann
                            min(WFJC_TIMEOUT, timeout)))
453 f4484122 Michael Hanselmann
454 f4484122 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
455 5c735209 Iustin Pop
    while True:
456 f4484122 Michael Hanselmann
      result = self.WaitForJobChangeOnce(job_id, fields,
457 f4484122 Michael Hanselmann
                                         prev_job_info, prev_log_serial)
458 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
459 5c735209 Iustin Pop
        break
460 5c735209 Iustin Pop
    return result
461 dfe57c22 Michael Hanselmann
462 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
463 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
464 3d8548c4 Michael Hanselmann
465 ec79568d Iustin Pop
  def QueryInstances(self, names, fields, use_locking):
466 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
467 ee6c7b94 Michael Hanselmann
468 ec79568d Iustin Pop
  def QueryNodes(self, names, fields, use_locking):
469 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
470 02f7fe54 Michael Hanselmann
471 ec79568d Iustin Pop
  def QueryExports(self, nodes, use_locking):
472 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
473 32f93223 Michael Hanselmann
474 66baeccc Iustin Pop
  def QueryClusterInfo(self):
475 66baeccc Iustin Pop
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
476 66baeccc Iustin Pop
477 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
478 ae5849b5 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
479 ae5849b5 Michael Hanselmann
480 7699c3af Iustin Pop
  def QueryTags(self, kind, name):
481 7699c3af Iustin Pop
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))