Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 4008c8ed

History | View | Annotate | Download (13.7 kB)

1 c2a03789 Iustin Pop
#
2 c2a03789 Iustin Pop
#
3 c2a03789 Iustin Pop
4 c2a03789 Iustin Pop
# Copyright (C) 2006, 2007 Google Inc.
5 c2a03789 Iustin Pop
#
6 c2a03789 Iustin Pop
# This program is free software; you can redistribute it and/or modify
7 c2a03789 Iustin Pop
# it under the terms of the GNU General Public License as published by
8 c2a03789 Iustin Pop
# the Free Software Foundation; either version 2 of the License, or
9 c2a03789 Iustin Pop
# (at your option) any later version.
10 c2a03789 Iustin Pop
#
11 c2a03789 Iustin Pop
# This program is distributed in the hope that it will be useful, but
12 c2a03789 Iustin Pop
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c2a03789 Iustin Pop
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c2a03789 Iustin Pop
# General Public License for more details.
15 c2a03789 Iustin Pop
#
16 c2a03789 Iustin Pop
# You should have received a copy of the GNU General Public License
17 c2a03789 Iustin Pop
# along with this program; if not, write to the Free Software
18 c2a03789 Iustin Pop
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c2a03789 Iustin Pop
# 02110-1301, USA.
20 c2a03789 Iustin Pop
21 c2a03789 Iustin Pop
22 c2a03789 Iustin Pop
"""Module for the unix socket protocol
23 c2a03789 Iustin Pop

24 8d5b316c Iustin Pop
This module implements the local unix socket protocol. You only need
25 c2a03789 Iustin Pop
this module and the opcodes module in the client program in order to
26 c2a03789 Iustin Pop
communicate with the master.
27 c2a03789 Iustin Pop

28 7577196d Guido Trotter
The module is also used by the master daemon.
29 c2a03789 Iustin Pop

30 c2a03789 Iustin Pop
"""
31 c2a03789 Iustin Pop
32 c2a03789 Iustin Pop
import socket
33 c2a03789 Iustin Pop
import collections
34 c2a03789 Iustin Pop
import time
35 03a8dbdc Iustin Pop
import errno
36 231db3a5 Michael Hanselmann
import logging
37 c2a03789 Iustin Pop
38 fad50141 Michael Hanselmann
from ganeti import serializer
39 ceab32dd Iustin Pop
from ganeti import constants
40 6797ec29 Iustin Pop
from ganeti import errors
41 cb462b06 Michael Hanselmann
from ganeti import utils
42 c2a03789 Iustin Pop
43 c2a03789 Iustin Pop
44 231db3a5 Michael Hanselmann
KEY_METHOD = "method"
45 231db3a5 Michael Hanselmann
KEY_ARGS = "args"
46 3d8548c4 Michael Hanselmann
KEY_SUCCESS = "success"
47 3d8548c4 Michael Hanselmann
KEY_RESULT = "result"
48 3d8548c4 Michael Hanselmann
49 0bbe448c Michael Hanselmann
REQ_SUBMIT_JOB = "SubmitJob"
50 2971c913 Iustin Pop
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
51 dfe57c22 Michael Hanselmann
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
52 0bbe448c Michael Hanselmann
REQ_CANCEL_JOB = "CancelJob"
53 0bbe448c Michael Hanselmann
REQ_ARCHIVE_JOB = "ArchiveJob"
54 07cd723a Iustin Pop
REQ_AUTOARCHIVE_JOBS = "AutoArchiveJobs"
55 0bbe448c Michael Hanselmann
REQ_QUERY_JOBS = "QueryJobs"
56 ee6c7b94 Michael Hanselmann
REQ_QUERY_INSTANCES = "QueryInstances"
57 02f7fe54 Michael Hanselmann
REQ_QUERY_NODES = "QueryNodes"
58 32f93223 Michael Hanselmann
REQ_QUERY_EXPORTS = "QueryExports"
59 ae5849b5 Michael Hanselmann
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
60 66baeccc Iustin Pop
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
61 7699c3af Iustin Pop
REQ_QUERY_TAGS = "QueryTags"
62 19b9ba9a Michael Hanselmann
REQ_QUERY_LOCKS = "QueryLocks"
63 3ccafd0e Iustin Pop
REQ_QUEUE_SET_DRAIN_FLAG = "SetDrainFlag"
64 05e50653 Michael Hanselmann
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"
65 c2a03789 Iustin Pop
66 c2a03789 Iustin Pop
DEF_CTMO = 10
67 c2a03789 Iustin Pop
DEF_RWTO = 60
68 c2a03789 Iustin Pop
69 793a8f7c Michael Hanselmann
# WaitForJobChange timeout
70 793a8f7c Michael Hanselmann
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
71 793a8f7c Michael Hanselmann
72 c2a03789 Iustin Pop
73 797506fc Michael Hanselmann
class ProtocolError(errors.GenericError):
74 5a1c22fe Iustin Pop
  """Denotes an error in the LUXI protocol."""
