cli.JobExecutor: poll jobs in execution order
[ganeti-local] / lib / cli.py
index e79f6c6..6ccf4d9 100644 (file)
@@ -45,6 +45,7 @@ __all__ = [
   # Command line options
   "ALLOCATABLE_OPT",
   "ALL_OPT",
+  "AUTO_PROMOTE_OPT",
   "AUTO_REPLACE_OPT",
   "BACKEND_OPT",
   "CLEANUP_OPT",
@@ -56,6 +57,7 @@ __all__ = [
   "DISK_OPT",
   "DISK_TEMPLATE_OPT",
   "DRAINED_OPT",
+  "EARLY_RELEASE_OPT",
   "ENABLED_HV_OPT",
   "ERROR_CODES_OPT",
   "FIELDS_OPT",
@@ -82,6 +84,7 @@ __all__ = [
   "NODE_PLACEMENT_OPT",
   "NOHDR_OPT",
   "NOIPCHECK_OPT",
+  "NONAMECHECK_OPT",
   "NOLVM_STORAGE_OPT",
   "NOMODIFY_ETCHOSTS_OPT",
   "NOMODIFY_SSH_SETUP_OPT",
@@ -144,6 +147,7 @@ __all__ = [
   "ARGS_NONE",
   "ARGS_ONE_INSTANCE",
   "ARGS_ONE_NODE",
+  "ARGS_ONE_OS",
   "ArgChoice",
   "ArgCommand",
   "ArgFile",
@@ -151,6 +155,7 @@ __all__ = [
   "ArgInstance",
   "ArgJobId",
   "ArgNode",
+  "ArgOs",
   "ArgSuggest",
   "ArgUnknown",
   "OPT_COMPL_INST_ADD_NODES",
@@ -169,7 +174,7 @@ UN_PREFIX = "-"
 
 
 class _Argument:
-  def __init__(self, min=0, max=None):
+  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
     self.min = min
     self.max = max
 
@@ -184,6 +189,7 @@ class ArgSuggest(_Argument):
   Value can be any of the ones passed to the constructor.
 
   """
+  # pylint: disable-msg=W0622
   def __init__(self, min=0, max=None, choices=None):
     _Argument.__init__(self, min=min, max=max)
     self.choices = choices
@@ -243,12 +249,18 @@ class ArgHost(_Argument):
   """
 
 
+class ArgOs(_Argument):
+  """OS argument.
+
+  """
+
+
 ARGS_NONE = []
 ARGS_MANY_INSTANCES = [ArgInstance()]
 ARGS_MANY_NODES = [ArgNode()]
 ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
-
+ARGS_ONE_OS = [ArgOs(min=1, max=1)]
 
 
 def _ExtractTagsObject(opts, args):
@@ -311,8 +323,8 @@ def ListTags(opts, args):
 
   """
   kind, name = _ExtractTagsObject(opts, args)
-  op = opcodes.OpGetTags(kind=kind, name=name)
-  result = SubmitOpCode(op)
+  cl = GetClient()
+  result = cl.QueryTags(kind, name)
   result = list(result)
   result.sort()
   for tag in result:
@@ -353,7 +365,7 @@ def RemoveTags(opts, args):
   SubmitOpCode(op)
 
 
-def check_unit(option, opt, value):
+def check_unit(option, opt, value): # pylint: disable-msg=W0613
   """OptParsers custom converter for units.
 
   """
@@ -383,7 +395,7 @@ def _SplitKeyVal(opt, data):
   """
   kv_dict = {}
   if data:
-    for elem in data.split(","):
+    for elem in utils.UnescapeAndSplit(data, sep=","):
       if "=" in elem:
         key, val = elem.split("=", 1)
       else:
@@ -400,7 +412,7 @@ def _SplitKeyVal(opt, data):
   return kv_dict
 
 
-def check_ident_key_val(option, opt, value):
+def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
   """Custom parser for ident:key=val,key=val options.
 
   This will store the parsed values as a tuple (ident, {key: val}). As such,
@@ -428,7 +440,7 @@ def check_ident_key_val(option, opt, value):
   return retval
 
 
-def check_key_val(option, opt, value):
+def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
   """Custom parser class for key=val,key=val options.
 
   This will store the parsed values as a dict {key: val}.
@@ -481,9 +493,8 @@ cli_option = CliOption
 _YESNO = ("yes", "no")
 _YORNO = "yes|no"
 
-DEBUG_OPT = cli_option("-d", "--debug", default=False,
-                       action="store_true",
-                       help="Turn debugging on")
+DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
+                       help="Increase debugging level")
 
 NOHDR_OPT = cli_option("--no-headers", default=False,
                        action="store_true", dest="no_headers",
@@ -597,6 +608,11 @@ NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                            help="Don't check that the instance's IP"
                            " is alive")
 
+NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
+                             default=True, action="store_false",
+                             help="Don't check that the instance's name"
+                             " is resolvable")
+
 NET_OPT = cli_option("--net",
                      help="NIC parameters", default=[],
                      dest="nics", action="append", type="identkeyval")
@@ -693,6 +709,11 @@ ON_SECONDARY_OPT = cli_option("-s", "--on-secondary", dest="on_secondary",
                               help="Replace the disk(s) on the secondary"
                               " node (only for the drbd template)")
 
+AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
+                              default=False, action="store_true",
+                              help="Lock all nodes and auto-promote as needed"
+                              " to MC status")
+
 AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
                               default=False, action="store_true",
                               help="Automatically replace faulty disks"
@@ -831,6 +852,12 @@ SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
                          default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
                          help="Maximum time to wait for instance shutdown")
 
+EARLY_RELEASE_OPT = cli_option("--early-release",
+                               dest="early_release", default=False,
+                               action="store_true",
+                               help="Release the locks on the secondary"
+                               " node(s) early")
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -1120,12 +1147,28 @@ def PollJob(job_id, cl=None, feedback_fn=None):
   prev_job_info = None
   prev_logmsg_serial = None
 
+  status = None
+
+  notified_queued = False
+  notified_waitlock = False
+
   while True:
-    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
-                                 prev_logmsg_serial)
+    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                     prev_logmsg_serial)
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
+    elif result == constants.JOB_NOTCHANGED:
+      if status is not None and not callable(feedback_fn):
+        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
+          ToStderr("Job %s is waiting in queue", job_id)
+          notified_queued = True
+        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
+          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+          notified_waitlock = True
+
+      # Wait again
+      continue
 
     # Split result, a tuple of (field values, log entries)
     (job_info, log_entries) = result
