Revision 99aabbed lib/jqueue.py

b/lib/jqueue.py
364 364
                                           " check in jstore and here")
365 365

  
366 366
    # Get initial list of nodes
367
    self._nodes = set(self.context.cfg.GetNodeList())
367
    self._nodes = dict((n.name, n.primary_ip)
368
                       for n in self.context.cfg.GetAllNodesInfo().values())
368 369

  
369 370
    # Remove master node
370 371
    try:
371
      self._nodes.remove(self._my_hostname)
372
      del self._nodes[self._my_hostname]
372 373
    except ValueError:
373 374
      pass
374 375

  
......
405 406

  
406 407
  @utils.LockedMethod
407 408
  @_RequireOpenQueue
408
  def AddNode(self, node_name):
409
  def AddNode(self, node):
410
    """Register a new node with the queue.
411

  
412
    @type node: L{objects.Node}
413
    @param node: the node object to be added
414

  
415
    """
416
    node_name = node.name
409 417
    assert node_name != self._my_hostname
410 418

  
411 419
    # Clean queue directory on added node
......
425 433
      finally:
426 434
        fd.close()
427 435

  
428
      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
436
      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
437
                                              file_name, content)
429 438
      if not result[node_name]:
430 439
        logging.error("Failed to upload %s to %s", file_name, node_name)
431 440

  
432
    self._nodes.add(node_name)
441
    self._nodes[node_name] = node.primary_ip
433 442

  
434 443
  @utils.LockedMethod
435 444
  @_RequireOpenQueue
436 445
  def RemoveNode(self, node_name):
437 446
    try:
438 447
      # The queue is removed by the "leave node" RPC call.
439
      self._nodes.remove(node_name)
448
      del self._nodes[node_name]
440 449
    except KeyError:
441 450
      pass
442 451

  
......
458 467
      # TODO: Handle failing nodes
459 468
      logging.error("More than half of the nodes failed")
460 469

  
470
  def _GetNodeIp(self):
471
    """Helper for returning the node name/ip list.
472

  
473
    """
474
    name_list = self._nodes.keys()
475
    addr_list = [self._nodes[name] for name in name_list]
476
    return name_list, addr_list
477

  
461 478
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
462 479
    """Writes a file locally and then replicates it to all nodes.
463 480

  
464 481
    """
465 482
    utils.WriteFile(file_name, data=data)
466 483

  
467
    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
484
    names, addrs = self._GetNodeIp()
485
    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
468 486
    self._CheckRpcResult(result, self._nodes,
469 487
                         "Updating %s" % file_name)
470 488

  
471 489
  def _RenameFileUnlocked(self, old, new):
472 490
    os.rename(old, new)
473 491

  
474
    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
492
    names, addrs = self._GetNodeIp()
493
    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
475 494
    self._CheckRpcResult(result, self._nodes,
476 495
                         "Moving %s to %s" % (old, new))
477 496

  

Also available in: Unified diff