Revision 912b2278 lib/luxi.py

b/lib/luxi.py
29 29

  
30 30
"""
31 31

  
32
import logging
33

  
34
from ganeti import serializer
35 32
from ganeti import constants
36
from ganeti import errors
37 33
from ganeti import objects
38
from ganeti import pathutils
39
from ganeti.rpc import transport as t
34
import ganeti.rpc.client as cl
35
from ganeti.rpc.transport import Transport
40 36
from ganeti.rpc.errors import (ProtocolError, ConnectionClosedError,
41 37
                               TimeoutError, RequestError, NoMasterError,
42 38
                               PermissionError)
......
49 45
  "RequestError",
50 46
  "NoMasterError",
51 47
  "PermissionError",
52
  "ParseRequest",
53
  "ParseResponse",
54
  "FormatRequest",
55
  "FormatResponse",
56
  "CallLuxiMethod",
57 48
  # classes:
58 49
  "Client"
59 50
  ]
60 51

  
61

  
62
KEY_METHOD = constants.LUXI_KEY_METHOD
63
KEY_ARGS = constants.LUXI_KEY_ARGS
64
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS
65
KEY_RESULT = constants.LUXI_KEY_RESULT
66
KEY_VERSION = constants.LUXI_KEY_VERSION
67

  
68 52
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
69 53
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
70 54
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
......
93 77
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
94 78

  
95 79

  
96
def ParseRequest(msg):
97
  """Parses a LUXI request message.
98

  
99
  """
100
  try:
101
    request = serializer.LoadJson(msg)
102
  except ValueError, err:
103
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
104

  
105
  logging.debug("LUXI request: %s", request)
106

  
107
  if not isinstance(request, dict):
108
    logging.error("LUXI request not a dict: %r", msg)
109
    raise ProtocolError("Invalid LUXI request (not a dict)")
110

  
111
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
112
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
113
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
114

  
115
  if method is None or args is None:
116
    logging.error("LUXI request missing method or arguments: %r", msg)
117
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
118
                         " in request): %r") % msg)
119

  
120
  return (method, args, version)
121

  
122

  
123
def ParseResponse(msg):
124
  """Parses a LUXI response message.
125

  
126
  """
127
  # Parse the result
128
  try:
129
    data = serializer.LoadJson(msg)
130
  except KeyboardInterrupt:
131
    raise
132
  except Exception, err:
133
    raise ProtocolError("Error while deserializing response: %s" % str(err))
134

  
135
  # Validate response
136
  if not (isinstance(data, dict) and
137
          KEY_SUCCESS in data and
138
          KEY_RESULT in data):
139
    raise ProtocolError("Invalid response from server: %r" % data)
140

  
141
  return (data[KEY_SUCCESS], data[KEY_RESULT],
142
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
143

  
144

  
145
def FormatResponse(success, result, version=None):
146
  """Formats a LUXI response message.
147

  
148
  """
149
  response = {
150
    KEY_SUCCESS: success,
151
    KEY_RESULT: result,
152
    }
153

  
154
  if version is not None:
155
    response[KEY_VERSION] = version
156

  
157
  logging.debug("LUXI response: %s", response)
158

  
159
  return serializer.DumpJson(response)
160

  
161

  
162
def FormatRequest(method, args, version=None):
163
  """Formats a LUXI request message.
164

  
165
  """
166
  # Build request
167
  request = {
168
    KEY_METHOD: method,
169
    KEY_ARGS: args,
170
    }
171

  
172
  if version is not None:
173
    request[KEY_VERSION] = version
174

  
175
  # Serialize the request
176
  return serializer.DumpJson(request)
177

  
178

  
179
def CallLuxiMethod(transport_cb, method, args, version=None):
180
  """Send a LUXI request via a transport and return the response.
181

  
182
  """
183
  assert callable(transport_cb)
184

  
185
  request_msg = FormatRequest(method, args, version=version)
186

  
187
  # Send request and wait for response
188
  response_msg = transport_cb(request_msg)
189

  
190
  (success, result, resp_version) = ParseResponse(response_msg)
191

  
192
  # Verify version if there was one in the response
193
  if resp_version is not None and resp_version != version:
194
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
195
                           (version, resp_version))
196

  
197
  if success:
198
    return result
199

  
200
  errors.MaybeRaise(result)
201
  raise RequestError(result)
202

  
203

  
204
class Client(object):
80
class Client(cl.AbstractClient):
205 81
  """High-level client implementation.
206 82

  
207 83
  This uses a backing Transport-like class on top of which it
208 84
  implements data serialization/deserialization.
209 85

  
210 86
  """
211
  def __init__(self, address=None, timeouts=None, transport=t.Transport):
87
  def __init__(self, address=None, timeouts=None, transport=Transport):
212 88
    """Constructor for the Client class.
213 89

  
214
    Arguments:
215
      - address: a valid address the the used transport class
216
      - timeout: a list of timeouts, to be used on connect and read/write
217
      - transport: a Transport-like class
218

  
219

  
220
    If timeout is not passed, the default timeouts of the transport
221
    class are used.
222

  
223
    """
224
    if address is None:
225
      address = pathutils.MASTER_SOCKET
226
    self.address = address
227
    self.timeouts = timeouts
228
    self.transport_class = transport
229
    self.transport = None
230
    self._InitTransport()
231

  
232
  def _InitTransport(self):
233
    """(Re)initialize the transport if needed.
234

  
235
    """
236
    if self.transport is None:
237
      self.transport = self.transport_class(self.address,
238
                                            timeouts=self.timeouts)
239

  
240
  def _CloseTransport(self):
241
    """Close the transport, ignoring errors.
242

  
243
    """
244
    if self.transport is None:
245
      return
246
    try:
247
      old_transp = self.transport
248
      self.transport = None
249
      old_transp.Close()
250
    except Exception: # pylint: disable=W0703
251
      pass
252

  
253
  def _SendMethodCall(self, data):
254
    # Send request and wait for response
255
    try:
256
      self._InitTransport()
257
      return self.transport.Call(data)
258
    except Exception:
259
      self._CloseTransport()
260
      raise
261

  
262
  def Close(self):
263
    """Close the underlying connection.
264

  
265
    """
266
    self._CloseTransport()
267

  
268
  def CallMethod(self, method, args):
269
    """Send a generic request and return the response.
90
    Arguments are the same as for L{AbstractClient}.
270 91

  
271 92
    """
272
    if not isinstance(args, (list, tuple)):
273
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
274
                                   " expected list, got %s" % type(args))
275
    return CallLuxiMethod(self._SendMethodCall, method, args,
276
                          version=constants.LUXI_VERSION)
93
    super(Client, self).__init__(address, timeouts, transport)
277 94

  
278 95
  def SetQueueDrainFlag(self, drain_flag):
279 96
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

Also available in: Unified diff