cli.JobExecutor: Handle empty name, allow adding job IDs
[ganeti-local] / lib / cli.py
index 98add1b..aba1f1b 100644 (file)
@@ -27,6 +27,8 @@ import textwrap
 import os.path
 import time
 import logging
+import errno
+import itertools
 from cStringIO import StringIO
 
 from ganeti import utils
@@ -51,6 +53,7 @@ __all__ = [
   "ALLOCATABLE_OPT",
   "ALLOC_POLICY_OPT",
   "ALL_OPT",
+  "ALLOW_FAILOVER_OPT",
   "AUTO_PROMOTE_OPT",
   "AUTO_REPLACE_OPT",
   "BACKEND_OPT",
@@ -76,6 +79,7 @@ __all__ = [
   "FIELDS_OPT",
   "FILESTORE_DIR_OPT",
   "FILESTORE_DRIVER_OPT",
+  "FORCE_FILTER_OPT",
   "FORCE_OPT",
   "FORCE_VARIANT_OPT",
   "GLOBAL_FILEDIR_OPT",
@@ -127,6 +131,7 @@ __all__ = [
   "NOSTART_OPT",
   "NOSSH_KEYCHECK_OPT",
   "NOVOTING_OPT",
+  "NO_REMEMBER_OPT",
   "NWSYNC_OPT",
   "ON_PRIMARY_OPT",
   "ON_SECONDARY_OPT",
@@ -343,7 +348,8 @@ ARGS_MANY_NODES = [ArgNode()]
 ARGS_MANY_GROUPS = [ArgGroup()]
 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
-ARGS_ONE_GROUP = [ArgInstance(min=1, max=1)]
+# TODO
+ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)]
 ARGS_ONE_OS = [ArgOs(min=1, max=1)]
 
 
@@ -358,7 +364,9 @@ def _ExtractTagsObject(opts, args):
   kind = opts.tag_type
   if kind == constants.TAG_CLUSTER:
     retval = kind, kind
-  elif kind == constants.TAG_NODE or kind == constants.TAG_INSTANCE:
+  elif kind in (constants.TAG_NODEGROUP,
+                constants.TAG_NODE,
+                constants.TAG_INSTANCE):
     if not args:
       raise errors.OpPrereqError("no arguments passed to the command")
     name = args.pop(0)
@@ -659,8 +667,8 @@ NWSYNC_OPT = cli_option("--no-wait-for-sync", dest="wait_for_sync",
                         help="Don't wait for sync (DANGEROUS!)")
 
 DISK_TEMPLATE_OPT = cli_option("-t", "--disk-template", dest="disk_template",
-                               help="Custom disk setup (diskless, file,"
-                               " plain or drbd)",
+                               help=("Custom disk setup (%s)" %
+                                     utils.CommaJoin(constants.DISK_TEMPLATES)),
                                default=None, metavar="TEMPL",
                                choices=list(constants.DISK_TEMPLATES))
 
@@ -758,6 +766,12 @@ IGNORE_CONSIST_OPT = cli_option("--ignore-consistency",
                                 help="Ignore the consistency of the disks on"
                                 " the secondary")
 
+ALLOW_FAILOVER_OPT = cli_option("--allow-failover",
+                                dest="allow_failover",
+                                action="store_true", default=False,
+                                help="If migration is not possible fallback to"
+                                     " failover")
+
 NONLIVE_OPT = cli_option("--non-live", dest="live",
                          default=True, action="store_false",
                          help="Do a non-live migration (this usually means"
@@ -854,12 +868,16 @@ NEW_SECONDARY_OPT = cli_option("-n", "--new-secondary", dest="dst_node",
 ON_PRIMARY_OPT = cli_option("-p", "--on-primary", dest="on_primary",
                             default=False, action="store_true",
                             help="Replace the disk(s) on the primary"
-                            " node (only for the drbd template)")
+                                 " node (applies only to internally mirrored"
+                                 " disk templates, e.g. %s)" %
+                                 utils.CommaJoin(constants.DTS_INT_MIRROR))
 
 ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                               default=False, action="store_true",
                               help="Replace the disk(s) on the secondary"
-                              " node (only for the drbd template)")
+                                   " node (applies only to internally mirrored"
+                                   " disk templates, e.g. %s)" %
+                                   utils.CommaJoin(constants.DTS_INT_MIRROR))
 
 AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
                               default=False, action="store_true",
@@ -869,7 +887,9 @@ AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                               default=False, action="store_true",
                               help="Automatically replace faulty disks"
