Revision 99aabbed lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
364 | 364 |
" check in jstore and here") |
365 | 365 |
|
366 | 366 |
# Get initial list of nodes |
367 |
self._nodes = set(self.context.cfg.GetNodeList()) |
|
367 |
self._nodes = dict((n.name, n.primary_ip) |
|
368 |
for n in self.context.cfg.GetAllNodesInfo().values()) |
|
368 | 369 |
|
369 | 370 |
# Remove master node |
370 | 371 |
try: |
371 |
self._nodes.remove(self._my_hostname)
|
|
372 |
del self._nodes[self._my_hostname]
|
|
372 | 373 |
except ValueError: |
373 | 374 |
pass |
374 | 375 |
|
... | ... | |
405 | 406 |
|
406 | 407 |
@utils.LockedMethod |
407 | 408 |
@_RequireOpenQueue |
408 |
def AddNode(self, node_name): |
|
409 |
def AddNode(self, node): |
|
410 |
"""Register a new node with the queue. |
|
411 |
|
|
412 |
@type node: L{objects.Node} |
|
413 |
@param node: the node object to be added |
|
414 |
|
|
415 |
""" |
|
416 |
node_name = node.name |
|
409 | 417 |
assert node_name != self._my_hostname |
410 | 418 |
|
411 | 419 |
# Clean queue directory on added node |
... | ... | |
425 | 433 |
finally: |
426 | 434 |
fd.close() |
427 | 435 |
|
428 |
result = RpcRunner.call_jobqueue_update([node_name], file_name, content) |
|
436 |
result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip], |
|
437 |
file_name, content) |
|
429 | 438 |
if not result[node_name]: |
430 | 439 |
logging.error("Failed to upload %s to %s", file_name, node_name) |
431 | 440 |
|
432 |
self._nodes.add(node_name)
|
|
441 |
self._nodes[node_name] = node.primary_ip
|
|
433 | 442 |
|
434 | 443 |
@utils.LockedMethod |
435 | 444 |
@_RequireOpenQueue |
436 | 445 |
def RemoveNode(self, node_name): |
437 | 446 |
try: |
438 | 447 |
# The queue is removed by the "leave node" RPC call. |
439 |
self._nodes.remove(node_name)
|
|
448 |
del self._nodes[node_name]
|
|
440 | 449 |
except KeyError: |
441 | 450 |
pass |
442 | 451 |
|
... | ... | |
458 | 467 |
# TODO: Handle failing nodes |
459 | 468 |
logging.error("More than half of the nodes failed") |
460 | 469 |
|
470 |
def _GetNodeIp(self): |
|
471 |
"""Helper for returning the node name/ip list. |
|
472 |
|
|
473 |
""" |
|
474 |
name_list = self._nodes.keys() |
|
475 |
addr_list = [self._nodes[name] for name in name_list] |
|
476 |
return name_list, addr_list |
|
477 |
|
|
461 | 478 |
def _WriteAndReplicateFileUnlocked(self, file_name, data): |
462 | 479 |
"""Writes a file locally and then replicates it to all nodes. |
463 | 480 |
|
464 | 481 |
""" |
465 | 482 |
utils.WriteFile(file_name, data=data) |
466 | 483 |
|
467 |
result = RpcRunner.call_jobqueue_update(self._nodes, file_name, data) |
|
484 |
names, addrs = self._GetNodeIp() |
|
485 |
result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data) |
|
468 | 486 |
self._CheckRpcResult(result, self._nodes, |
469 | 487 |
"Updating %s" % file_name) |
470 | 488 |
|
471 | 489 |
def _RenameFileUnlocked(self, old, new): |
472 | 490 |
os.rename(old, new) |
473 | 491 |
|
474 |
result = RpcRunner.call_jobqueue_rename(self._nodes, old, new) |
|
492 |
names, addrs = self._GetNodeIp() |
|
493 |
result = RpcRunner.call_jobqueue_rename(names, addrs, old, new) |
|
475 | 494 |
self._CheckRpcResult(result, self._nodes, |
476 | 495 |
"Moving %s to %s" % (old, new)) |
477 | 496 |
|
Also available in: Unified diff