75 c2a03789 Iustin Pop
76 c2a03789 Iustin Pop
77 c2a03789 Iustin Pop
class ConnectionClosedError(ProtocolError):
78 5a1c22fe Iustin Pop
  """Connection closed error."""
79 c2a03789 Iustin Pop
80 c2a03789 Iustin Pop
81 c2a03789 Iustin Pop
class TimeoutError(ProtocolError):
82 5a1c22fe Iustin Pop
  """Operation timeout error."""
83 c2a03789 Iustin Pop
84 c2a03789 Iustin Pop
85 b77acb3e Iustin Pop
class RequestError(ProtocolError):
86 5a1c22fe Iustin Pop
  """Error on request.
87 b77acb3e Iustin Pop

88 b77acb3e Iustin Pop
  This signifies an error in the request format or request handling,
89 b77acb3e Iustin Pop
  but not (e.g.) an error in starting up an instance.
90 b77acb3e Iustin Pop

91 b77acb3e Iustin Pop
  Some common conditions that can trigger this exception:
92 b77acb3e Iustin Pop
    - job submission failed because the job data was wrong
93 b77acb3e Iustin Pop
    - query failed because required fields were missing
94 b77acb3e Iustin Pop

95 b77acb3e Iustin Pop
  """
96 b77acb3e Iustin Pop
97 3d8548c4 Michael Hanselmann
98 03a8dbdc Iustin Pop
class NoMasterError(ProtocolError):
99 5a1c22fe Iustin Pop
  """The master cannot be reached.
100 03a8dbdc Iustin Pop

101 03a8dbdc Iustin Pop
  This means that the master daemon is not running or the socket has
102 03a8dbdc Iustin Pop
  been removed.
103 03a8dbdc Iustin Pop

104 03a8dbdc Iustin Pop
  """
105 03a8dbdc Iustin Pop
106 b77acb3e Iustin Pop
107 5a1c22fe Iustin Pop
class PermissionError(ProtocolError):
108 5a1c22fe Iustin Pop
  """Permission denied while connecting to the master socket.
109 5a1c22fe Iustin Pop

110 5a1c22fe Iustin Pop
  This means the user doesn't have the proper rights.
111 5a1c22fe Iustin Pop

112 5a1c22fe Iustin Pop
  """
113 5a1c22fe Iustin Pop
114 5a1c22fe Iustin Pop
115 c2a03789 Iustin Pop
class Transport:
116 c2a03789 Iustin Pop
  """Low-level transport class.
117 c2a03789 Iustin Pop

118 c2a03789 Iustin Pop
  This is used on the client side.
119 c2a03789 Iustin Pop

120 c2a03789 Iustin Pop
  This could be replace by any other class that provides the same
121 c2a03789 Iustin Pop
  semantics to the Client. This means:
122 c2a03789 Iustin Pop
    - can send messages and receive messages
123 c2a03789 Iustin Pop
    - safe for multithreading
124 c2a03789 Iustin Pop

125 c2a03789 Iustin Pop
  """
126 c2a03789 Iustin Pop
127 25942a6c Guido Trotter
  def __init__(self, address, timeouts=None):