@@ -1176,7 +1219,7 @@ def PollJob(job_id, cl=None, feedback_fn=None):
     raise errors.OpExecError(result)
 
 
-def SubmitOpCode(op, cl=None, feedback_fn=None):
+def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
   """Legacy function to submit an opcode.
 
   This is just a simple wrapper over the construction of the processor
@@ -1187,6 +1230,8 @@ def SubmitOpCode(op, cl=None, feedback_fn=None):
   if cl is None:
     cl = GetClient()
 
+  SetGenericOpcodeOpts([op], opts)
+
   job_id = SendJob([op], cl)
 
   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
@@ -1202,16 +1247,35 @@ def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
   whether to just send the job and print its identifier. It is used in
   order to simplify the implementation of the '--submit' option.
 
-  It will also add the dry-run parameter from the options passed, if true.
+  It will also process the opcodes if we're sending the via SendJob
+  (otherwise SubmitOpCode does it).
 
   """
-  if opts and opts.dry_run:
-    op.dry_run = opts.dry_run
   if opts and opts.submit_only:
-    job_id = SendJob([op], cl=cl)
+    job = [op]
+    SetGenericOpcodeOpts(job, opts)
+    job_id = SendJob(job, cl=cl)
     raise JobSubmittedException(job_id)
   else:
