Revision 7e49b6ce

b/lib/cli.py
36 36
from ganeti import luxi
37 37
from ganeti import ssconf
38 38
from ganeti import rpc
39
from ganeti import ssh
39 40

  
40 41
from optparse import (OptionParser, TitledHelpFormatter,
41 42
                      Option, OptionValueError)
......
128 129
  "JobExecutor",
129 130
  "JobSubmittedException",
130 131
  "ParseTimespec",
132
  "RunWhileClusterStopped",
131 133
  "SubmitOpCode",
132 134
  "SubmitOrSend",
133 135
  "UsesRPC",
......
1549 1551
  return 0
1550 1552

  
1551 1553

  
1554
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  An instance keeps the feedback function, the cluster name, the node names
  and an SSH runner, so that L{Call} can stop all daemons, run a function and
  start the daemons again.

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # Used by _RunCmd for every node other than the master
    self.ssh = ssh.SshRunner(self.cluster_name)

    # Kept separately so that the master can be started after all other nodes
    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; C{None} or the master node's name runs
      the command locally, any other name runs it via SSH as "root"
    @type cmd: list
    @param cmd: Command
    @raise errors.OpExecError: If the command's result is marked as failed

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def _StartAllDaemons(self):
    """Starts the daemons on all nodes again, master node last.

    A failure on one node must not prevent the remaining nodes (and in
    particular the master, which comes last) from being started. Failures are
    therefore logged and collected, and reported only after every node has
    been tried.

    @raise errors.OpExecError: If starting the daemons failed on at least one
      node

    """
    failed_nodes = []

    for node_name in self.nonmaster_nodes + [self.master_node]:
      self.feedback_fn("Starting daemons on %s" % node_name)
      try:
        self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
      except errors.OpExecError:
        # Keep going; the details of this failure end up in the log
        logging.exception("Failed to start daemons on %s", node_name)
        failed_nodes.append(node_name)

    if failed_nodes:
      raise errors.OpExecError("Failed to start daemons on node(s) %s" %
                               ", ".join(failed_nodes))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called; it receives this helper instance as
      its first argument, followed by C{args}
    @return: The return value of C{fn}
    @raise errors.OpExecError: If stopping or starting daemons fails

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception:
          # Log here, as an error while restarting daemons would otherwise
          # be the only one reported to the caller
          logging.exception("Caught exception")
          raise
      finally:
        # Start cluster again, master node last; every node is attempted
        # even if starting daemons on an earlier one fails
        self._StartAllDaemons()
    finally:
      # Resume watcher
      watcher_block.Close()
1645

  
1646

  
1647
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Runs a function while every daemon of the cluster is shut down.

  The cluster name, the master node and the list of online nodes are queried
  first; the actual work is then delegated to
  L{_RunWhileClusterStoppedHelper}.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped; the helper
    instance is passed as its first argument, followed by C{args}
  @return: Whatever the helper's C{Call} method returns

  """
  feedback_fn("Gathering cluster information")

  # Getting a client ensures we're running on the master daemon
  client = GetClient()

  config_values = client.QueryConfigValues(["cluster_name", "master_node"])
  (cluster_name, master_node) = config_values

  online_nodes = GetOnlineNodes([], cl=client)

  # The master daemon is about to be stopped, hence the client must not be
  # kept around
  del client

  assert master_node in online_nodes

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, online_nodes)
  return helper.Call(fn, *args)
1673

  
1674

  
1552 1675
def GenerateTable(headers, fields, separator, data,
1553 1676
                  numfields=None, unitfields=None,
1554 1677
                  units=None):

Also available in: Unified diff