128 c2a03789 Iustin Pop
    """Constructor for the Client class.
129 c2a03789 Iustin Pop

130 c2a03789 Iustin Pop
    Arguments:
131 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
132 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
133 c2a03789 Iustin Pop

134 c2a03789 Iustin Pop
    There are two timeouts used since we might want to wait for a long
135 c2a03789 Iustin Pop
    time for a response, but the connect timeout should be lower.
136 c2a03789 Iustin Pop

137 c2a03789 Iustin Pop
    If not passed, we use a default of 10 and respectively 60 seconds.
138 c2a03789 Iustin Pop

139 c2a03789 Iustin Pop
    Note that on reading data, since the timeout applies to an
140 c2a03789 Iustin Pop
    invidual receive, it might be that the total duration is longer
141 c2a03789 Iustin Pop
    than timeout value passed (we make a hard limit at twice the read
142 c2a03789 Iustin Pop
    timeout).
143 c2a03789 Iustin Pop

144 c2a03789 Iustin Pop
    """
145 c2a03789 Iustin Pop
    self.address = address
146 c2a03789 Iustin Pop
    if timeouts is None:
147 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
148 c2a03789 Iustin Pop
    else:
149 c2a03789 Iustin Pop
      self._ctimeout, self._rwtimeout = timeouts
150 c2a03789 Iustin Pop
151 c2a03789 Iustin Pop
    self.socket = None
152 c2a03789 Iustin Pop
    self._buffer = ""
153 c2a03789 Iustin Pop
    self._msgs = collections.deque()
154 c2a03789 Iustin Pop
155 c2a03789 Iustin Pop
    try:
156 c2a03789 Iustin Pop
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
157 cb462b06 Michael Hanselmann
158 cb462b06 Michael Hanselmann
      # Try to connect
159 c2a03789 Iustin Pop
      try:
160 cb462b06 Michael Hanselmann
        utils.Retry(self._Connect, 1.0, self._ctimeout,
161 cb462b06 Michael Hanselmann
                    args=(self.socket, address, self._ctimeout))
162 cb462b06 Michael Hanselmann
      except utils.RetryTimeout:
163 cb462b06 Michael Hanselmann
        raise TimeoutError("Connect timed out")
164 cb462b06 Michael Hanselmann
165 c2a03789 Iustin Pop
      self.socket.settimeout(self._rwtimeout)
166 03a8dbdc Iustin Pop
    except (socket.error, NoMasterError):
167 c2a03789 Iustin Pop
      if self.socket is not None:
168 c2a03789 Iustin Pop
        self.socket.close()
169 c2a03789 Iustin Pop
      self.socket = None
170 c2a03789 Iustin Pop
      raise
171 c2a03789 Iustin Pop
172 cb462b06 Michael Hanselmann
  @staticmethod
173 cb462b06 Michael Hanselmann
  def _Connect(sock, address, timeout):
174 cb462b06 Michael Hanselmann
    sock.settimeout(timeout)
175 cb462b06 Michael Hanselmann
    try:
176 cb462b06 Michael Hanselmann
      sock.connect(address)
177 cb462b06 Michael Hanselmann
    except socket.timeout, err:
178 cb462b06 Michael Hanselmann
      raise TimeoutError("Connect timed out: %s" % str(err))
179 cb462b06 Michael Hanselmann
    except socket.error, err:
180 5a1c22fe Iustin Pop
      error_code = err.args[0]
181 5a1c22fe Iustin Pop
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
182 cb462b06 Michael Hanselmann
        raise NoMasterError(address)
183 5a1c22fe Iustin Pop
      elif error_code in (errno.EPERM, errno.EACCES):
184 5a1c22fe Iustin Pop
        raise PermissionError(address)
185 5a1c22fe Iustin Pop
      elif error_code == errno.EAGAIN:
186 cb462b06 Michael Hanselmann
        # Server's socket backlog is full at the moment
187 cb462b06 Michael Hanselmann
        raise utils.RetryAgain()
188 cb462b06 Michael Hanselmann
      raise
189 cb462b06 Michael Hanselmann
190 c2a03789 Iustin Pop
  def _CheckSocket(self):