-                              " (only for the drbd template)")
+                                   " (applies only to internally mirrored"
+                                   " disk templates, e.g. %s)" %
+                                   utils.CommaJoin(constants.DTS_INT_MIRROR))
 
 IGNORE_SIZE_OPT = cli_option("--ignore-size", dest="ignore_size",
                              default=False, action="store_true",
@@ -898,8 +918,7 @@ NOSSH_KEYCHECK_OPT = cli_option("--no-ssh-key-check", dest="ssh_key_check",
 
 NODE_FORCE_JOIN_OPT = cli_option("--force-join", dest="force_join",
                                  default=False, action="store_true",
-                                 help="Force the joining of a node,"
-                                      " needed when merging clusters")
+                                 help="Force the joining of a node")
 
 MC_OPT = cli_option("-C", "--master-candidate", dest="master_candidate",
                     type="bool", default=None, metavar=_YORNO,
@@ -1175,6 +1194,17 @@ POWER_DELAY_OPT = cli_option("--power-delay", dest="power_delay", type="float",
                              default=constants.OOB_POWER_DELAY,
                              help="Time in seconds to wait between power-ons")
 
+FORCE_FILTER_OPT = cli_option("-F", "--filter", dest="force_filter",
+                              action="store_true", default=False,
+                              help=("Whether command argument should be treated"
+                                    " as filter"))
+
+NO_REMEMBER_OPT = cli_option("--no-remember",
+                             dest="no_remember",
+                             action="store_true", default=False,
+                             help="Perform but do not record the change"
+                             " in the configuration")
+
 
 #: Options provided by all commands
 COMMON_OPTS = [DEBUG_OPT]
@@ -1895,6 +1925,9 @@ def FormatError(err):
                "%s" % msg)
   elif isinstance(err, errors.JobLost):
     obuf.write("Error checking job status: %s" % msg)
+  elif isinstance(err, errors.QueryFilterParseError):
+    obuf.write("Error while parsing query filter: %s\n" % err.args[0])
+    obuf.write("\n".join(err.GetDetails()))
   elif isinstance(err, errors.GenericError):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, JobSubmittedException):
@@ -1966,6 +1999,12 @@ def GenericMain(commands, override=None, aliases=None):
     ToStderr("Aborted. Note that if the operation created any jobs, they"
              " might have been submitted and"
              " will continue to run in the background.")
+  except IOError, err:
+    if err.errno == errno.EPIPE:
+      # our terminal went away, we'll exit
+      sys.exit(constants.EXIT_FAILURE)
+    else:
+      raise
 
   return result
 
@@ -2039,7 +2078,7 @@ def GenericInstanceCreate(mode, opts, args):
       raise errors.OpPrereqError("Please use either the '--disk' or"
                                  " '-s' option")
     if opts.sd_size is not None:
-      opts.disks = [(0, {"size": opts.sd_size})]
+      opts.disks = [(0, {constants.IDISK_SIZE: opts.sd_size})]
 
     if opts.disks:
       try:
@@ -2054,20 +2093,21 @@ def GenericInstanceCreate(mode, opts, args):
       if not isinstance(ddict, dict):
         msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
         raise errors.OpPrereqError(msg)
-      elif "size" in ddict:
-        if "adopt" in ddict:
+      elif constants.IDISK_SIZE in ddict:
+        if constants.IDISK_ADOPT in ddict:
           raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                      " (disk %d)" % didx)
         try:
-          ddict["size"] = utils.ParseUnit(ddict["size"])
+          ddict[constants.IDISK_SIZE] = \
+            utils.ParseUnit(ddict[constants.IDISK_SIZE])
         except ValueError, err:
           raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                      (didx, err))
-      elif "adopt" in ddict:
+      elif constants.IDISK_ADOPT in ddict:
         if mode == constants.INSTANCE_IMPORT:
           raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                      " import")
-        ddict["size"] = 0
+        ddict[constants.IDISK_SIZE] = 0
       else:
         raise errors.OpPrereqError("Missing size or adoption source for"
                                    " disk %d" % didx)
@@ -2553,10 +2593,10 @@ def _WarnUnknownFields(fdefs):
 
 
 def GenericList(resource, fields, names, unit, separator, header, cl=None,
-                format_override=None, verbose=False):
+                format_override=None, verbose=False, force_filter=False):
   """Generic implementation for listing all items of a resource.
 
-  @param resource: One of L{constants.QR_OP_LUXI}
+  @param resource: One of L{constants.QR_VIA_LUXI}
   @type fields: list of strings
   @param fields: List of fields to query for
   @type names: list of strings
@@ -2569,6 +2609,8 @@ def GenericList(resource, fields, names, unit, separator, header, cl=None,
   @param separator: String used to separate fields
   @type header: bool
   @param header: Whether to show header row
+  @type force_filter: bool
+  @param force_filter: Whether to always treat names as filter
   @type format_override: dict
   @param format_override: Dictionary for overriding field formatting functions,
     indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
@@ -2582,7 +2624,20 @@ def GenericList(resource, fields, names, unit, separator, header, cl=None,
   if not names:
     names = None
 
-  response = cl.Query(resource, fields, qlang.MakeSimpleFilter("name", names))
+  if (force_filter or
+      (names and len(names) == 1 and qlang.MaybeFilter(names[0]))):
+    try:
+      (filter_text, ) = names
+    except ValueError:
+      raise errors.OpPrereqError("Exactly one argument must be given as a"
+                                 " filter")
+
+    logging.debug("Parsing '%s' as filter", filter_text)
+    filter_ = qlang.ParseFilter(filter_text)
+  else:
+    filter_ = qlang.MakeSimpleFilter("name", names)
+
+  response = cl.Query(resource, fields, filter_)
 
   found_unknown = _WarnUnknownFields(response.fields)
 
@@ -2607,7 +2662,7 @@ def GenericList(resource, fields, names, unit, separator, header, cl=None,
 def GenericListFields(resource, fields, separator, header, cl=None):
   """Generic implementation for listing fields for a resource.
 
