Revision 7e49b6ce
b/lib/cli.py | ||
---|---|---|
36 | 36 |
from ganeti import luxi |
37 | 37 |
from ganeti import ssconf |
38 | 38 |
from ganeti import rpc |
39 |
from ganeti import ssh |
|
39 | 40 |
|
40 | 41 |
from optparse import (OptionParser, TitledHelpFormatter, |
41 | 42 |
Option, OptionValueError) |
... | ... | |
128 | 129 |
"JobExecutor", |
129 | 130 |
"JobSubmittedException", |
130 | 131 |
"ParseTimespec", |
132 |
"RunWhileClusterStopped", |
|
131 | 133 |
"SubmitOpCode", |
132 | 134 |
"SubmitOrSend", |
133 | 135 |
"UsesRPC", |
... | ... | |
1549 | 1551 |
return 0 |
1550 | 1552 |
|
1551 | 1553 |
|
1554 |
class _RunWhileClusterStoppedHelper:
  """Helper class for L{RunWhileClusterStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes

    # Used by _RunCmd to reach non-master nodes
    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    @raise errors.OpExecError: If the command fails on any node

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception:
          # Log the error before restarting daemons below, so the traceback
          # is recorded even if the restart itself fails
          logging.exception("Caught exception")
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
    finally:
      # Resume watcher
      watcher_block.Close()
|
1645 |
|
|
1646 |
|
|
1647 |
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # Talking to the master daemon guarantees this code runs on the master node
  client = GetClient()

  (cluster_name, master_node) = \
    client.QueryConfigValues(["cluster_name", "master_node"])

  nodes = GetOnlineNodes([], cl=client)

  # Drop the client reference now; the master daemon is about to be stopped
  # and would leave it dangling
  del client

  assert master_node in nodes

  helper = _RunWhileClusterStoppedHelper(feedback_fn, cluster_name,
                                         master_node, nodes)
  return helper.Call(fn, *args)
|
1673 |
|
|
1674 |
|
|
1552 | 1675 |
def GenerateTable(headers, fields, separator, data, |
1553 | 1676 |
numfields=None, unitfields=None, |
1554 | 1677 |
units=None): |
Also available in: Unified diff