191 c2a03789 Iustin Pop
    """Make sure we are connected.
192 c2a03789 Iustin Pop

193 c2a03789 Iustin Pop
    """
194 c2a03789 Iustin Pop
    if self.socket is None:
195 c2a03789 Iustin Pop
      raise ProtocolError("Connection is closed")
196 c2a03789 Iustin Pop
197 c2a03789 Iustin Pop
  def Send(self, msg):
198 c2a03789 Iustin Pop
    """Send a message.
199 c2a03789 Iustin Pop

200 c2a03789 Iustin Pop
    This just sends a message and doesn't wait for the response.
201 c2a03789 Iustin Pop

202 c2a03789 Iustin Pop
    """
203 25942a6c Guido Trotter
    if constants.LUXI_EOM in msg:
204 797506fc Michael Hanselmann
      raise ProtocolError("Message terminator found in payload")
205 797506fc Michael Hanselmann
206 c2a03789 Iustin Pop
    self._CheckSocket()
207 c2a03789 Iustin Pop
    try:
208 6096ee13 Michael Hanselmann
      # TODO: sendall is not guaranteed to send everything
209 25942a6c Guido Trotter
      self.socket.sendall(msg + constants.LUXI_EOM)
210 c2a03789 Iustin Pop
    except socket.timeout, err:
211 c2a03789 Iustin Pop
      raise TimeoutError("Sending timeout: %s" % str(err))
212 c2a03789 Iustin Pop
213 c2a03789 Iustin Pop
  def Recv(self):
214 5bbd3f7f Michael Hanselmann
    """Try to receive a message from the socket.
215 c2a03789 Iustin Pop

216 c2a03789 Iustin Pop
    In case we already have messages queued, we just return from the
217 c2a03789 Iustin Pop
    queue. Otherwise, we try to read data with a _rwtimeout network
218 c2a03789 Iustin Pop
    timeout, and making sure we don't go over 2x_rwtimeout as a global
219 c2a03789 Iustin Pop
    limit.
220 c2a03789 Iustin Pop

221 c2a03789 Iustin Pop
    """
222 c2a03789 Iustin Pop
    self._CheckSocket()
223 c2a03789 Iustin Pop
    etime = time.time() + self._rwtimeout
224 c2a03789 Iustin Pop
    while not self._msgs:
225 c2a03789 Iustin Pop
      if time.time() > etime:
226 c2a03789 Iustin Pop
        raise TimeoutError("Extended receive timeout")
227 6096ee13 Michael Hanselmann
      while True:
228 6096ee13 Michael Hanselmann
        try:
229 6096ee13 Michael Hanselmann
          data = self.socket.recv(4096)
230 6096ee13 Michael Hanselmann
        except socket.error, err:
231 6096ee13 Michael Hanselmann
          if err.args and err.args[0] == errno.EAGAIN:
232 6096ee13 Michael Hanselmann
            continue
233 6096ee13 Michael Hanselmann
          raise
234 6096ee13 Michael Hanselmann
        except socket.timeout, err:
235 6096ee13 Michael Hanselmann
          raise TimeoutError("Receive timeout: %s" % str(err))
236 6096ee13 Michael Hanselmann
        break
237 c2a03789 Iustin Pop
      if not data:
238 c2a03789 Iustin Pop
        raise ConnectionClosedError("Connection closed while reading")
239 25942a6c Guido Trotter
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
240 c2a03789 Iustin Pop
      self._buffer = new_msgs.pop()
241 c2a03789 Iustin Pop
      self._msgs.extend(new_msgs)
242 c2a03789 Iustin Pop
    return self._msgs.popleft()
243 c2a03789 Iustin Pop
244 c2a03789 Iustin Pop
  def Call(self, msg):
245 c2a03789 Iustin Pop
    """Send a message and wait for the response.
246 c2a03789 Iustin Pop

247 c2a03789 Iustin Pop
    This is just a wrapper over Send and Recv.
248 c2a03789 Iustin Pop

249 c2a03789 Iustin Pop
    """