-  @param resource: One of L{constants.QR_OP_LUXI}
+  @param resource: One of L{constants.QR_VIA_LUXI}
   @type fields: list of strings
   @param fields: List of fields to query for
   @type separator: string or None
@@ -2837,13 +2892,20 @@ def _ToStream(stream, txt, *args):
   @param txt: the message
 
   """
-  if args:
-    args = tuple(args)
-    stream.write(txt % args)
-  else:
-    stream.write(txt)
-  stream.write('\n')
-  stream.flush()
+  try:
+    if args:
+      args = tuple(args)
+      stream.write(txt % args)
+    else:
+      stream.write(txt)
+    stream.write('\n')
+    stream.flush()
+  except IOError, err:
+    if err.errno == errno.EPIPE:
+      # our terminal went away, we'll exit
+      sys.exit(constants.EXIT_FAILURE)
+    else:
+      raise
 
 
 def ToStdout(txt, *args):
@@ -2886,15 +2948,33 @@ class JobExecutor(object):
     self.jobs = []
     self.opts = opts
     self.feedback_fn = feedback_fn
+    self._counter = itertools.count()
+
+  @staticmethod
+  def _IfName(name, fmt):
+    """Return C{fmt % name} for a non-empty name, else an empty string.
+
+    """
+    if name:
+      return fmt % name
+
+    return ""
 
   def QueueJob(self, name, *ops):
     """Record a job for later submit.
 
     @type name: string
     @param name: a description of the job, will be used in WaitJobSet
+
     """
     SetGenericOpcodeOpts(ops, self.opts)
-    self.queue.append((name, ops))
+    self.queue.append((self._counter.next(), name, ops))
+
+  def AddJobId(self, name, status, job_id):
+    """Adds a job ID directly to C{self.jobs}, bypassing the submit queue.
+
+    """
+    self.jobs.append((self._counter.next(), status, job_id, name))
 
   def SubmitPending(self, each=False):
     """Submit all pending jobs.
@@ -2902,14 +2982,13 @@ class JobExecutor(object):
     """
     if each:
       results = []
-      for row in self.queue:
+      for (_, _, ops) in self.queue:
         # SubmitJob will remove the success status, but raise an exception if
         # the submission fails, so we'll notice that anyway.
-        results.append([True, self.cl.SubmitJob(row[1])])
+        results.append([True, self.cl.SubmitJob(ops)])
     else:
-      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
-    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
-                                                            self.queue)):
+      results = self.cl.SubmitManyJobs([ops for (_, _, ops) in self.queue])
+    for ((status, data), (idx, name, _)) in zip(results, self.queue):
       self.jobs.append((idx, status, data, name))
 
   def _ChooseJob(self):
@@ -2955,25 +3034,26 @@ class JobExecutor(object):
     # first, remove any non-submitted jobs
     self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
     for idx, _, jid, name in failures:
-      ToStderr("Failed to submit job for %s: %s", name, jid)
+      ToStderr("Failed to submit job%s: %s", self._IfName(name, " for %s"), jid)
       results.append((idx, False, jid))
 
     while self.jobs:
       (idx, _, jid, name) = self._ChooseJob()
-      ToStdout("Waiting for job %s for %s...", jid, name)
+      ToStdout("Waiting for job %s%s ...", jid, self._IfName(name, " for %s"))
       try:
         job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
         success = True
       except errors.JobLost, err:
         _, job_result = FormatError(err)
-        ToStderr("Job %s for %s has been archived, cannot check its result",
-                 jid, name)
+        ToStderr("Job %s%s has been archived, cannot check its result",
+                 jid, self._IfName(name, " for %s"))
         success = False
       except (errors.GenericError, luxi.ProtocolError), err:
         _, job_result = FormatError(err)
         success = False
         # the error message will always be shown, verbose or not
-        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
+        ToStderr("Job %s%s has failed: %s",
+                 jid, self._IfName(name, " for %s"), job_result)
 
       results.append((idx, success, job_result))