-    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
+
+
+def SetGenericOpcodeOpts(opcode_list, options):
+  """Processor for generic options.
+
+  This function updates the given opcodes based on generic command
+  line options (like debug, dry-run, etc.).
+
+  @param opcode_list: list of opcodes
+  @param options: command line options or None
+  @return: None (in-place modification)
+
+  """
+  if not options:
+    return
+  for op in opcode_list:
+    op.dry_run = options.dry_run
+    op.debug_level = options.debug
 
 
 def GetClient():
@@ -1219,13 +1283,21 @@ def GetClient():
   try:
     client = luxi.Client()
   except luxi.NoMasterError:
-    master, myself = ssconf.GetMasterAndMyself()
+    ss = ssconf.SimpleStore()
+
+    # Try to read ssconf file
+    try:
+      ss.GetMasterNode()
+    except errors.ConfigurationError:
+      raise errors.OpPrereqError("Cluster not initialized or this machine is"
+                                 " not part of a cluster")
+
+    master, myself = ssconf.GetMasterAndMyself(ss=ss)
     if master != myself:
       raise errors.OpPrereqError("This is not the master node, please connect"
                                  " to node '%s' and rerun the command" %
                                  master)
-    else:
-      raise
+    raise
   return client
 
 
@@ -1266,8 +1338,13 @@ def FormatError(err):
       msg = "Failure: can't resolve hostname '%s'"
     obuf.write(msg % err.args[0])
   elif isinstance(err, errors.OpPrereqError):
-    obuf.write("Failure: prerequisites not met for this"
-               " operation:\n%s" % msg)
+    if len(err.args) == 2:
+      obuf.write("Failure: prerequisites not met for this"
+               " operation:\nerror type: %s, error details:\n%s" %
+                 (err.args[1], err.args[0]))
+    else:
+      obuf.write("Failure: prerequisites not met for this"
+                 " operation:\n%s" % msg)
   elif isinstance(err, errors.OpExecError):
     obuf.write("Failure: command execution error:\n%s" % msg)
   elif isinstance(err, errors.TagError):
@@ -1383,7 +1460,7 @@ def GenericInstanceCreate(mode, opts, args):
 
   if opts.nics:
     try:
-      nic_max = max(int(nidx[0])+1 for nidx in opts.nics)
+      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
     except ValueError, err:
       raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
     nics = [{}] * nic_max
@@ -1414,7 +1491,7 @@ def GenericInstanceCreate(mode, opts, args):
     if opts.sd_size is not None:
       opts.disks = [(0, {"size": opts.sd_size})]
     try:
-      disk_max = max(int(didx[0])+1 for didx in opts.disks)
+      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
     except ValueError, err:
       raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
     disks = [{}] * disk_max
@@ -1454,6 +1531,7 @@ def GenericInstanceCreate(mode, opts, args):
                                 nics=nics,
                                 pnode=pnode, snode=snode,
                                 ip_check=opts.ip_check,
+                                name_check=opts.name_check,
                                 wait_for_sync=opts.wait_for_sync,
                                 file_storage_dir=opts.file_storage_dir,
                                 file_driver=opts.file_driver,
