Revision d9de612c lib/rpc.py

b/lib/rpc.py
338 338
  def _PrepareRequests(hosts, port, procedure, body, read_timeout):
339 339
    """Prepares requests by sorting offline hosts into separate list.
340 340

  
341
    @type body: dict
342
    @param body: a dictionary with per-host body data
343

  
341 344
    """
342 345
    results = {}
343 346
    requests = {}
344 347

  
348
    assert isinstance(body, dict)
349
    assert len(body) == len(hosts)
350
    assert compat.all(isinstance(v, str) for v in body.values())
351
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
352
        "%s != %s" % (hosts, body.keys())
353

  
345 354
    for (name, ip) in hosts:
346 355
      if ip is _OFFLINE:
347 356
        # Node is marked as offline
......
351 360
          http.client.HttpClientRequest(str(ip), port,
352 361
                                        http.HTTP_PUT, str("/%s" % procedure),
353 362
                                        headers=_RPC_CLIENT_HEADERS,
354
                                        post_data=body,
363
                                        post_data=body[name],
355 364
                                        read_timeout=read_timeout,
356 365
                                        nicename="%s/%s" % (name, procedure),
357 366
                                        curl_config_fn=_ConfigRpcCurl)
......
390 399
    @param hosts: Hostnames
391 400
    @type procedure: string
392 401
    @param procedure: Request path
393
    @type body: string
394
    @param body: Request body
402
    @type body: dictionary
403
    @param body: dictionary with request bodies per host
395 404
    @type read_timeout: int or None
396 405
    @param read_timeout: Read timeout for request
397 406

  
......
401 410

  
402 411
    (results, requests) = \
403 412
      self._PrepareRequests(self._resolver(hosts), self._port, procedure,
404
                            str(body), read_timeout)
413
                            body, read_timeout)
405 414

  
406 415
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
407 416

  
......
434 443
    """Entry point for automatically generated RPC wrappers.
435 444

  
436 445
    """
437
    (procedure, _, timeout, argdefs, _, postproc_fn, _) = cdef
446
    (procedure, _, timeout, argdefs, prep_fn, postproc_fn, _) = cdef
438 447

  
439 448
    if callable(timeout):
440 449
      read_timeout = timeout(args)
441 450
    else:
442 451
      read_timeout = timeout
443 452

  
444
    body = serializer.DumpJson(map(self._encoder,
445
                                   zip(map(compat.snd, argdefs), args)))
446

  
447
    result = self._proc(node_list, procedure, body, read_timeout=read_timeout)
453
    enc_args = map(self._encoder, zip(map(compat.snd, argdefs), args))
454
    if prep_fn is None:
455
      # for a no-op prep_fn, we serialise the body once, and then we
456
      # reuse it in the dictionary values
457
      body = serializer.DumpJson(enc_args)
458
      pnbody = dict((n, body) for n in node_list)
459
    else:
460
      # for a custom prep_fn, we pass the encoded arguments and the
461
      # node name to the prep_fn, and we serialise its return value
462
      assert(callable(prep_fn))
463
      pnbody = dict((n, serializer.DumpJson(prep_fn(n, enc_args)))
464
                    for n in node_list)
465

  
466
    result = self._proc(node_list, procedure, pnbody,
467
                        read_timeout=read_timeout)
448 468

  
449 469
    if postproc_fn:
450 470
      return dict(map(lambda (key, value): (key, postproc_fn(value)),

Also available in: Unified diff