Revision 912b2278 lib/luxi.py
b/lib/luxi.py | ||
---|---|---|
29 | 29 |
|
30 | 30 |
""" |
31 | 31 |
|
32 |
import logging |
|
33 |
|
|
34 |
from ganeti import serializer |
|
35 | 32 |
from ganeti import constants |
36 |
from ganeti import errors |
|
37 | 33 |
from ganeti import objects |
38 |
from ganeti import pathutils
|
|
39 |
from ganeti.rpc import transport as t
|
|
34 |
import ganeti.rpc.client as cl
|
|
35 |
from ganeti.rpc.transport import Transport
|
|
40 | 36 |
from ganeti.rpc.errors import (ProtocolError, ConnectionClosedError, |
41 | 37 |
TimeoutError, RequestError, NoMasterError, |
42 | 38 |
PermissionError) |
... | ... | |
49 | 45 |
"RequestError", |
50 | 46 |
"NoMasterError", |
51 | 47 |
"PermissionError", |
52 |
"ParseRequest", |
|
53 |
"ParseResponse", |
|
54 |
"FormatRequest", |
|
55 |
"FormatResponse", |
|
56 |
"CallLuxiMethod", |
|
57 | 48 |
# classes: |
58 | 49 |
"Client" |
59 | 50 |
] |
60 | 51 |
|
61 |
|
|
62 |
KEY_METHOD = constants.LUXI_KEY_METHOD |
|
63 |
KEY_ARGS = constants.LUXI_KEY_ARGS |
|
64 |
KEY_SUCCESS = constants.LUXI_KEY_SUCCESS |
|
65 |
KEY_RESULT = constants.LUXI_KEY_RESULT |
|
66 |
KEY_VERSION = constants.LUXI_KEY_VERSION |
|
67 |
|
|
68 | 52 |
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB |
69 | 53 |
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE |
70 | 54 |
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS |
... | ... | |
93 | 77 |
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT |
94 | 78 |
|
95 | 79 |
|
96 |
def ParseRequest(msg): |
|
97 |
"""Parses a LUXI request message. |
|
98 |
|
|
99 |
""" |
|
100 |
try: |
|
101 |
request = serializer.LoadJson(msg) |
|
102 |
except ValueError, err: |
|
103 |
raise ProtocolError("Invalid LUXI request (parsing error): %s" % err) |
|
104 |
|
|
105 |
logging.debug("LUXI request: %s", request) |
|
106 |
|
|
107 |
if not isinstance(request, dict): |
|
108 |
logging.error("LUXI request not a dict: %r", msg) |
|
109 |
raise ProtocolError("Invalid LUXI request (not a dict)") |
|
110 |
|
|
111 |
method = request.get(KEY_METHOD, None) # pylint: disable=E1103 |
|
112 |
args = request.get(KEY_ARGS, None) # pylint: disable=E1103 |
|
113 |
version = request.get(KEY_VERSION, None) # pylint: disable=E1103 |
|
114 |
|
|
115 |
if method is None or args is None: |
|
116 |
logging.error("LUXI request missing method or arguments: %r", msg) |
|
117 |
raise ProtocolError(("Invalid LUXI request (no method or arguments" |
|
118 |
" in request): %r") % msg) |
|
119 |
|
|
120 |
return (method, args, version) |
|
121 |
|
|
122 |
|
|
123 |
def ParseResponse(msg): |
|
124 |
"""Parses a LUXI response message. |
|
125 |
|
|
126 |
""" |
|
127 |
# Parse the result |
|
128 |
try: |
|
129 |
data = serializer.LoadJson(msg) |
|
130 |
except KeyboardInterrupt: |
|
131 |
raise |
|
132 |
except Exception, err: |
|
133 |
raise ProtocolError("Error while deserializing response: %s" % str(err)) |
|
134 |
|
|
135 |
# Validate response |
|
136 |
if not (isinstance(data, dict) and |
|
137 |
KEY_SUCCESS in data and |
|
138 |
KEY_RESULT in data): |
|
139 |
raise ProtocolError("Invalid response from server: %r" % data) |
|
140 |
|
|
141 |
return (data[KEY_SUCCESS], data[KEY_RESULT], |
|
142 |
data.get(KEY_VERSION, None)) # pylint: disable=E1103 |
|
143 |
|
|
144 |
|
|
145 |
def FormatResponse(success, result, version=None): |
|
146 |
"""Formats a LUXI response message. |
|
147 |
|
|
148 |
""" |
|
149 |
response = { |
|
150 |
KEY_SUCCESS: success, |
|
151 |
KEY_RESULT: result, |
|
152 |
} |
|
153 |
|
|
154 |
if version is not None: |
|
155 |
response[KEY_VERSION] = version |
|
156 |
|
|
157 |
logging.debug("LUXI response: %s", response) |
|
158 |
|
|
159 |
return serializer.DumpJson(response) |
|
160 |
|
|
161 |
|
|
162 |
def FormatRequest(method, args, version=None): |
|
163 |
"""Formats a LUXI request message. |
|
164 |
|
|
165 |
""" |
|
166 |
# Build request |
|
167 |
request = { |
|
168 |
KEY_METHOD: method, |
|
169 |
KEY_ARGS: args, |
|
170 |
} |
|
171 |
|
|
172 |
if version is not None: |
|
173 |
request[KEY_VERSION] = version |
|
174 |
|
|
175 |
# Serialize the request |
|
176 |
return serializer.DumpJson(request) |
|
177 |
|
|
178 |
|
|
179 |
def CallLuxiMethod(transport_cb, method, args, version=None): |
|
180 |
"""Send a LUXI request via a transport and return the response. |
|
181 |
|
|
182 |
""" |
|
183 |
assert callable(transport_cb) |
|
184 |
|
|
185 |
request_msg = FormatRequest(method, args, version=version) |
|
186 |
|
|
187 |
# Send request and wait for response |
|
188 |
response_msg = transport_cb(request_msg) |
|
189 |
|
|
190 |
(success, result, resp_version) = ParseResponse(response_msg) |
|
191 |
|
|
192 |
# Verify version if there was one in the response |
|
193 |
if resp_version is not None and resp_version != version: |
|
194 |
raise errors.LuxiError("LUXI version mismatch, client %s, response %s" % |
|
195 |
(version, resp_version)) |
|
196 |
|
|
197 |
if success: |
|
198 |
return result |
|
199 |
|
|
200 |
errors.MaybeRaise(result) |
|
201 |
raise RequestError(result) |
|
202 |
|
|
203 |
|
|
204 |
class Client(object): |
|
80 |
class Client(cl.AbstractClient): |
|
205 | 81 |
"""High-level client implementation. |
206 | 82 |
|
207 | 83 |
This uses a backing Transport-like class on top of which it |
208 | 84 |
implements data serialization/deserialization. |
209 | 85 |
|
210 | 86 |
""" |
211 |
def __init__(self, address=None, timeouts=None, transport=t.Transport):
|
|
87 |
def __init__(self, address=None, timeouts=None, transport=Transport): |
|
212 | 88 |
"""Constructor for the Client class. |
213 | 89 |
|
214 |
Arguments: |
|
215 |
- address: a valid address the the used transport class |
|
216 |
- timeout: a list of timeouts, to be used on connect and read/write |
|
217 |
- transport: a Transport-like class |
|
218 |
|
|
219 |
|
|
220 |
If timeout is not passed, the default timeouts of the transport |
|
221 |
class are used. |
|
222 |
|
|
223 |
""" |
|
224 |
if address is None: |
|
225 |
address = pathutils.MASTER_SOCKET |
|
226 |
self.address = address |
|
227 |
self.timeouts = timeouts |
|
228 |
self.transport_class = transport |
|
229 |
self.transport = None |
|
230 |
self._InitTransport() |
|
231 |
|
|
232 |
def _InitTransport(self): |
|
233 |
"""(Re)initialize the transport if needed. |
|
234 |
|
|
235 |
""" |
|
236 |
if self.transport is None: |
|
237 |
self.transport = self.transport_class(self.address, |
|
238 |
timeouts=self.timeouts) |
|
239 |
|
|
240 |
def _CloseTransport(self): |
|
241 |
"""Close the transport, ignoring errors. |
|
242 |
|
|
243 |
""" |
|
244 |
if self.transport is None: |
|
245 |
return |
|
246 |
try: |
|
247 |
old_transp = self.transport |
|
248 |
self.transport = None |
|
249 |
old_transp.Close() |
|
250 |
except Exception: # pylint: disable=W0703 |
|
251 |
pass |
|
252 |
|
|
253 |
def _SendMethodCall(self, data): |
|
254 |
# Send request and wait for response |
|
255 |
try: |
|
256 |
self._InitTransport() |
|
257 |
return self.transport.Call(data) |
|
258 |
except Exception: |
|
259 |
self._CloseTransport() |
|
260 |
raise |
|
261 |
|
|
262 |
def Close(self): |
|
263 |
"""Close the underlying connection. |
|
264 |
|
|
265 |
""" |
|
266 |
self._CloseTransport() |
|
267 |
|
|
268 |
def CallMethod(self, method, args): |
|
269 |
"""Send a generic request and return the response. |
|
90 |
Arguments are the same as for L{AbstractClient}. |
|
270 | 91 |
|
271 | 92 |
""" |
272 |
if not isinstance(args, (list, tuple)): |
|
273 |
raise errors.ProgrammerError("Invalid parameter passed to CallMethod:" |
|
274 |
" expected list, got %s" % type(args)) |
|
275 |
return CallLuxiMethod(self._SendMethodCall, method, args, |
|
276 |
version=constants.LUXI_VERSION) |
|
93 |
super(Client, self).__init__(address, timeouts, transport) |
|
277 | 94 |
|
278 | 95 |
def SetQueueDrainFlag(self, drain_flag): |
279 | 96 |
return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, )) |
Also available in: Unified diff