@@ -1511,8 +1589,8 @@ def GenerateTable(headers, fields, separator, data,
   if unitfields is None:
     unitfields = []
 
-  numfields = utils.FieldSet(*numfields)
-  unitfields = utils.FieldSet(*unitfields)
+  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
+  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
 
   format_fields = []
   for field in fields:
@@ -1541,7 +1619,7 @@ def GenerateTable(headers, fields, separator, data,
       if unitfields.Matches(fields[idx]):
         try:
           val = int(val)
-        except ValueError:
+        except (TypeError, ValueError):
           pass
         else:
           val = row[idx] = utils.FormatUnit(val, units)
@@ -1560,6 +1638,12 @@ def GenerateTable(headers, fields, separator, data,
       args.append(hdr)
     result.append(format % tuple(args))
 
+  if separator is None:
+    assert len(mlens) == len(fields)
+
+    if fields and not numfields.Matches(fields[-1]):
+      mlens[-1] = 0
+
   for line in data:
     args = []
     if line is None:
@@ -1616,7 +1700,7 @@ def ParseTimespec(value):
   if value[-1] not in suffix_map:
     try:
       value = int(value)
-    except ValueError:
+    except (TypeError, ValueError):
       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
   else:
     multiplier = suffix_map[value[-1]]
@@ -1626,7 +1710,7 @@ def ParseTimespec(value):
                                  " suffix passed)")
     try:
       value = int(value) * multiplier
-    except ValueError:
+    except (TypeError, ValueError):
       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
   return value
 
@@ -1653,7 +1737,7 @@ def GetOnlineNodes(nodes, cl=None, nowarn=False):
                          use_locking=False)
   offline = [row[0] for row in result if row[1]]
   if offline and not nowarn:
-    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
+    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
   return [row[0] for row in result if not row[1]]
 
 
@@ -1706,13 +1790,14 @@ class JobExecutor(object):
   GetResults() calls.
 
   """
-  def __init__(self, cl=None, verbose=True):
+  def __init__(self, cl=None, verbose=True, opts=None):
     self.queue = []
     if cl is None:
       cl = GetClient()
     self.cl = cl
     self.verbose = verbose
     self.jobs = []
+    self.opts = opts
 
   def QueueJob(self, name, *ops):
     """Record a job for later submit.
@@ -1720,6 +1805,7 @@ class JobExecutor(object):
     @type name: string
     @param name: a description of the job, will be used in WaitJobSet
     """
+    SetGenericOpcodeOpts(ops, self.opts)
     self.queue.append((name, ops))
 
   def SubmitPending(self):
@@ -1727,8 +1813,31 @@ class JobExecutor(object):
 
     """
     results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
-    for ((status, data), (name, _)) in zip(results, self.queue):
-      self.jobs.append((status, data, name))
+    for (idx, ((status, data), (name, _))) in enumerate(zip(results,
+                                                            self.queue)):
+      self.jobs.append((idx, status, data, name))
+
+  def _ChooseJob(self):
+    """Choose a non-waiting/queued job to poll next.
+
+    """
+    assert self.jobs, "_ChooseJob called with empty job list"
+
+    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
+    assert result
+
+    for job_data, status in zip(self.jobs, result):
+      if status[0] in (constants.JOB_STATUS_QUEUED,
+                    constants.JOB_STATUS_WAITLOCK,
+                    constants.JOB_STATUS_CANCELING):
+        # job is still waiting
+        continue
+      # good candidate found
+      self.jobs.remove(job_data)
+      return job_data
+
+    # no job found
+    return self.jobs.pop(0)
 
   def GetResults(self):
     """Wait for and return the results of all jobs.
@@ -1743,16 +1852,19 @@ class JobExecutor(object):
       self.SubmitPending()
     results = []
     if self.verbose:
-      ok_jobs = [row[1] for row in self.jobs if row[0]]
+      ok_jobs = [row[2] for row in self.jobs if row[1]]
       if ok_jobs:
-        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
-    for submit_status, jid, name in self.jobs:
-      if not submit_status:
+        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
+
+    # first, remove any non-submitted jobs
+    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+    for idx, _, jid, name in failures:
         ToStderr("Failed to submit job for %s: %s", name, jid)
-        results.append((False, jid))
-        continue
-      if self.verbose:
-        ToStdout("Waiting for job %s for %s...", jid, name)
+        results.append((idx, False, jid))
+
+    while self.jobs:
+      (idx, _, jid, name) = self._ChooseJob()
+      ToStdout("Waiting for job %s for %s...", jid, name)
       try:
         job_result = PollJob(jid, cl=self.cl)
         success = True
@@ -1762,7 +1874,12 @@ class JobExecutor(object):
         # the error message will always be shown, verbose or not
         ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
 
-      results.append((success, job_result))
+      results.append((idx, success, job_result))
+
+    # sort based on the index, then drop it
+    results.sort()
+    results = [i[1:] for i in results]
+
     return results
 
   def WaitOrShow(self, wait):