Revision 651ce6a3
b/lib/cli.py | ||
---|---|---|
229 | 229 |
"GenericListFields", |
230 | 230 |
"GetClient", |
231 | 231 |
"GetOnlineNodes", |
232 |
"GetNodesSshPorts", |
|
232 | 233 |
"JobExecutor", |
233 | 234 |
"JobSubmittedException", |
234 | 235 |
"ParseTimespec", |
... | ... | |
2761 | 2762 |
"""Helper class for L{RunWhileClusterStopped} to simplify state management |
2762 | 2763 |
|
2763 | 2764 |
""" |
2764 |
def __init__(self, feedback_fn, cluster_name, master_node, online_nodes): |
|
2765 |
def __init__(self, feedback_fn, cluster_name, master_node, |
|
2766 |
online_nodes, ssh_ports): |
|
2765 | 2767 |
"""Initializes this class. |
2766 | 2768 |
|
2767 | 2769 |
@type feedback_fn: callable |
... | ... | |
2772 | 2774 |
@param master_node Master node name |
2773 | 2775 |
@type online_nodes: list |
2774 | 2776 |
@param online_nodes: List of names of online nodes |
2777 |
@type ssh_ports: list |
|
2778 |
@param ssh_ports: List of SSH ports of online nodes |
|
2775 | 2779 |
|
2776 | 2780 |
""" |
2777 | 2781 |
self.feedback_fn = feedback_fn |
2778 | 2782 |
self.cluster_name = cluster_name |
2779 | 2783 |
self.master_node = master_node |
2780 | 2784 |
self.online_nodes = online_nodes |
2785 |
self.ssh_ports = dict(zip(online_nodes, ssh_ports)) |
|
2781 | 2786 |
|
2782 | 2787 |
self.ssh = ssh.SshRunner(self.cluster_name) |
2783 | 2788 |
|
... | ... | |
2800 | 2805 |
result = utils.RunCmd(cmd) |
2801 | 2806 |
else: |
2802 | 2807 |
result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER, |
2803 |
utils.ShellQuoteArgs(cmd)) |
|
2808 |
utils.ShellQuoteArgs(cmd), |
|
2809 |
port=self.ssh_ports[node_name]) |
|
2804 | 2810 |
|
2805 | 2811 |
if result.failed: |
2806 | 2812 |
errmsg = ["Failed to run command %s" % result.cmd] |
... | ... | |
2871 | 2877 |
cl.QueryConfigValues(["cluster_name", "master_node"]) |
2872 | 2878 |
|
2873 | 2879 |
online_nodes = GetOnlineNodes([], cl=cl) |
2880 |
ssh_ports = GetNodesSshPorts(online_nodes, cl) |
|
2874 | 2881 |
|
2875 | 2882 |
# Don't keep a reference to the client. The master daemon will go away. |
2876 | 2883 |
del cl |
... | ... | |
2878 | 2885 |
assert master_node in online_nodes |
2879 | 2886 |
|
2880 | 2887 |
return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node, |
2881 |
online_nodes).Call(fn, *args) |
|
2888 |
online_nodes, ssh_ports).Call(fn, *args)
|
|
2882 | 2889 |
|
2883 | 2890 |
|
2884 | 2891 |
def GenerateTable(headers, fields, separator, data, |
... | ... | |
3535 | 3542 |
return map(fn, online) |
3536 | 3543 |
|
3537 | 3544 |
|
3545 |
def GetNodesSshPorts(nodes, cl): |
|
3546 |
"""Retrieves SSH ports of given nodes. |
|
3547 |
|
|
3548 |
@param nodes: the names of nodes |
|
3549 |
@type nodes: a list of strings |
|
3550 |
@param cl: a client to use for the query |
|
3551 |
@type cl: L{Client} |
|
3552 |
@return: the list of SSH ports corresponding to the nodes |
|
3553 |
@rtype: a list of tuples |
|
3554 |
""" |
|
3555 |
return map(lambda t: t[0], |
|
3556 |
cl.QueryNodes(names=nodes, |
|
3557 |
fields=["ndp/ssh_port"], |
|
3558 |
use_locking=False)) |
|
3559 |
|
|
3560 |
|
|
3538 | 3561 |
def _ToStream(stream, txt, *args): |
3539 | 3562 |
"""Write a message to a stream, bypassing the logging system |
3540 | 3563 |
|
b/lib/client/gnt_cluster.py | ||
---|---|---|
570 | 570 |
errors.ECODE_INVAL) |
571 | 571 |
|
572 | 572 |
cl = GetClient() |
573 |
try: |
|
574 |
cluster_name = cl.QueryConfigValues(["cluster_name"])[0] |
|
573 | 575 |
|
574 |
cluster_name = cl.QueryConfigValues(["cluster_name"])[0] |
|
575 |
|
|
576 |
results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True, |
|
577 |
secondary_ips=opts.use_replication_network, |
|
578 |
nodegroup=opts.nodegroup) |
|
576 |
results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True, |
|
577 |
secondary_ips=opts.use_replication_network, |
|
578 |
nodegroup=opts.nodegroup) |
|
579 |
ports = GetNodesSshPorts(opts.nodes, cl) |
|
580 |
finally: |
|
581 |
cl.Close() |
|
579 | 582 |
|
580 | 583 |
srun = ssh.SshRunner(cluster_name) |
581 |
for node in results:
|
|
582 |
if not srun.CopyFileToNode(node, filename): |
|
583 |
ToStderr("Copy of file %s to node %s failed", filename, node)
|
|
584 |
for (node, port) in zip(results, ports):
|
|
585 |
if not srun.CopyFileToNode(node, port, filename):
|
|
586 |
ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
|
|
584 | 587 |
|
585 | 588 |
return 0 |
586 | 589 |
|
... | ... | |
600 | 603 |
command = " ".join(args) |
601 | 604 |
|
602 | 605 |
nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup) |
606 |
ports = GetNodesSshPorts(nodes, cl) |
|
603 | 607 |
|
604 | 608 |
cluster_name, master_node = cl.QueryConfigValues(["cluster_name", |
605 | 609 |
"master_node"]) |
... | ... | |
611 | 615 |
nodes.remove(master_node) |
612 | 616 |
nodes.append(master_node) |
613 | 617 |
|
614 |
for name in nodes:
|
|
615 |
result = srun.Run(name, constants.SSH_LOGIN_USER, command) |
|
618 |
for (name, port) in zip(nodes, ports):
|
|
619 |
result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
|
|
616 | 620 |
|
617 | 621 |
if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS: |
618 | 622 |
# Do not output anything for successful commands |
... | ... | |
978 | 982 |
|
979 | 983 |
if files_to_copy: |
980 | 984 |
for node_name in ctx.nonmaster_nodes: |
981 |
ctx.feedback_fn("Copying %s to %s" % |
|
982 |
(", ".join(files_to_copy), node_name)) |
|
985 |
port = ctx.ssh_ports[node_name] |
|
986 |
ctx.feedback_fn("Copying %s to %s:%d" % |
|
987 |
(", ".join(files_to_copy), node_name, port)) |
|
983 | 988 |
for file_name in files_to_copy: |
984 |
ctx.ssh.CopyFileToNode(node_name, file_name) |
|
989 |
ctx.ssh.CopyFileToNode(node_name, port, file_name)
|
|
985 | 990 |
|
986 | 991 |
RunWhileClusterStopped(ToStdout, _RenewCryptoInner) |
987 | 992 |
|
b/lib/client/gnt_node.py | ||
---|---|---|
249 | 249 |
|
250 | 250 |
# Retrieve relevant parameters of the node group. |
251 | 251 |
ssh_port = None |
252 |
if opts.nodegroup: |
|
253 |
try: |
|
254 |
output = cl.QueryGroups(names=[opts.nodegroup], fields=["ndp/ssh_port"], |
|
255 |
use_locking=False) |
|
256 |
(ssh_port, ) = output[0] |
|
257 |
except (errors.OpPrereqError, errors.OpExecError): |
|
258 |
pass |
|
252 |
try: |
|
253 |
# Passing [] to QueryGroups means query the default group: |
|
254 |
node_groups = [opts.nodegroup] if opts.nodegroup is not None else [] |
|
255 |
output = cl.QueryGroups(names=node_groups, fields=["ndp/ssh_port"], |
|
256 |
use_locking=False) |
|
257 |
(ssh_port, ) = output[0] |
|
258 |
except (errors.OpPrereqError, errors.OpExecError): |
|
259 |
pass |
|
259 | 260 |
|
260 | 261 |
try: |
261 | 262 |
output = cl.QueryNodes(names=[node], |
b/lib/ssh.py | ||
---|---|---|
248 | 248 |
""" |
249 | 249 |
return utils.RunCmd(self.BuildCmd(*args, **kwargs)) |
250 | 250 |
|
251 |
def CopyFileToNode(self, node, filename): |
|
251 |
def CopyFileToNode(self, node, port, filename):
|
|
252 | 252 |
"""Copy a file to another node with scp. |
253 | 253 |
|
254 | 254 |
@param node: node in the cluster |
... | ... | |
267 | 267 |
return False |
268 | 268 |
|
269 | 269 |
command = [constants.SCP, "-p"] |
270 |
command.extend(self._BuildSshOptions(True, False, True, True)) |
|
270 |
command.extend(self._BuildSshOptions(True, False, True, True, port=port))
|
|
271 | 271 |
command.append(filename) |
272 | 272 |
if netutils.IP6Address.IsValid(node): |
273 | 273 |
node = netutils.FormatAddress((node, None)) |
Also available in: Unified diff