250 c2a03789 Iustin Pop
    self.Send(msg)
251 c2a03789 Iustin Pop
    return self.Recv()
252 c2a03789 Iustin Pop
253 c2a03789 Iustin Pop
  def Close(self):
254 c2a03789 Iustin Pop
    """Close the socket"""
255 c2a03789 Iustin Pop
    if self.socket is not None:
256 c2a03789 Iustin Pop
      self.socket.close()
257 c2a03789 Iustin Pop
      self.socket = None
258 c2a03789 Iustin Pop
259 c2a03789 Iustin Pop
260 231db3a5 Michael Hanselmann
def ParseRequest(msg):
261 231db3a5 Michael Hanselmann
  """Parses a LUXI request message.
262 231db3a5 Michael Hanselmann

263 231db3a5 Michael Hanselmann
  """
264 231db3a5 Michael Hanselmann
  try:
265 231db3a5 Michael Hanselmann
    request = serializer.LoadJson(msg)
266 231db3a5 Michael Hanselmann
  except ValueError, err:
267 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
268 231db3a5 Michael Hanselmann
269 231db3a5 Michael Hanselmann
  logging.debug("LUXI request: %s", request)
270 231db3a5 Michael Hanselmann
271 231db3a5 Michael Hanselmann
  if not isinstance(request, dict):
272 231db3a5 Michael Hanselmann
    logging.error("LUXI request not a dict: %r", msg)
273 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid LUXI request (not a dict)")
274 231db3a5 Michael Hanselmann
275 e7a25b08 Guido Trotter
  method = request.get(KEY_METHOD, None) # pylint: disable-msg=E1103
276 e7a25b08 Guido Trotter
  args = request.get(KEY_ARGS, None) # pylint: disable-msg=E1103
277 e7a25b08 Guido Trotter
278 231db3a5 Michael Hanselmann
  if method is None or args is None:
279 231db3a5 Michael Hanselmann
    logging.error("LUXI request missing method or arguments: %r", msg)
280 231db3a5 Michael Hanselmann
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
281 231db3a5 Michael Hanselmann
                         " in request): %r") % msg)
282 231db3a5 Michael Hanselmann
283 231db3a5 Michael Hanselmann
  return (method, args)
284 231db3a5 Michael Hanselmann
285 231db3a5 Michael Hanselmann
286 231db3a5 Michael Hanselmann
def ParseResponse(msg):
287 231db3a5 Michael Hanselmann
  """Parses a LUXI response message.
288 231db3a5 Michael Hanselmann

289 231db3a5 Michael Hanselmann
  """
290 231db3a5 Michael Hanselmann
  # Parse the result
291 231db3a5 Michael Hanselmann
  try:
292 231db3a5 Michael Hanselmann
    data = serializer.LoadJson(msg)
293 231db3a5 Michael Hanselmann
  except Exception, err:
294 231db3a5 Michael Hanselmann
    raise ProtocolError("Error while deserializing response: %s" % str(err))
295 231db3a5 Michael Hanselmann
296 231db3a5 Michael Hanselmann
  # Validate response
297 231db3a5 Michael Hanselmann
  if not (isinstance(data, dict) and
298 231db3a5 Michael Hanselmann
          KEY_SUCCESS in data and
299 231db3a5 Michael Hanselmann
          KEY_RESULT in data):
300 231db3a5 Michael Hanselmann
    raise ProtocolError("Invalid response from server: %r" % data)
301 231db3a5 Michael Hanselmann
302 231db3a5 Michael Hanselmann
  return (data[KEY_SUCCESS], data[KEY_RESULT])
303 231db3a5 Michael Hanselmann
304 231db3a5 Michael Hanselmann
305 231db3a5 Michael Hanselmann
def FormatResponse(success, result):
306 231db3a5 Michael Hanselmann
  """Formats a LUXI response message.
307 231db3a5 Michael Hanselmann

308 231db3a5 Michael Hanselmann
  """
309 231db3a5 Michael Hanselmann
  response = {
310 231db3a5 Michael Hanselmann
    KEY_SUCCESS: success,
311 231db3a5 Michael Hanselmann
    KEY_RESULT: result,
312 231db3a5 Michael Hanselmann
    }
