Revision d9de612c lib/rpc.py
b/lib/rpc.py | ||
---|---|---|
338 | 338 |
def _PrepareRequests(hosts, port, procedure, body, read_timeout): |
339 | 339 |
"""Prepares requests by sorting offline hosts into separate list. |
340 | 340 |
|
341 |
@type body: dict |
|
342 |
@param body: a dictionary with per-host body data |
|
343 |
|
|
341 | 344 |
""" |
342 | 345 |
results = {} |
343 | 346 |
requests = {} |
344 | 347 |
|
348 |
assert isinstance(body, dict) |
|
349 |
assert len(body) == len(hosts) |
|
350 |
assert compat.all(isinstance(v, str) for v in body.values()) |
|
351 |
assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \ |
|
352 |
"%s != %s" % (hosts, body.keys()) |
|
353 |
|
|
345 | 354 |
for (name, ip) in hosts: |
346 | 355 |
if ip is _OFFLINE: |
347 | 356 |
# Node is marked as offline |
... | ... | |
351 | 360 |
http.client.HttpClientRequest(str(ip), port, |
352 | 361 |
http.HTTP_PUT, str("/%s" % procedure), |
353 | 362 |
headers=_RPC_CLIENT_HEADERS, |
354 |
post_data=body, |
|
363 |
post_data=body[name],
|
|
355 | 364 |
read_timeout=read_timeout, |
356 | 365 |
nicename="%s/%s" % (name, procedure), |
357 | 366 |
curl_config_fn=_ConfigRpcCurl) |
... | ... | |
390 | 399 |
@param hosts: Hostnames |
391 | 400 |
@type procedure: string |
392 | 401 |
@param procedure: Request path |
393 |
@type body: string
|
|
394 |
@param body: Request body
|
|
402 |
@type body: dictionary
|
|
403 |
@param body: dictionary with request bodies per host
|
|
395 | 404 |
@type read_timeout: int or None |
396 | 405 |
@param read_timeout: Read timeout for request |
397 | 406 |
|
... | ... | |
401 | 410 |
|
402 | 411 |
(results, requests) = \ |
403 | 412 |
self._PrepareRequests(self._resolver(hosts), self._port, procedure, |
404 |
str(body), read_timeout)
|
|
413 |
body, read_timeout)
|
|
405 | 414 |
|
406 | 415 |
_req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb) |
407 | 416 |
|
... | ... | |
434 | 443 |
"""Entry point for automatically generated RPC wrappers. |
435 | 444 |
|
436 | 445 |
""" |
437 |
(procedure, _, timeout, argdefs, _, postproc_fn, _) = cdef
|
|
446 |
(procedure, _, timeout, argdefs, prep_fn, postproc_fn, _) = cdef
|
|
438 | 447 |
|
439 | 448 |
if callable(timeout): |
440 | 449 |
read_timeout = timeout(args) |
441 | 450 |
else: |
442 | 451 |
read_timeout = timeout |
443 | 452 |
|
444 |
body = serializer.DumpJson(map(self._encoder, |
|
445 |
zip(map(compat.snd, argdefs), args))) |
|
446 |
|
|
447 |
result = self._proc(node_list, procedure, body, read_timeout=read_timeout) |
|
453 |
enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args)) |
|
454 |
if prep_fn is None: |
|
455 |
# for a no-op prep_fn, we serialise the body once, and then we |
|
456 |
# reuse it in the dictionary values |
|
457 |
body = serializer.DumpJson(enc_args) |
|
458 |
pnbody = dict((n, body) for n in node_list) |
|
459 |
else: |
|
460 |
# for a custom prep_fn, we pass the encoded arguments and the |
|
461 |
# node name to the prep_fn, and we serialise its return value |
|
462 |
assert(callable(prep_fn)) |
|
463 |
pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args))) |
|
464 |
for n in node_list) |
|
465 |
|
|
466 |
result = self._proc(node_list, procedure, pnbody, |
|
467 |
read_timeout=read_timeout) |
|
448 | 468 |
|
449 | 469 |
if postproc_fn: |
450 | 470 |
return dict(map(lambda (key, value): (key, postproc_fn(value)), |
Also available in: Unified diff