Revision 1c3231aa lib/rpc.py

b/lib/rpc.py
275 275
    ip = ipmap.get(node)
276 276
    if ip is None:
277 277
      ip = nslookup_fn(node, family=family)
278
    result.append((node, ip))
278
    result.append((node, ip, node))
279 279

  
280 280
  return result
281 281

  
......
292 292

  
293 293
    """
294 294
    assert len(hosts) == len(self._addresses)
295
    return zip(hosts, self._addresses)
295
    return zip(hosts, self._addresses, hosts)
296 296

  
297 297

  
298
def _CheckConfigNode(name, node, accept_offline_node):
298
def _CheckConfigNode(node_uuid_or_name, node, accept_offline_node):
299 299
  """Checks if a node is online.
300 300

  
301
  @type name: string
302
  @param name: Node name
301
  @type node_uuid_or_name: string
302
  @param node_uuid_or_name: Node UUID
303 303
  @type node: L{objects.Node} or None
304 304
  @param node: Node object
305 305

  
306 306
  """
307 307
  if node is None:
308
    # Depend on DNS for name resolution
309
    ip = name
310
  elif node.offline and not accept_offline_node:
311
    ip = _OFFLINE
308
    # Assume that the passed parameter was actually a node name, so depend on
309
    # DNS for name resolution
310
    return (node_uuid_or_name, node_uuid_or_name, node_uuid_or_name)
312 311
  else:
313
    ip = node.primary_ip
314
  return (name, ip)
312
    if node.offline and not accept_offline_node:
313
      ip = _OFFLINE
314
    else:
315
      ip = node.primary_ip
316
    return (node.name, ip, node_uuid_or_name)
315 317

  
316 318

  
317
def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts, opts):
319
def _NodeConfigResolver(single_node_fn, all_nodes_fn, node_uuids, opts):
318 320
  """Calculate node addresses using configuration.
319 321

  
322
  Note that strings in node_uuids are treated as node names if the UUID is not
323
  found in the configuration.
324

  
320 325
  """
321 326
  accept_offline_node = (opts is rpc_defs.ACCEPT_OFFLINE_NODE)
322 327

  
323 328
  assert accept_offline_node or opts is None, "Unknown option"
324 329

  
325 330
  # Special case for single-host lookups
326
  if len(hosts) == 1:
327
    (name, ) = hosts
328
    return [_CheckConfigNode(name, single_node_fn(name), accept_offline_node)]
331
  if len(node_uuids) == 1:
332
    (uuid, ) = node_uuids
333
    return [_CheckConfigNode(uuid, single_node_fn(uuid), accept_offline_node)]
329 334
  else:
330 335
    all_nodes = all_nodes_fn()
331
    return [_CheckConfigNode(name, all_nodes.get(name, None),
336
    return [_CheckConfigNode(uuid, all_nodes.get(uuid, None),
332 337
                             accept_offline_node)
333
            for name in hosts]
338
            for uuid in node_uuids]
334 339

  
335 340

  
336 341
class _RpcProcessor:
337 342
  def __init__(self, resolver, port, lock_monitor_cb=None):
338 343
    """Initializes this class.
339 344

  
340
    @param resolver: callable accepting a list of hostnames, returning a list
341
      of tuples containing name and IP address (IP address can be the name or
342
      the special value L{_OFFLINE} to mark offline machines)
345
    @param resolver: callable accepting a list of node UUIDs or hostnames,
346
      returning a list of tuples containing name, IP address and original name
347
      of the resolved node. IP address can be the name or the special value
348
      L{_OFFLINE} to mark offline machines.
343 349
    @type port: int
344 350
    @param port: TCP port
345 351
    @param lock_monitor_cb: Callable for registering with lock monitor
......
363 369
    assert isinstance(body, dict)
364 370
    assert len(body) == len(hosts)
365 371
    assert compat.all(isinstance(v, str) for v in body.values())
366
    assert frozenset(map(compat.fst, hosts)) == frozenset(body.keys()), \
