Revision 651ce6a3

b/lib/cli.py
229 229
  "GenericListFields",
230 230
  "GetClient",
231 231
  "GetOnlineNodes",
232
  "GetNodesSshPorts",
232 233
  "JobExecutor",
233 234
  "JobSubmittedException",
234 235
  "ParseTimespec",
......
2761 2762
  """Helper class for L{RunWhileClusterStopped} to simplify state management
2762 2763

  
2763 2764
  """
2764
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
2765
  def __init__(self, feedback_fn, cluster_name, master_node,
2766
               online_nodes, ssh_ports):
2765 2767
    """Initializes this class.
2766 2768

  
2767 2769
    @type feedback_fn: callable
......
2772 2774
    @param master_node Master node name
2773 2775
    @type online_nodes: list
2774 2776
    @param online_nodes: List of names of online nodes
2777
    @type ssh_ports: list
2778
    @param ssh_ports: List of SSH ports of online nodes
2775 2779

  
2776 2780
    """
2777 2781
    self.feedback_fn = feedback_fn
2778 2782
    self.cluster_name = cluster_name
2779 2783
    self.master_node = master_node
2780 2784
    self.online_nodes = online_nodes
2785
    self.ssh_ports = dict(zip(online_nodes, ssh_ports))
2781 2786

  
2782 2787
    self.ssh = ssh.SshRunner(self.cluster_name)
2783 2788

  
......
2800 2805
      result = utils.RunCmd(cmd)
2801 2806
    else:
2802 2807
      result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER,
2803
                            utils.ShellQuoteArgs(cmd))
2808
                            utils.ShellQuoteArgs(cmd),
2809
                            port=self.ssh_ports[node_name])
2804 2810

  
2805 2811
    if result.failed:
2806 2812
      errmsg = ["Failed to run command %s" % result.cmd]
......
2871 2877
    cl.QueryConfigValues(["cluster_name", "master_node"])
2872 2878

  
2873 2879
  online_nodes = GetOnlineNodes([], cl=cl)
2880
  ssh_ports = GetNodesSshPorts(online_nodes, cl)
2874 2881

  
2875 2882
  # Don't keep a reference to the client. The master daemon will go away.
2876 2883
  del cl
......
2878 2885
  assert master_node in online_nodes
2879 2886

  
2880 2887
  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
2881
                                       online_nodes).Call(fn, *args)
2888
                                       online_nodes, ssh_ports).Call(fn, *args)
2882 2889

  
2883 2890

  
2884 2891
def GenerateTable(headers, fields, separator, data,
......
3535 3542
  return map(fn, online)
3536 3543

  
3537 3544

  
3545
def GetNodesSshPorts(nodes, cl):
3546
  """Retrieves SSH ports of given nodes.
3547

  
3548
  @param nodes: the names of nodes
3549
  @type nodes: a list of strings
3550
  @param cl: a client to use for the query
3551
  @type cl: L{Client}
3552
  @return: the list of SSH ports corresponding to the nodes
3553
  @rtype: a list of tuples
3554
  """
3555
  return map(lambda t: t[0],
3556
             cl.QueryNodes(names=nodes,
3557
                           fields=["ndp/ssh_port"],
3558
                           use_locking=False))
3559

  
3560

  
3538 3561
def _ToStream(stream, txt, *args):
3539 3562
  """Write a message to a stream, bypassing the logging system
3540 3563

  
b/lib/client/gnt_cluster.py
570 570
                               errors.ECODE_INVAL)
571 571

  
572 572
  cl = GetClient()
573
  try:
574
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
573 575

  
574
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
575

  
576
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
577
                           secondary_ips=opts.use_replication_network,
578
                           nodegroup=opts.nodegroup)
576
    results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
577
                             secondary_ips=opts.use_replication_network,
578
                             nodegroup=opts.nodegroup)
579
    ports = GetNodesSshPorts(opts.nodes, cl)
580
  finally:
581
    cl.Close()
579 582

  
580 583
  srun = ssh.SshRunner(cluster_name)
581
  for node in results:
582
    if not srun.CopyFileToNode(node, filename):
583
      ToStderr("Copy of file %s to node %s failed", filename, node)
584
  for (node, port) in zip(results, ports):
585
    if not srun.CopyFileToNode(node, port, filename):
586
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)
584 587

  
585 588
  return 0
586 589

  
......
600 603
  command = " ".join(args)
601 604

  
602 605
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
606
  ports = GetNodesSshPorts(nodes, cl)
603 607

  
604 608
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
605 609
                                                    "master_node"])
......
611 615
    nodes.remove(master_node)
612 616
    nodes.append(master_node)
613 617

  
614
  for name in nodes:
615
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
618
  for (name, port) in zip(nodes, ports):
619
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)
616 620

  
617 621
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
618 622
      # Do not output anything for successful commands
......
978 982

  
979 983
    if files_to_copy:
980 984
      for node_name in ctx.nonmaster_nodes:
981
        ctx.feedback_fn("Copying %s to %s" %
982
                        (", ".join(files_to_copy), node_name))
985
        port = ctx.ssh_ports[node_name]
986
        ctx.feedback_fn("Copying %s to %s:%d" %
987
                        (", ".join(files_to_copy), node_name, port))
983 988
        for file_name in files_to_copy:
984
          ctx.ssh.CopyFileToNode(node_name, file_name)
989
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
985 990

  
986 991
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
987 992

  
b/lib/client/gnt_node.py
249 249

  
250 250
  # Retrieve relevant parameters of the node group.
251 251
  ssh_port = None
252
  if opts.nodegroup:
253
    try:
254
      output = cl.QueryGroups(names=[opts.nodegroup], fields=["ndp/ssh_port"],
255
                              use_locking=False)
256
      (ssh_port, ) = output[0]
257
    except (errors.OpPrereqError, errors.OpExecError):
258
      pass
252
  try:
253
    # Passing [] to QueryGroups means query the default group:
254
    node_groups = [opts.nodegroup] if opts.nodegroup is not None else []
255
    output = cl.QueryGroups(names=node_groups, fields=["ndp/ssh_port"],
256
                            use_locking=False)
257
    (ssh_port, ) = output[0]
258
  except (errors.OpPrereqError, errors.OpExecError):
259
    pass
259 260

  
260 261
  try:
261 262
    output = cl.QueryNodes(names=[node],
b/lib/ssh.py
248 248
    """
249 249
    return utils.RunCmd(self.BuildCmd(*args, **kwargs))
250 250

  
251
  def CopyFileToNode(self, node, filename):
251
  def CopyFileToNode(self, node, port, filename):
252 252
    """Copy a file to another node with scp.
253 253

  
254 254
    @param node: node in the cluster
......
267 267
      return False
268 268

  
269 269
    command = [constants.SCP, "-p"]
270
    command.extend(self._BuildSshOptions(True, False, True, True))
270
    command.extend(self._BuildSshOptions(True, False, True, True, port=port))
271 271
    command.append(filename)
272 272
    if netutils.IP6Address.IsValid(node):
273 273
      node = netutils.FormatAddress((node, None))

Also available in: Unified diff