Add an upgrade command to gnt-cluster
[ganeti-local] / lib / client / gnt_cluster.py
index 8d81519..225cd56 100644 (file)
@@ -43,7 +43,9 @@ from ganeti import objects
 from ganeti import uidpool
 from ganeti import compat
 from ganeti import netutils
+from ganeti import ssconf
 from ganeti import pathutils
+from ganeti import qlang
 
 
 ON_OPT = cli_option("--on", default=False,
@@ -64,6 +66,12 @@ FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                 " is drained",
                                 default=False, action="store_true")
 
+TO_OPT = cli_option("--to", default=None, type="string",
+                    help="The Ganeti version to upgrade to")
+
+RESUME_OPT = cli_option("--resume", default=False, action="store_true",
+                        help="Resume any pending Ganeti upgrades")
+
 _EPO_PING_INTERVAL = 30 # 30 seconds between pings
 _EPO_PING_TIMEOUT = 1 # 1 second
 _EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
@@ -1617,6 +1625,278 @@ def ShowCreateCommand(opts, args):
   ToStdout(_GetCreateCommand(result))
 
 
+def _RunCommandAndReport(cmd):
+  """Run a command and report its output, iff it failed.
+
+  @param cmd: the command to execute
+  @type cmd: list
+  @rtype: bool
+  @return: False, if the execution failed.
+
+  """
+  result = utils.RunCmd(cmd)
+  if result.failed:
+    ToStderr("Command %s failed: %s; Output %s" %
+             (cmd, result.fail_reason, result.output))
+    return False
+  return True
+
+
+def _VerifyCommand(cmd):
+  """Verify that a given command succeeds on all online nodes.
+
+  As this function is intended to run during upgrades, it
+  is implemented in such a way that it still works, if all Ganeti
+  daemons are down.
+
+  @param cmd: the command to execute
+  @type cmd: list
+  @rtype: list
+  @return: the list of node names that are online where
+      the command failed.
+
+  """
+  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])
+
+  nodes = ssconf.SimpleStore().GetOnlineNodeList()
+  master_node = ssconf.SimpleStore().GetMasterNode()
+  cluster_name = ssconf.SimpleStore().GetClusterName()
+
+  # If master node is in 'nodes', make sure master node is at list end
+  if master_node in nodes:
+    nodes.remove(master_node)
+    nodes.append(master_node)
+
+  failed = []
+
+  srun = ssh.SshRunner(cluster_name=cluster_name)
+  for name in nodes:
+    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
+    if result.exit_code != 0:
+      failed.append(name)
+
+  return failed
+
+
+def _VerifyVersionInstalled(versionstring):
+  """Verify that the given version of ganeti is installed on all online nodes.
+
+  Do nothing, if this is the case, otherwise print an appropriate
+  message to stderr.
+
+  @param versionstring: the version to check for
+  @type versionstring: string
+  @rtype: bool
+  @return: True, if the version is installed on all online nodes
+
+  """
+  badnodes = _VerifyCommand(["test", "-d",
+                             os.path.join(pathutils.PKGLIBDIR, versionstring)])
+  if badnodes:
+    ToStderr("Ganeti version %s not installed on nodes %s"
+             % (versionstring, ", ".join(badnodes)))
+    return False
+
+  return True
+
+
+def _GetRunning():
+  """Determine the list of running jobs.
+
+  @rtype: list
+  @return: the number of jobs still running
+
+  """
+  cl = GetClient()
+  qfilter = qlang.MakeSimpleFilter("status",
+                                   frozenset([constants.JOB_STATUS_RUNNING]))
+  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
+
+
+def _SetGanetiVersion(versionstring):
+  """Set the active version of ganeti to the given versionstring
+
+  @type versionstring: string
+  @rtype: list
+  @return: the list of nodes where the version change failed
+
+  """
+  failed = []
+  failed.extend(_VerifyCommand(
+      ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
+  failed.extend(_VerifyCommand(
+      ["ln", "-s", "-f", os.path.join(pathutils.PKGLIBDIR, versionstring),
+       os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")]))
+  failed.extend(_VerifyCommand(
+      ["rm", "-f", os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
+  failed.extend(_VerifyCommand(
+      ["ln", "-s", "-f", os.path.join(pathutils.SHAREDIR, versionstring),
+       os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]))
+  return list(set(failed))
+
+
+def _ExecuteCommands(fns):
+  """Execute a list of functions, in reverse order.
+
+  @type fns: list of functions.
+  @param fns: the functions to be executed.
+
+  """
+  for fn in reversed(fns):
+    fn()
+
+
+# pylint: disable=R0911
+def UpgradeGanetiCommand(opts, args):
+  """Upgrade a cluster to a new ganeti version.
+
+  @param opts: the command line options selected by the user
+  @type args: list
+  @param args: should be an empty list
+  @rtype: int
+  @return: the desired exit code
+
+  """
+  if ((not opts.resume and opts.to is None)
+      or (opts.resume and opts.to is not None)):
+    ToStderr("Precisely one of the options --to and --resume"
+             " has to be given")
+    return 1
+
+  if opts.resume:
+    # TODO: implement
+    ToStderr("The --resume mode is not yet implemented")
+    return 1
+
+  rollback = []
+
+  versionstring = opts.to
+  version = utils.version.ParseVersion(versionstring)
+  if version is None:
+    ToStderr("Could not parse version string %s" % versionstring)
+    return 1
+
+  msg = utils.version.UpgradeRange(version)
+  if msg is not None:
+    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
+    return 1
+
+  downgrade = utils.version.ShouldCfgdowngrade(version)
+
+  if not _VerifyVersionInstalled(versionstring):
+    return 1
+
+  # TODO: write intent-to-upgrade file
+
+  ToStdout("Draining queue")
+  client = GetClient()
+  client.SetQueueDrainFlag(True)
+
+  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))
+
+  if utils.SimpleRetry(0, _GetRunning,
+                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
+                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
+    ToStderr("Failed to completely empty the queue.")
+    _ExecuteCommands(rollback)
+    return 1
+
+  ToStdout("Stopping daemons on master node.")
+  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
+    _ExecuteCommands(rollback)
+    return 1
+
+  if not _VerifyVersionInstalled(versionstring):
+    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
+    _ExecuteCommands(rollback)
+    return 1
+
+  ToStdout("Stopping daemons everywhere.")
+  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
+  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
+  if badnodes:
+    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
+    _ExecuteCommands(rollback)
+    return 1
+
+  backuptar = os.path.join(pathutils.LOCALSTATEDIR,
+                           "lib/ganeti%d.tar" % time.time())
+  ToStdout("Backing up configuration as %s" % backuptar)
+  if not _RunCommandAndReport(["tar", "cf", backuptar,
+                               pathutils.DATA_DIR]):
+    _ExecuteCommands(rollback)
+    return 1
+
+  if downgrade:
+    ToStdout("Downgrading configuration")
+    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
+      _ExecuteCommands(rollback)
+      return 1
+
+  # Configuration change is the point of no return. From then onwards, it is
+  # safer to push through the up/dowgrade than to try to roll it back.
+
+  returnvalue = 0
+
+  ToStdout("Switching to version %s on all nodes" % versionstring)
+  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
+  badnodes = _SetGanetiVersion(versionstring)
+  if badnodes:
+    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
+             % (versionstring, ", ".join(badnodes)))
+    if not downgrade:
+      _ExecuteCommands(rollback)
+      return 1
+
+  # Now that we have changed to the new version of Ganeti we should
+  # not communicate over luxi any more, as luxi might have changed in
+  # incompatible ways. Therefore, manually call the corresponding ganeti
+  # commands using their canonical (version independent) path.
+
+  if not downgrade:
+    ToStdout("Upgrading configuration")
+    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
+      _ExecuteCommands(rollback)
+      return 1
+
+  ToStdout("Starting daemons everywhere.")
+  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
+  if badnodes:
+    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
+    returnvalue = 1
+
+  ToStdout("Ensuring directories everywhere.")
+  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
+  if badnodes:
+    ToStderr("Warning: failed to ensure directories on %s." %
+             (", ".join(badnodes)))
+    returnvalue = 1
+
+  ToStdout("Redistributing the configuration.")
+  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
+    returnvalue = 1
+
+  ToStdout("Restarting daemons everywhere.")
+  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
+  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
+  if badnodes:
+    ToStderr("Warning: failed to start daemons on %s." %
+             (", ".join(list(set(badnodes))),))
+    returnvalue = 1
+
+  ToStdout("Undraining the queue.")
+  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
+    returnvalue = 1
+
+  # TODO: write intent-to-upgrade file
+
+  ToStdout("Verifying cluster.")
+  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
+    returnvalue = 1
+
+  return returnvalue
+
+
 commands = {
   "init": (
     InitCluster, [ArgHost(min=1, max=1)],
@@ -1735,6 +2015,9 @@ commands = {
   "show-ispecs-cmd": (
     ShowCreateCommand, ARGS_NONE, [], "",
     "Show the command line to re-create the cluster"),
+  "upgrade": (
+    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
+    "Upgrade (or downgrade) to a new Ganeti version"),
   }