Revision 99aabbed

b/daemons/ganeti-masterd
317 317
    self.cfg.AddNode(node)
318 318

  
319 319
    # If preseeding fails it'll not be added
320
    self.jobqueue.AddNode(node.name)
320
    self.jobqueue.AddNode(node)
321 321

  
322 322
    # Add the new node to the Ganeti Lock Manager
323 323
    self.glm.add(locking.LEVEL_NODE, node.name)
......
327 327

  
328 328
    """
329 329
    # Synchronize the queue again
330
    self.jobqueue.AddNode(node.name)
330
    self.jobqueue.AddNode(node)
331 331

  
332 332
  def RemoveNode(self, name):
333 333
    """Removes a node from the configuration and lock manager.
b/lib/jqueue.py
364 364
                                           " check in jstore and here")
365 365

  
366 366
    # Get initial list of nodes
367
    self._nodes = set(self.context.cfg.GetNodeList())
367
    self._nodes = dict((n.name, n.primary_ip)
368
                       for n in self.context.cfg.GetAllNodesInfo().values())
368 369

  
369 370
    # Remove master node
370 371
    try:
371
      self._nodes.remove(self._my_hostname)
372
      del self._nodes[self._my_hostname]
372 373
    except ValueError:
373 374
      pass
374 375

  
......
405 406

  
406 407
  @utils.LockedMethod
407 408
  @_RequireOpenQueue
408
  def AddNode(self, node_name):
409
  def AddNode(self, node):
410
    """Register a new node with the queue.
411

  
412
    @type node: L{objects.Node}
413
    @param node: the node object to be added
414

  
415
    """
416
    node_name = node.name
409 417
    assert node_name != self._my_hostname
410 418

  
411 419
    # Clean queue directory on added node
......
425 433
      finally:
426 434
        fd.close()
427 435

  
428
      result = RpcRunner.call_jobqueue_update([node_name], file_name, content)
436
      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
437
                                              file_name, content)
429 438
      if not result[node_name]:
430 439
        logging.error("Failed to upload %s to %s", file_name, node_name)
431 440

  
432
    self._nodes.add(node_name)
441
    self._nodes[node_name] = node.primary_ip
433 442

  
434 443
  @utils.LockedMethod
435 444
  @_RequireOpenQueue
436 445
  def RemoveNode(self, node_name):
437 446
    try:
438 447
      # The queue is removed by the "leave node" RPC call.
439
      self._nodes.remove(node_name)
448
      del self._nodes[node_name]
440 449
    except KeyError:
441 450
      pass
442 451

  
......
458 467
      # TODO: Handle failing nodes
459 468
      logging.error("More than half of the nodes failed")
460 469

  
470
  def _GetNodeIp(self):
471
    """Helper for returning the node name/ip list.
472

  
473
    """
474
    name_list = self._nodes.keys()
475
    addr_list = [self._nodes[name] for name in name_list]
476
    return name_list, addr_list
477

  
461 478
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
462 479
    """Writes a file locally and then replicates it to all nodes.
463 480

  
464 481
    """
465 482
    utils.WriteFile(file_name, data=data)
466 483

  
467
    result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data)
484
    names, addrs = self._GetNodeIp()
485
    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
468 486
    self._CheckRpcResult(result, self._nodes,
469 487
                         "Updating %s" % file_name)
470 488

  
471 489
  def _RenameFileUnlocked(self, old, new):
472 490
    os.rename(old, new)
473 491

  
474
    result = RpcRunner.call_jobqueue_rename(self._nodes, old, new)
492
    names, addrs = self._GetNodeIp()
493
    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
475 494
    self._CheckRpcResult(result, self._nodes,
476 495
                         "Moving %s to %s" % (old, new))
477 496

  
b/lib/rpc.py
941 941
    return c.GetResults().get(node, False)
942 942

  
943 943
  @staticmethod
944
  def call_jobqueue_update(node_list, file_name, content):
944
  def call_jobqueue_update(node_list, address_list, file_name, content):
945 945
    """Update job queue.
946 946

  
947 947
    This is a multi-node call.
948 948

  
949 949
    """
950 950
    c = Client("jobqueue_update", [file_name, content])
951
    c.ConnectList(node_list)
951
    c.ConnectList(node_list, address_list=address_list)
952 952
    c.Run()
953 953
    result = c.GetResults()
954 954
    return result
......
966 966
    return c.GetResults().get(node, False)
967 967

  
968 968
  @staticmethod
969
  def call_jobqueue_rename(node_list, old, new):
969
  def call_jobqueue_rename(node_list, address_list, old, new):
970 970
    """Rename a job queue file.
971 971

  
972 972
    This is a multi-node call.
973 973

  
974 974
    """
975 975
    c = Client("jobqueue_rename", [old, new])
976
    c.ConnectList(node_list)
976
    c.ConnectList(node_list, address_list=address_list)
977 977
    c.Run()
978 978
    result = c.GetResults()
979 979
    return result

Also available in: Unified diff