Fix burnin error when trying to grow a file volume
[ganeti-local] / lib/cli.py
index 92df9c5..5df4aae 100644
@@ -36,6 +36,7 @@ from ganeti import opcodes
 from ganeti import luxi
 from ganeti import ssconf
 from ganeti import rpc
+from ganeti import ssh
 
 from optparse import (OptionParser, TitledHelpFormatter,
                       Option, OptionValueError)
@@ -78,12 +79,16 @@ __all__ = [
   "MASTER_NETDEV_OPT",
   "MC_OPT",
   "NET_OPT",
+  "NEW_CLUSTER_CERT_OPT",
+  "NEW_CONFD_HMAC_KEY_OPT",
+  "NEW_RAPI_CERT_OPT",
   "NEW_SECONDARY_OPT",
   "NIC_PARAMS_OPT",
   "NODE_LIST_OPT",
   "NODE_PLACEMENT_OPT",
   "NOHDR_OPT",
   "NOIPCHECK_OPT",
+  "NO_INSTALL_OPT",
   "NONAMECHECK_OPT",
   "NOLVM_STORAGE_OPT",
   "NOMODIFY_ETCHOSTS_OPT",
@@ -101,6 +106,7 @@ __all__ = [
   "OFFLINE_OPT",
   "OS_OPT",
   "OS_SIZE_OPT",
+  "RAPI_CERT_OPT",
   "READD_OPT",
   "REBOOT_TYPE_OPT",
   "SECONDARY_IP_OPT",
@@ -117,6 +123,7 @@ __all__ = [
   "TAG_SRC_OPT",
   "TIMEOUT_OPT",
   "USEUNITS_OPT",
+  "USE_REPL_NET_OPT",
   "VERBOSE_OPT",
   "VG_NAME_OPT",
   "YES_DOIT_OPT",
@@ -128,6 +135,7 @@ __all__ = [
   "JobExecutor",
   "JobSubmittedException",
   "ParseTimespec",
+  "RunWhileClusterStopped",
   "SubmitOpCode",
   "SubmitOrSend",
   "UsesRPC",
@@ -449,6 +457,21 @@ def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
   return _SplitKeyVal(opt, value)
 
 
+def check_bool(option, opt, value): # pylint: disable-msg=W0613
+  """Custom parser for yes/no options.
+
+  This will store the parsed value as either True or False.
+
+  """
+  value = value.lower()
+  if value == constants.VALUE_FALSE or value == "no":
+    return False
+  elif value == constants.VALUE_TRUE or value == "yes":
+    return True
+  else:
+    raise errors.ParameterError("Invalid boolean value '%s'" % value)
+
+
 # completion_suggestion is normally a list. Using numeric values not evaluating
 # to False for dynamic completion.
 (OPT_COMPL_MANY_NODES,
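As a quick illustration of the accepted spellings (assuming constants.VALUE_TRUE and constants.VALUE_FALSE are the strings "true" and "false", as elsewhere in Ganeti), a sketch of check_bool's behaviour:

  # Sketch only: check_bool ignores its option/opt arguments (hence the
  # W0613 pylint override) and is case-insensitive via value.lower()
  for raw, expected in [("yes", True), ("YES", True), ("true", True),
                        ("no", False), ("False", False)]:
    assert check_bool(None, "--flag", raw) is expected
  # Any other string raises errors.ParameterError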
@@ -479,18 +502,19 @@ class CliOption(Option):
     "identkeyval",
     "keyval",
     "unit",
+    "bool",
     )
   TYPE_CHECKER = Option.TYPE_CHECKER.copy()
   TYPE_CHECKER["identkeyval"] = check_ident_key_val
   TYPE_CHECKER["keyval"] = check_key_val
   TYPE_CHECKER["unit"] = check_unit
+  TYPE_CHECKER["bool"] = check_bool
 
 
 # optparse.py sets make_option, so we do it for our own option class, too
 cli_option = CliOption
 
 
-_YESNO = ("yes", "no")
 _YORNO = "yes|no"
 
 DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
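The TYPES/TYPE_CHECKER extension used here is plain optparse machinery. A self-contained sketch of the same pattern, independent of Ganeti's CliOption (it raises optparse's own OptionValueError instead of errors.ParameterError):

  from optparse import Option, OptionParser, OptionValueError

  def _check_bool(option, opt, value):
    # Same semantics as check_bool above
    value = value.lower()
    if value in ("true", "yes"):
      return True
    elif value in ("false", "no"):
      return False
    raise OptionValueError("option %s: invalid boolean value %r" %
                           (opt, value))

  class BoolOption(Option):
    TYPES = Option.TYPES + ("bool",)
    TYPE_CHECKER = Option.TYPE_CHECKER.copy()
    TYPE_CHECKER["bool"] = _check_bool

  parser = OptionParser(option_class=BoolOption)
  parser.add_option("--offline", type="bool", default=None, metavar="yes|no")
  opts, _ = parser.parse_args(["--offline", "yes"])
  assert opts.offline is True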
@@ -585,6 +609,11 @@ FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
                                action="store_true", default=False,
                                help="Force an unknown variant")
 
+NO_INSTALL_OPT = cli_option("--no-install", dest="no_install",
+                            action="store_true", default=False,
+                            help="Do not install the OS (will"
+                            " enable no-start)")
+
 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                          type="keyval", default={},
                          help="Backend parameters")
@@ -746,19 +775,19 @@ NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
 
 
 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
-                    choices=_YESNO, default=None, metavar=_YORNO,
+                    type="bool", default=None, metavar=_YORNO,
                     help="Set the master_candidate flag on the node")
 
 OFFLINE_OPT = cli_option("-O", "--offline", dest="offline", metavar=_YORNO,
-                         choices=_YESNO, default=None,
+                         type="bool", default=None,
                          help="Set the offline flag on the node")
 
 DRAINED_OPT = cli_option("-D", "--drained", dest="drained", metavar=_YORNO,
-                         choices=_YESNO, default=None,
+                         type="bool", default=None,
                          help="Set the drained flag on the node")
 
 ALLOCATABLE_OPT = cli_option("--allocatable", dest="allocatable",
-                             choices=_YESNO, default=None, metavar=_YORNO,
+                             type="bool", default=None, metavar=_YORNO,
                              help="Set the allocatable flag on a volume")
 
 NOLVM_STORAGE_OPT = cli_option("--no-lvm-storage", dest="lvm_storage",
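Switching these four flags from choices=_YESNO to the new type="bool" changes what callers see: opts.master_candidate, opts.offline, opts.drained and opts.allocatable now arrive as real booleans (or None when the flag is absent) rather than the raw strings "yes"/"no", so consuming code can test them for truth directly.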
@@ -858,6 +887,32 @@ EARLY_RELEASE_OPT = cli_option("--early-release",
                                help="Release the locks on the secondary"
                                " node(s) early")
 
+NEW_CLUSTER_CERT_OPT = cli_option("--new-cluster-certificate",
+                                  dest="new_cluster_cert",
+                                  default=False, action="store_true",
+                                  help="Generate a new cluster certificate")
+
+RAPI_CERT_OPT = cli_option("--rapi-certificate", dest="rapi_cert",
+                           default=None,
+                           help="File containing new RAPI certificate")
+
+NEW_RAPI_CERT_OPT = cli_option("--new-rapi-certificate", dest="new_rapi_cert",
+                               default=None, action="store_true",
+                               help=("Generate a new self-signed RAPI"
+                                     " certificate"))
+
+NEW_CONFD_HMAC_KEY_OPT = cli_option("--new-confd-hmac-key",
+                                    dest="new_confd_hmac_key",
+                                    default=False, action="store_true",
+                                    help=("Create a new HMAC key for %s" %
+                                          constants.CONFD))
+
+USE_REPL_NET_OPT = cli_option("--use-replication-network",
+                              dest="use_replication_network",
+                              help="Whether to use the replication network"
+                              " for talking to the nodes",
+                              action="store_true", default=False)
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -1500,13 +1555,23 @@ def GenericInstanceCreate(mode, opts, args):
       if not isinstance(ddict, dict):
         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
         raise errors.OpPrereqError(msg)
-      elif "size" not in ddict:
-        raise errors.OpPrereqError("Missing size for disk %d" % didx)
-      try:
-        ddict["size"] = utils.ParseUnit(ddict["size"])
-      except ValueError, err:
-        raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
-                                   (didx, err))
+      elif "size" in ddict:
+        if "adopt" in ddict:
+          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
+                                     " (disk %d)" % didx)
+        try:
+          ddict["size"] = utils.ParseUnit(ddict["size"])
+        except ValueError, err:
+          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
+                                     (didx, err))
+      elif "adopt" in ddict:
+        if mode == constants.INSTANCE_IMPORT:
+          raise errors.OpPrereqError("Disk adoption not allowed for instance"
+                                     " import")
+        ddict["size"] = 0
+      else:
+        raise errors.OpPrereqError("Missing size or adoption source for"
+                                   " disk %d" % didx)
       disks[didx] = ddict
 
   utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
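To make the accepted disk specifications concrete, here are sketches of the three cases handled above (the adoption value is an illustrative volume name, not a documented format):

  # "size" alone: parsed with utils.ParseUnit, e.g. "10g" -> 10240 (MiB)
  disks = {0: {"size": "10g"}}

  # "adopt" alone: size is forced to 0 and the named volume is adopted;
  # rejected when mode == constants.INSTANCE_IMPORT
  disks = {0: {"adopt": "xenvg/mydisk"}}

  # both keys together, or neither, raise errors.OpPrereqError
  disks = {0: {"size": "10g", "adopt": "xenvg/mydisk"}}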
@@ -1517,11 +1582,13 @@ def GenericInstanceCreate(mode, opts, args):
     os_type = opts.os
     src_node = None
     src_path = None
+    no_install = opts.no_install
   elif mode == constants.INSTANCE_IMPORT:
     start = False
     os_type = None
     src_node = opts.src_node
     src_path = opts.src_dir
+    no_install = None
   else:
     raise errors.ProgrammerError("Invalid creation mode %s" % mode)
 
@@ -1543,12 +1610,136 @@ def GenericInstanceCreate(mode, opts, args):
                                 start=start,
                                 os_type=os_type,
                                 src_node=src_node,
-                                src_path=src_path)
+                                src_path=src_path,
+                                no_install=no_install)
 
   SubmitOrSend(op, opts)
   return 0
 
 
+class _RunWhileClusterStoppedHelper:
+  """Helper class for L{RunWhileClusterStopped} to simplify state management
+
+  """
+  def __init__(self, feedback_fn, cluster_name, master_node, online_nodes):
+    """Initializes this class.
+
+    @type feedback_fn: callable
+    @param feedback_fn: Feedback function
+    @type cluster_name: string
+    @param cluster_name: Cluster name
+    @type master_node: string
+    @param master_node: Master node name
+    @type online_nodes: list
+    @param online_nodes: List of names of online nodes
+
+    """
+    self.feedback_fn = feedback_fn
+    self.cluster_name = cluster_name
+    self.master_node = master_node
+    self.online_nodes = online_nodes
+
+    self.ssh = ssh.SshRunner(self.cluster_name)
+
+    self.nonmaster_nodes = [name for name in online_nodes
+                            if name != master_node]
+
+    assert self.master_node not in self.nonmaster_nodes
+
+  def _RunCmd(self, node_name, cmd):
+    """Runs a command on the local or a remote machine.
+
+    @type node_name: string
+    @param node_name: Machine name
+    @type cmd: list
+    @param cmd: Command
+
+    """
+    if node_name is None or node_name == self.master_node:
+      # No need to use SSH
+      result = utils.RunCmd(cmd)
+    else:
+      result = self.ssh.Run(node_name, "root", utils.ShellQuoteArgs(cmd))
+
+    if result.failed:
+      errmsg = ["Failed to run command %s" % result.cmd]
+      if node_name:
+        errmsg.append("on node %s" % node_name)
+      errmsg.append(": exitcode %s and error %s" %
+                    (result.exit_code, result.output))
+      raise errors.OpExecError(" ".join(errmsg))
+
+  def Call(self, fn, *args):
+    """Call function while all daemons are stopped.
+
+    @type fn: callable
+    @param fn: Function to be called
+
+    """
+    # Pause watcher by acquiring an exclusive lock on watcher state file
+    self.feedback_fn("Blocking watcher")
+    watcher_block = utils.FileLock.Open(constants.WATCHER_STATEFILE)
+    try:
+      # TODO: Currently, this just blocks. There's no timeout.
+      # TODO: Should it be a shared lock?
+      watcher_block.Exclusive(blocking=True)
+
+      # Stop master daemons, so that no new jobs can come in and all running
+      # ones are finished
+      self.feedback_fn("Stopping master daemons")
+      self._RunCmd(None, [constants.DAEMON_UTIL, "stop-master"])
+      try:
+        # Stop daemons on all nodes
+        for node_name in self.online_nodes:
+          self.feedback_fn("Stopping daemons on %s" % node_name)
+          self._RunCmd(node_name, [constants.DAEMON_UTIL, "stop-all"])
+
+        # All daemons are shut down now
+        try:
+          return fn(self, *args)
+        except Exception, err:
+          _, errmsg = FormatError(err)
+          logging.exception("Caught exception")
+          self.feedback_fn(errmsg)
+          raise
+      finally:
+        # Start cluster again, master node last
+        for node_name in self.nonmaster_nodes + [self.master_node]:
+          self.feedback_fn("Starting daemons on %s" % node_name)
+          self._RunCmd(node_name, [constants.DAEMON_UTIL, "start-all"])
+    finally:
+      # Resume watcher
+      watcher_block.Close()
+
+
+def RunWhileClusterStopped(feedback_fn, fn, *args):
+  """Calls a function while all cluster daemons are stopped.
+
+  @type feedback_fn: callable
+  @param feedback_fn: Feedback function
+  @type fn: callable
+  @param fn: Function to be called when daemons are stopped
+
+  """
+  feedback_fn("Gathering cluster information")
+
+  # This ensures we're running on the master daemon
+  cl = GetClient()
+
+  (cluster_name, master_node) = \
+    cl.QueryConfigValues(["cluster_name", "master_node"])
+
+  online_nodes = GetOnlineNodes([], cl=cl)
+
+  # Don't keep a reference to the client. The master daemon will go away.
+  del cl
+
+  assert master_node in online_nodes
+
+  return _RunWhileClusterStoppedHelper(feedback_fn, cluster_name, master_node,
+                                       online_nodes).Call(fn, *args)
+
+
 def GenerateTable(headers, fields, separator, data,
                   numfields=None, unitfields=None,
                   units=None):
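A hypothetical caller of RunWhileClusterStopped (the function name and the check it performs are illustrative; constants.CLUSTER_CONF_FILE is assumed to be Ganeti's on-disk configuration path). Note that Call invokes fn(self, *args), so the callback receives the helper instance first and can reuse _RunCmd for per-node work:

  # Sketch: run a check on every node while all daemons are down
  def _VerifyConfigFile(ctx):
    # ctx is the _RunWhileClusterStoppedHelper; _RunCmd runs locally on
    # the master and over SSH everywhere else
    for node in [ctx.master_node] + ctx.nonmaster_nodes:
      ctx._RunCmd(node, ["test", "-f", constants.CLUSTER_CONF_FILE])

  RunWhileClusterStopped(ToStdout, _VerifyConfigFile)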
@@ -1715,7 +1906,8 @@ def ParseTimespec(value):
   return value
 
 
-def GetOnlineNodes(nodes, cl=None, nowarn=False):
+def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
+                   filter_master=False):
   """Returns the names of online nodes.
 
   This function will also log a warning on stderr with the names of
@@ -1728,17 +1920,36 @@ def GetOnlineNodes(nodes, cl=None, nowarn=False):
   @param nowarn: by default, this function will output a note with the
       offline nodes that are skipped; if this parameter is True the
       note is not displayed
+  @type secondary_ips: boolean
+  @param secondary_ips: if True, return the secondary IPs instead of the
+      names, useful for doing network traffic over the replication interface
+      (if any)
+  @type filter_master: boolean
+  @param filter_master: if True, do not return the master node in the list
+      (useful in coordination with secondary_ips where we cannot check our
+      node name against the list)
 
   """
   if cl is None:
     cl = GetClient()
 
-  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
+  if secondary_ips:
+    name_idx = 2
+  else:
+    name_idx = 0
+
+  if filter_master:
+    master_node = cl.QueryConfigValues(["master_node"])[0]
+    filter_fn = lambda x: x != master_node
+  else:
+    filter_fn = lambda _: True
+
+  result = cl.QueryNodes(names=nodes, fields=["name", "offline", "sip"],
                          use_locking=False)
   offline = [row[0] for row in result if row[1]]
   if offline and not nowarn:
     ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
-  return [row[0] for row in result if not row[1]]
+  return [row[name_idx] for row in result if not row[1] and filter_fn(row[0])]
 
 
 def _ToStream(stream, txt, *args):
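Sketches of the call patterns enabled by the two new parameters, matching the docstring above:

  cl = GetClient()

  # Previous behaviour: names of all online nodes
  names = GetOnlineNodes([], cl=cl)

  # Secondary IPs of every online node except the master, e.g. for
  # pushing data over the replication network; filter_master matters
  # here because we cannot recognize our own entry by name in a list
  # of IPs
  sips = GetOnlineNodes([], cl=cl, secondary_ips=True, filter_master=True)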
@@ -1860,8 +2071,8 @@ class JobExecutor(object):
     # first, remove any non-submitted jobs
     self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
     for idx, _, jid, name in failures:
-        ToStderr("Failed to submit job for %s: %s", name, jid)
-        results.append((idx, False, jid))
+      ToStderr("Failed to submit job for %s: %s", name, jid)
+      results.append((idx, False, jid))
 
     while self.jobs:
       (idx, _, jid, name) = self._ChooseJob()