313 231db3a5 Michael Hanselmann
314 231db3a5 Michael Hanselmann
  logging.debug("LUXI response: %s", response)
315 231db3a5 Michael Hanselmann
316 231db3a5 Michael Hanselmann
  return serializer.DumpJson(response)
317 231db3a5 Michael Hanselmann
318 231db3a5 Michael Hanselmann
319 231db3a5 Michael Hanselmann
def FormatRequest(method, args):
320 231db3a5 Michael Hanselmann
  """Formats a LUXI request message.
321 231db3a5 Michael Hanselmann

322 231db3a5 Michael Hanselmann
  """
323 231db3a5 Michael Hanselmann
  # Build request
324 231db3a5 Michael Hanselmann
  request = {
325 231db3a5 Michael Hanselmann
    KEY_METHOD: method,
326 231db3a5 Michael Hanselmann
    KEY_ARGS: args,
327 231db3a5 Michael Hanselmann
    }
328 231db3a5 Michael Hanselmann
329 231db3a5 Michael Hanselmann
  # Serialize the request
330 231db3a5 Michael Hanselmann
  return serializer.DumpJson(request, indent=False)
331 231db3a5 Michael Hanselmann
332 231db3a5 Michael Hanselmann
333 231db3a5 Michael Hanselmann
def CallLuxiMethod(transport_cb, method, args):
334 231db3a5 Michael Hanselmann
  """Send a LUXI request via a transport and return the response.
335 231db3a5 Michael Hanselmann

336 231db3a5 Michael Hanselmann
  """
337 231db3a5 Michael Hanselmann
  assert callable(transport_cb)
338 231db3a5 Michael Hanselmann
339 231db3a5 Michael Hanselmann
  request_msg = FormatRequest(method, args)
340 231db3a5 Michael Hanselmann
341 231db3a5 Michael Hanselmann
  # Send request and wait for response
342 231db3a5 Michael Hanselmann
  response_msg = transport_cb(request_msg)
343 231db3a5 Michael Hanselmann
344 231db3a5 Michael Hanselmann
  (success, result) = ParseResponse(response_msg)
345 231db3a5 Michael Hanselmann
346 231db3a5 Michael Hanselmann
  if success:
347 231db3a5 Michael Hanselmann
    return result
348 231db3a5 Michael Hanselmann
349 231db3a5 Michael Hanselmann
  errors.MaybeRaise(result)
350 231db3a5 Michael Hanselmann
  raise RequestError(result)
351 231db3a5 Michael Hanselmann
352 231db3a5 Michael Hanselmann
353 c2a03789 Iustin Pop
class Client(object):
354 c2a03789 Iustin Pop
  """High-level client implementation.
355 c2a03789 Iustin Pop

356 c2a03789 Iustin Pop
  This uses a backing Transport-like class on top of which it
357 c2a03789 Iustin Pop
  implements data serialization/deserialization.
358 c2a03789 Iustin Pop

359 c2a03789 Iustin Pop
  """
360 ceab32dd Iustin Pop
  def __init__(self, address=None, timeouts=None, transport=Transport):
361 c2a03789 Iustin Pop
    """Constructor for the Client class.
362 c2a03789 Iustin Pop

363 c2a03789 Iustin Pop
    Arguments:
364 c2a03789 Iustin Pop
      - address: a valid address the the used transport class
365 c2a03789 Iustin Pop
      - timeout: a list of timeouts, to be used on connect and read/write
366 c2a03789 Iustin Pop
      - transport: a Transport-like class
367 c2a03789 Iustin Pop

368 c2a03789 Iustin Pop

369 c2a03789 Iustin Pop
    If timeout is not passed, the default timeouts of the transport
370 c2a03789 Iustin Pop
    class are used.
371 c2a03789 Iustin Pop

372 c2a03789 Iustin Pop
    """
373 ceab32dd Iustin Pop
    if address is None:
374 ceab32dd Iustin Pop
      address = constants.MASTER_SOCKET