372
    assert frozenset(map(lambda x: x[2], hosts)) == frozenset(body.keys()), \
367 373
        "%s != %s" % (hosts, body.keys())
368 374

  
369
    for (name, ip) in hosts:
375
    for (name, ip, original_name) in hosts:
370 376
      if ip is _OFFLINE:
371 377
        # Node is marked as offline
372
        results[name] = RpcResult(node=name, offline=True, call=procedure)
378
        results[original_name] = RpcResult(node=name,
379
                                           offline=True,
380
                                           call=procedure)
373 381
      else:
374
        requests[name] = \
382
        requests[original_name] = \
375 383
          http.client.HttpClientRequest(str(ip), port,
376 384
                                        http.HTTP_POST, str("/%s" % procedure),
377 385
                                        headers=_RPC_CLIENT_HEADERS,
378
                                        post_data=body[name],
386
                                        post_data=body[original_name],
379 387
                                        read_timeout=read_timeout,
380 388
                                        nicename="%s/%s" % (name, procedure),
381 389
                                        curl_config_fn=_ConfigRpcCurl)
......
406 414

  
407 415
    return results
408 416

  
409
  def __call__(self, hosts, procedure, body, read_timeout, resolver_opts,
417
  def __call__(self, nodes, procedure, body, read_timeout, resolver_opts,
410 418
               _req_process_fn=None):
411 419
    """Makes an RPC request to a number of nodes.
412 420

  
413
    @type hosts: sequence
414
    @param hosts: Hostnames
421
    @type nodes: sequence
422
    @param nodes: node UUIDs or Hostnames
415 423
    @type procedure: string
416 424
    @param procedure: Request path
417 425
    @type body: dictionary
......
429 437
      _req_process_fn = http.client.ProcessRequests
430 438

  
431 439
    (results, requests) = \
432
      self._PrepareRequests(self._resolver(hosts, resolver_opts), self._port,
440
      self._PrepareRequests(self._resolver(nodes, resolver_opts), self._port,
433 441
                            procedure, body, read_timeout)
434 442

  
435 443
    _req_process_fn(requests.values(), lock_monitor_cb=self._lock_monitor_cb)
......
675 683
  return [annotation_fn(disk.Copy(), ld_params) for disk in disks]
676 684

  
677 685

  
678
def _GetESFlag(cfg, nodename):
679
  ni = cfg.GetNodeInfo(nodename)
686
def _GetESFlag(cfg, node_uuid):
687
  ni = cfg.GetNodeInfo(node_uuid)
680 688
  if ni is None:
681
    raise errors.OpPrereqError("Invalid node name %s" % nodename,
689
    raise errors.OpPrereqError("Invalid node name %s" % node_uuid,
682 690
                               errors.ECODE_NOENT)
683 691
  return cfg.GetNdParams(ni)[constants.ND_EXCLUSIVE_STORAGE]
684 692

  
685 693

  
686
def GetExclusiveStorageForNodeNames(cfg, nodelist):
694
def GetExclusiveStorageForNodes(cfg, node_uuids):
687 695
  """Return the exclusive storage flag for all the given nodes.
688 696

  
689 697
  @type cfg: L{config.ConfigWriter}
690 698
  @param cfg: cluster configuration
691
  @type nodelist: list or tuple
692
  @param nodelist: node names for which to read the flag
699
  @type node_uuids: list or tuple
700
  @param node_uuids: node UUIDs for which to read the flag
693 701
  @rtype: dict
694 702
  @return: mapping from node names to exclusive storage flags
695
  @raise errors.OpPrereqError: if any given node name has no corresponding node
703
  @raise errors.OpPrereqError: if any given node name has no corresponding
704
  node
696 705

  
697 706
  """
698 707
  getflag = lambda n: _GetESFlag(cfg, n)
699
  flags = map(getflag, nodelist)
700
  return dict(zip(nodelist, flags))
708
  flags = map(getflag, node_uuids)
709
  return dict(zip(node_uuids, flags))
701 710

  
702 711

  
703 712
#: Generic encoders

Also available in: Unified diff