375 8d5b316c Iustin Pop
    self.address = address
376 8d5b316c Iustin Pop
    self.timeouts = timeouts
377 8d5b316c Iustin Pop
    self.transport_class = transport
378 8d5b316c Iustin Pop
    self.transport = None
379 8d5b316c Iustin Pop
    self._InitTransport()
380 8d5b316c Iustin Pop
381 8d5b316c Iustin Pop
  def _InitTransport(self):
382 8d5b316c Iustin Pop
    """(Re)initialize the transport if needed.
383 8d5b316c Iustin Pop

384 8d5b316c Iustin Pop
    """
385 8d5b316c Iustin Pop
    if self.transport is None:
386 8d5b316c Iustin Pop
      self.transport = self.transport_class(self.address,
387 8d5b316c Iustin Pop
                                            timeouts=self.timeouts)
388 8d5b316c Iustin Pop
389 8d5b316c Iustin Pop
  def _CloseTransport(self):
390 8d5b316c Iustin Pop
    """Close the transport, ignoring errors.
391 8d5b316c Iustin Pop

392 8d5b316c Iustin Pop
    """
393 8d5b316c Iustin Pop
    if self.transport is None:
394 8d5b316c Iustin Pop
      return
395 8d5b316c Iustin Pop
    try:
396 8d5b316c Iustin Pop
      old_transp = self.transport
397 8d5b316c Iustin Pop
      self.transport = None
398 8d5b316c Iustin Pop
      old_transp.Close()
399 7260cfbe Iustin Pop
    except Exception: # pylint: disable-msg=W0703
400 8d5b316c Iustin Pop
      pass
401 c2a03789 Iustin Pop
402 231db3a5 Michael Hanselmann
  def _SendMethodCall(self, data):
403 3d8548c4 Michael Hanselmann
    # Send request and wait for response
404 8d5b316c Iustin Pop
    try:
405 8d5b316c Iustin Pop
      self._InitTransport()
406 231db3a5 Michael Hanselmann
      return self.transport.Call(data)
407 8d5b316c Iustin Pop
    except Exception:
408 8d5b316c Iustin Pop
      self._CloseTransport()
409 8d5b316c Iustin Pop
      raise
410 8d5b316c Iustin Pop
411 231db3a5 Michael Hanselmann
  def CallMethod(self, method, args):
412 231db3a5 Michael Hanselmann
    """Send a generic request and return the response.
413 3d8548c4 Michael Hanselmann

414 231db3a5 Michael Hanselmann
    """
415 231db3a5 Michael Hanselmann
    return CallLuxiMethod(self._SendMethodCall, method, args)
416 c2a03789 Iustin Pop
417 3ccafd0e Iustin Pop
  def SetQueueDrainFlag(self, drain_flag):
418 3ccafd0e Iustin Pop
    return self.CallMethod(REQ_QUEUE_SET_DRAIN_FLAG, drain_flag)
419 3ccafd0e Iustin Pop
420 05e50653 Michael Hanselmann
  def SetWatcherPause(self, until):
421 05e50653 Michael Hanselmann
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, [until])
422 05e50653 Michael Hanselmann
423 0bbe448c Michael Hanselmann
  def SubmitJob(self, ops):
424 0bbe448c Michael Hanselmann
    ops_state = map(lambda op: op.__getstate__(), ops)
425 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
426 0bbe448c Michael Hanselmann
427 2971c913 Iustin Pop
  def SubmitManyJobs(self, jobs):
428 2971c913 Iustin Pop
    jobs_state = []
429 2971c913 Iustin Pop
    for ops in jobs:
430 2971c913 Iustin Pop
      jobs_state.append([op.__getstate__() for op in ops])
431 2971c913 Iustin Pop
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
432 2971c913 Iustin Pop
433 0bbe448c Michael Hanselmann
  def CancelJob(self, job_id):
434 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
435 0bbe448c Michael Hanselmann
436 0bbe448c Michael Hanselmann
  def ArchiveJob(self, job_id):
437 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_ARCHIVE_JOB, job_id)
438 0bbe448c Michael Hanselmann
439 07cd723a Iustin Pop
  def AutoArchiveJobs(self, age):
440 f8ad5591 Michael Hanselmann
    timeout = (DEF_RWTO - 1) / 2
441 f8ad5591 Michael Hanselmann
    return self.CallMethod(REQ_AUTOARCHIVE_JOBS, (age, timeout))
442 07cd723a Iustin Pop
443 f4484122 Michael Hanselmann
  def WaitForJobChangeOnce(self, job_id, fields,
444 793a8f7c Michael Hanselmann
                           prev_job_info, prev_log_serial,
445 793a8f7c Michael Hanselmann
                           timeout=WFJC_TIMEOUT):
446 793a8f7c Michael Hanselmann
    """Waits for changes on a job.
447 793a8f7c Michael Hanselmann

448 793a8f7c Michael Hanselmann
    @param job_id: Job ID
449 793a8f7c Michael Hanselmann
    @type fields: list
450 793a8f7c Michael Hanselmann
    @param fields: List of field names to be observed
451 793a8f7c Michael Hanselmann
    @type prev_job_info: None or list
452 793a8f7c Michael Hanselmann
    @param prev_job_info: Previously received job information
453 793a8f7c Michael Hanselmann
    @type prev_log_serial: None or int/long
454 793a8f7c Michael Hanselmann
    @param prev_log_serial: Highest log serial number previously received
455 793a8f7c Michael Hanselmann
    @type timeout: int/float
456 793a8f7c Michael Hanselmann
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
457 793a8f7c Michael Hanselmann
                    be capped to that value)
458 793a8f7c Michael Hanselmann

459 793a8f7c Michael Hanselmann
    """
460 793a8f7c Michael Hanselmann
    assert timeout >= 0, "Timeout can not be negative"
461 f4484122 Michael Hanselmann
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
462 f4484122 Michael Hanselmann
                           (job_id, fields, prev_job_info,
463 793a8f7c Michael Hanselmann
                            prev_log_serial,
464 793a8f7c Michael Hanselmann
                            min(WFJC_TIMEOUT, timeout)))
465 f4484122 Michael Hanselmann
466 f4484122 Michael Hanselmann
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
467 5c735209 Iustin Pop
    while True:
468 f4484122 Michael Hanselmann
      result = self.WaitForJobChangeOnce(job_id, fields,
469 f4484122 Michael Hanselmann
                                         prev_job_info, prev_log_serial)
470 5c735209 Iustin Pop
      if result != constants.JOB_NOTCHANGED:
471 5c735209 Iustin Pop
        break
472 5c735209 Iustin Pop
    return result
473 dfe57c22 Michael Hanselmann
474 0bbe448c Michael Hanselmann
  def QueryJobs(self, job_ids, fields):
475 0bbe448c Michael Hanselmann
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
476 3d8548c4 Michael Hanselmann
477 ec79568d Iustin Pop
  def QueryInstances(self, names, fields, use_locking):
478 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
479 ee6c7b94 Michael Hanselmann
480 ec79568d Iustin Pop
  def QueryNodes(self, names, fields, use_locking):
481 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
482 02f7fe54 Michael Hanselmann
483 ec79568d Iustin Pop
  def QueryExports(self, nodes, use_locking):
484 ec79568d Iustin Pop
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
485 32f93223 Michael Hanselmann
486 66baeccc Iustin Pop
  def QueryClusterInfo(self):
487 66baeccc Iustin Pop
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
488 66baeccc Iustin Pop
489 ae5849b5 Michael Hanselmann
  def QueryConfigValues(self, fields):
490 ae5849b5 Michael Hanselmann
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, fields)
491 ae5849b5 Michael Hanselmann
492 7699c3af Iustin Pop
  def QueryTags(self, kind, name):
493 7699c3af Iustin Pop
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))
494 19b9ba9a Michael Hanselmann
495 19b9ba9a Michael Hanselmann
  def QueryLocks(self, fields, sync):
496 19b9ba9a Michael Hanselmann
    return self.CallMethod(REQ_QUERY_LOCKS, (fields, sync))