Add strict name validation for the LVM backend
[ganeti-local] / lib / cli.py
index bcf8d8b..f1d2dfc 100644 (file)
@@ -25,7 +25,6 @@
 import sys
 import textwrap
 import os.path
-import copy
 import time
 import logging
 from cStringIO import StringIO
@@ -50,36 +49,51 @@ __all__ = [
   "BACKEND_OPT",
   "CLEANUP_OPT",
   "CONFIRM_OPT",
+  "CP_SIZE_OPT",
   "DEBUG_OPT",
   "DEBUG_SIMERR_OPT",
   "DISKIDX_OPT",
   "DISK_OPT",
   "DISK_TEMPLATE_OPT",
   "DRAINED_OPT",
+  "EARLY_RELEASE_OPT",
   "ENABLED_HV_OPT",
+  "ERROR_CODES_OPT",
   "FIELDS_OPT",
   "FILESTORE_DIR_OPT",
   "FILESTORE_DRIVER_OPT",
+  "FORCE_OPT",
+  "FORCE_VARIANT_OPT",
+  "GLOBAL_FILEDIR_OPT",
   "HVLIST_OPT",
   "HVOPTS_OPT",
   "HYPERVISOR_OPT",
   "IALLOCATOR_OPT",
   "IGNORE_CONSIST_OPT",
   "IGNORE_FAILURES_OPT",
+  "IGNORE_SECONDARIES_OPT",
   "IGNORE_SIZE_OPT",
-  "FORCE_OPT",
+  "MAC_PREFIX_OPT",
+  "MASTER_NETDEV_OPT",
   "MC_OPT",
   "NET_OPT",
   "NEW_SECONDARY_OPT",
+  "NIC_PARAMS_OPT",
   "NODE_LIST_OPT",
   "NODE_PLACEMENT_OPT",
   "NOHDR_OPT",
   "NOIPCHECK_OPT",
+  "NONAMECHECK_OPT",
   "NOLVM_STORAGE_OPT",
+  "NOMODIFY_ETCHOSTS_OPT",
+  "NOMODIFY_SSH_SETUP_OPT",
   "NONICS_OPT",
   "NONLIVE_OPT",
+  "NONPLUS1_OPT",
+  "NOSHUTDOWN_OPT",
   "NOSTART_OPT",
   "NOSSH_KEYCHECK_OPT",
+  "NOVOTING_OPT",
   "NWSYNC_OPT",
   "ON_PRIMARY_OPT",
   "ON_SECONDARY_OPT",
@@ -87,10 +101,12 @@ __all__ = [
   "OS_OPT",
   "OS_SIZE_OPT",
   "READD_OPT",
+  "REBOOT_TYPE_OPT",
   "SECONDARY_IP_OPT",
   "SELECT_OS_OPT",
   "SEP_OPT",
   "SHOWCMD_OPT",
+  "SHUTDOWN_TIMEOUT_OPT",
   "SINGLE_NODE_OPT",
   "SRC_DIR_OPT",
   "SRC_NODE_OPT",
@@ -98,10 +114,14 @@ __all__ = [
   "STATIC_OPT",
   "SYNC_OPT",
   "TAG_SRC_OPT",
+  "TIMEOUT_OPT",
   "USEUNITS_OPT",
   "VERBOSE_OPT",
+  "VG_NAME_OPT",
+  "YES_DOIT_OPT",
   # Generic functions for CLI programs
   "GenericMain",
+  "GenericInstanceCreate",
   "GetClient",
   "GetOnlineNodes",
   "JobExecutor",
@@ -143,6 +163,7 @@ __all__ = [
   "OPT_COMPL_ONE_OS",
   "cli_option",
   "SplitNodeOption",
+  "CalculateOSNames",
   ]
 
 NO_PREFIX = "no_"
@@ -150,7 +171,7 @@ UN_PREFIX = "-"
 
 
 class _Argument:
-  def __init__(self, min=0, max=None):
+  def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
     self.min = min
     self.max = max
 
@@ -165,6 +186,7 @@ class ArgSuggest(_Argument):
   Value can be any of the ones passed to the constructor.
 
   """
+  # pylint: disable-msg=W0622
   def __init__(self, min=0, max=None, choices=None):
     _Argument.__init__(self, min=min, max=max)
     self.choices = choices
@@ -231,7 +253,6 @@ ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
 ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
 
 
-
 def _ExtractTagsObject(opts, args):
   """Extract the tag type object.
 
@@ -292,8 +313,8 @@ def ListTags(opts, args):
 
   """
   kind, name = _ExtractTagsObject(opts, args)
-  op = opcodes.OpGetTags(kind=kind, name=name)
-  result = SubmitOpCode(op)
+  cl = GetClient()
+  result = cl.QueryTags(kind, name)
   result = list(result)
   result.sort()
   for tag in result:
@@ -334,7 +355,7 @@ def RemoveTags(opts, args):
   SubmitOpCode(op)
 
 
-def check_unit(option, opt, value):
+def check_unit(option, opt, value): # pylint: disable-msg=W0613
   """OptParsers custom converter for units.
 
   """
@@ -364,7 +385,7 @@ def _SplitKeyVal(opt, data):
   """
   kv_dict = {}
   if data:
-    for elem in data.split(","):
+    for elem in utils.UnescapeAndSplit(data, sep=","):
       if "=" in elem:
         key, val = elem.split("=", 1)
       else:
@@ -381,7 +402,7 @@ def _SplitKeyVal(opt, data):
   return kv_dict
 
 
-def check_ident_key_val(option, opt, value):
+def check_ident_key_val(option, opt, value):  # pylint: disable-msg=W0613
   """Custom parser for ident:key=val,key=val options.
 
   This will store the parsed values as a tuple (ident, {key: val}). As such,
@@ -409,7 +430,7 @@ def check_ident_key_val(option, opt, value):
   return retval
 
 
-def check_key_val(option, opt, value):
+def check_key_val(option, opt, value):  # pylint: disable-msg=W0613
   """Custom parser class for key=val,key=val options.
 
   This will store the parsed values as a dict {key: val}.
@@ -462,9 +483,8 @@ cli_option = CliOption
 _YESNO = ("yes", "no")
 _YORNO = "yes|no"
 
-DEBUG_OPT = cli_option("-d", "--debug", default=False,
-                       action="store_true",
-                       help="Turn debugging on")
+DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
+                       help="Increase debugging level")
 
 NOHDR_OPT = cli_option("--no-headers", default=False,
                        action="store_true", dest="no_headers",
@@ -551,6 +571,10 @@ OS_OPT = cli_option("-o", "--os-type", dest="os", help="What OS to run",
                     metavar="<os>",
                     completion_suggest=OPT_COMPL_ONE_OS)
 
+FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
+                               action="store_true", default=False,
+                               help="Force an unknown variant")
+
 BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
                          type="keyval", default={},
                          help="Backend parameters")
@@ -574,6 +598,11 @@ NOIPCHECK_OPT = cli_option("--no-ip-check", dest="ip_check", default=True,
                            help="Don't check that the instance's IP"
                            " is alive")
 
+NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
+                             default=True, action="store_false",
+                             help="Don't check that the instance's name"
+                             " is resolvable")
+
 NET_OPT = cli_option("--net",
                      help="NIC parameters", default=[],
                      dest="nics", action="append", type="identkeyval")
@@ -727,6 +756,93 @@ ENABLED_HV_OPT = cli_option("--enabled-hypervisors",
                             help="Comma-separated list of hypervisors",
                             type="string", default=None)
 
+NIC_PARAMS_OPT = cli_option("-N", "--nic-parameters", dest="nicparams",
+                            type="keyval", default={},
+                            help="NIC parameters")
+
+CP_SIZE_OPT = cli_option("-C", "--candidate-pool-size", default=None,
+                         dest="candidate_pool_size", type="int",
+                         help="Set the candidate pool size")
+
+VG_NAME_OPT = cli_option("-g", "--vg-name", dest="vg_name",
+                         help="Enables LVM and specifies the volume group"
+                         " name (cluster-wide) for disk allocation [xenvg]",
+                         metavar="VG", default=None)
+
+YES_DOIT_OPT = cli_option("--yes-do-it", dest="yes_do_it",
+                          help="Destroy cluster", action="store_true")
+
+NOVOTING_OPT = cli_option("--no-voting", dest="no_voting",
+                          help="Skip node agreement check (dangerous)",
+                          action="store_true", default=False)
+
+MAC_PREFIX_OPT = cli_option("-m", "--mac-prefix", dest="mac_prefix",
+                            help="Specify the mac prefix for the instance IP"
+                            " addresses, in the format XX:XX:XX",
+                            metavar="PREFIX",
+                            default=None)
+
+MASTER_NETDEV_OPT = cli_option("--master-netdev", dest="master_netdev",
+                               help="Specify the node interface (cluster-wide)"
+                               " on which the master IP address will be added "
+                               " [%s]" % constants.DEFAULT_BRIDGE,
+                               metavar="NETDEV",
+                               default=constants.DEFAULT_BRIDGE)
+
+
+GLOBAL_FILEDIR_OPT = cli_option("--file-storage-dir", dest="file_storage_dir",
+                                help="Specify the default directory (cluster-"
+                                "wide) for storing the file-based disks [%s]" %
+                                constants.DEFAULT_FILE_STORAGE_DIR,
+                                metavar="DIR",
+                                default=constants.DEFAULT_FILE_STORAGE_DIR)
+
+NOMODIFY_ETCHOSTS_OPT = cli_option("--no-etc-hosts", dest="modify_etc_hosts",
+                                   help="Don't modify /etc/hosts",
+                                   action="store_false", default=True)
+
+NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
+                                    help="Don't initialize SSH keys",
+                                    action="store_false", default=True)
+
+ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
+                             help="Enable parseable error messages",
+                             action="store_true", default=False)
+
+NONPLUS1_OPT = cli_option("--no-nplus1-mem", dest="skip_nplusone_mem",
+                          help="Skip N+1 memory redundancy tests",
+                          action="store_true", default=False)
+
+REBOOT_TYPE_OPT = cli_option("-t", "--type", dest="reboot_type",
+                             help="Type of reboot: soft/hard/full",
+                             default=constants.INSTANCE_REBOOT_HARD,
+                             metavar="<REBOOT>",
+                             choices=list(constants.REBOOT_TYPES))
+
+IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
+                                    dest="ignore_secondaries",
+                                    default=False, action="store_true",
+                                    help="Ignore errors from secondaries")
+
+NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
+                            action="store_false", default=True,
+                            help="Don't shutdown the instance (unsafe)")
+
+TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
+                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+                         help="Maximum time to wait")
+
+SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
+                         dest="shutdown_timeout", type="int",
+                         default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+                         help="Maximum time to wait for instance shutdown")
+
+EARLY_RELEASE_OPT = cli_option("--early-release",
+                               dest="early_release", default=False,
+                               action="store_true",
+                               help="Release the locks on the secondary"
+                               " node(s) early")
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -793,7 +909,7 @@ def _ParseArgs(argv, commands, aliases):
     cmd = aliases[cmd]
 
   func, args_def, parser_opts, usage, description = commands[cmd]
-  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
+  parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
                         description=description,
                         formatter=TitledHelpFormatter(),
                         usage="%%prog %s %s" % (cmd, usage))
@@ -887,6 +1003,23 @@ def SplitNodeOption(value):
     return (value, None)
 
 
+def CalculateOSNames(os_name, os_variants):
+  """Calculates all the names an OS can be called, according to its variants.
+
+  @type os_name: string
+  @param os_name: base name of the os
+  @type os_variants: list or None
+  @param os_variants: list of supported variants
+  @rtype: list
+  @return: list of valid names
+
+  """
+  if os_variants:
+    return ['%s+%s' % (os_name, v) for v in os_variants]
+  else:
+    return [os_name]
+
+
 def UsesRPC(fn):
   def wrapper(*args, **kwargs):
     rpc.Init()
@@ -999,12 +1132,28 @@ def PollJob(job_id, cl=None, feedback_fn=None):
   prev_job_info = None
   prev_logmsg_serial = None
 
+  status = None
+
+  notified_queued = False
+  notified_waitlock = False
+
   while True:
-    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
-                                 prev_logmsg_serial)
+    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                     prev_logmsg_serial)
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
+    elif result == constants.JOB_NOTCHANGED:
+      if status is not None and not callable(feedback_fn):
+        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
+          ToStderr("Job %s is waiting in queue", job_id)
+          notified_queued = True
+        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
+          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+          notified_waitlock = True
+
+      # Wait again
+      continue
 
     # Split result, a tuple of (field values, log entries)
     (job_info, log_entries) = result
@@ -1055,7 +1204,7 @@ def PollJob(job_id, cl=None, feedback_fn=None):
     raise errors.OpExecError(result)
 
 
-def SubmitOpCode(op, cl=None, feedback_fn=None):
+def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
   """Legacy function to submit an opcode.
 
   This is just a simple wrapper over the construction of the processor
@@ -1066,6 +1215,8 @@ def SubmitOpCode(op, cl=None, feedback_fn=None):
   if cl is None:
     cl = GetClient()
 
+  SetGenericOpcodeOpts([op], opts)
+
   job_id = SendJob([op], cl)
 
   op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
@@ -1081,16 +1232,35 @@ def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
   whether to just send the job and print its identifier. It is used in
   order to simplify the implementation of the '--submit' option.
 
-  It will also add the dry-run parameter from the options passed, if true.
+  It will also process the opcodes if we're sending the via SendJob
+  (otherwise SubmitOpCode does it).
 
   """
-  if opts and opts.dry_run:
-    op.dry_run = opts.dry_run
   if opts and opts.submit_only:
-    job_id = SendJob([op], cl=cl)
+    job = [op]
+    SetGenericOpcodeOpts(job, opts)
+    job_id = SendJob(job, cl=cl)
     raise JobSubmittedException(job_id)
   else:
-    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
+
+
+def SetGenericOpcodeOpts(opcode_list, options):
+  """Processor for generic options.
+
+  This function updates the given opcodes based on generic command
+  line options (like debug, dry-run, etc.).
+
+  @param opcode_list: list of opcodes
+  @param options: command line options or None
+  @return: None (in-place modification)
+
+  """
+  if not options:
+    return
+  for op in opcode_list:
+    op.dry_run = options.dry_run
+    op.debug_level = options.debug
 
 
 def GetClient():
@@ -1098,13 +1268,21 @@ def GetClient():
   try:
     client = luxi.Client()
   except luxi.NoMasterError:
-    master, myself = ssconf.GetMasterAndMyself()
+    ss = ssconf.SimpleStore()
+
+    # Try to read ssconf file
+    try:
+      ss.GetMasterNode()
+    except errors.ConfigurationError:
+      raise errors.OpPrereqError("Cluster not initialized or this machine is"
+                                 " not part of a cluster")
+
+    master, myself = ssconf.GetMasterAndMyself(ss=ss)
     if master != myself:
       raise errors.OpPrereqError("This is not the master node, please connect"
                                  " to node '%s' and rerun the command" %
                                  master)
-    else:
-      raise
+    raise
   return client
 
 
@@ -1145,8 +1323,13 @@ def FormatError(err):
       msg = "Failure: can't resolve hostname '%s'"
     obuf.write(msg % err.args[0])
   elif isinstance(err, errors.OpPrereqError):
-    obuf.write("Failure: prerequisites not met for this"
-               " operation:\n%s" % msg)
+    if len(err.args) == 2:
+      obuf.write("Failure: prerequisites not met for this"
+               " operation:\nerror type: %s, error details:\n%s" %
+                 (err.args[1], err.args[0]))
+    else:
+      obuf.write("Failure: prerequisites not met for this"
+                 " operation:\n%s" % msg)
   elif isinstance(err, errors.OpExecError):
     obuf.write("Failure: command execution error:\n%s" % msg)
   elif isinstance(err, errors.TagError):
@@ -1240,6 +1423,117 @@ def GenericMain(commands, override=None, aliases=None):
   return result
 
 
+def GenericInstanceCreate(mode, opts, args):
+  """Add an instance to the cluster via either creation or import.
+
+  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
+  @param opts: the command line options selected by the user
+  @type args: list
+  @param args: should contain only one element, the new instance name
+  @rtype: int
+  @return: the desired exit code
+
+  """
+  instance = args[0]
+
+  (pnode, snode) = SplitNodeOption(opts.node)
+
+  hypervisor = None
+  hvparams = {}
+  if opts.hypervisor:
+    hypervisor, hvparams = opts.hypervisor
+
+  if opts.nics:
+    try:
+      nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
+    except ValueError, err:
+      raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
+    nics = [{}] * nic_max
+    for nidx, ndict in opts.nics:
+      nidx = int(nidx)
+      if not isinstance(ndict, dict):
+        msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
+        raise errors.OpPrereqError(msg)
+      nics[nidx] = ndict
+  elif opts.no_nics:
+    # no nics
+    nics = []
+  else:
+    # default of one nic, all auto
+    nics = [{}]
+
+  if opts.disk_template == constants.DT_DISKLESS:
+    if opts.disks or opts.sd_size is not None:
+      raise errors.OpPrereqError("Diskless instance but disk"
+                                 " information passed")
+    disks = []
+  else:
+    if not opts.disks and not opts.sd_size:
+      raise errors.OpPrereqError("No disk information specified")
+    if opts.disks and opts.sd_size is not None:
+      raise errors.OpPrereqError("Please use either the '--disk' or"
+                                 " '-s' option")
+    if opts.sd_size is not None:
+      opts.disks = [(0, {"size": opts.sd_size})]
+    try:
+      disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
+    except ValueError, err:
+      raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
+    disks = [{}] * disk_max
+    for didx, ddict in opts.disks:
+      didx = int(didx)
+      if not isinstance(ddict, dict):
+        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
+        raise errors.OpPrereqError(msg)
+      elif "size" not in ddict:
+        raise errors.OpPrereqError("Missing size for disk %d" % didx)
+      try:
+        ddict["size"] = utils.ParseUnit(ddict["size"])
+      except ValueError, err:
+        raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
+                                   (didx, err))
+      disks[didx] = ddict
+
+  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
+  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
+
+  if mode == constants.INSTANCE_CREATE:
+    start = opts.start
+    os_type = opts.os
+    src_node = None
+    src_path = None
+  elif mode == constants.INSTANCE_IMPORT:
+    start = False
+    os_type = None
+    src_node = opts.src_node
+    src_path = opts.src_dir
+  else:
+    raise errors.ProgrammerError("Invalid creation mode %s" % mode)
+
+  op = opcodes.OpCreateInstance(instance_name=instance,
+                                disks=disks,
+                                disk_template=opts.disk_template,
+                                nics=nics,
+                                pnode=pnode, snode=snode,
+                                ip_check=opts.ip_check,
+                                name_check=opts.name_check,
+                                wait_for_sync=opts.wait_for_sync,
+                                file_storage_dir=opts.file_storage_dir,
+                                file_driver=opts.file_driver,
+                                iallocator=opts.iallocator,
+                                hypervisor=hypervisor,
+                                hvparams=hvparams,
+                                beparams=opts.beparams,
+                                mode=mode,
+                                start=start,
+                                os_type=os_type,
+                                src_node=src_node,
+                                src_path=src_path)
+
+  SubmitOrSend(op, opts)
+  return 0
+
+
 def GenerateTable(headers, fields, separator, data,
                   numfields=None, unitfields=None,
                   units=None):
@@ -1280,8 +1574,8 @@ def GenerateTable(headers, fields, separator, data,
   if unitfields is None:
     unitfields = []
 
-  numfields = utils.FieldSet(*numfields)
-  unitfields = utils.FieldSet(*unitfields)
+  numfields = utils.FieldSet(*numfields)   # pylint: disable-msg=W0142
+  unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
 
   format_fields = []
   for field in fields:
@@ -1310,7 +1604,7 @@ def GenerateTable(headers, fields, separator, data,
       if unitfields.Matches(fields[idx]):
         try:
           val = int(val)
-        except ValueError:
+        except (TypeError, ValueError):
           pass
         else:
           val = row[idx] = utils.FormatUnit(val, units)
@@ -1329,11 +1623,17 @@ def GenerateTable(headers, fields, separator, data,
       args.append(hdr)
     result.append(format % tuple(args))
 
+  if separator is None:
+    assert len(mlens) == len(fields)
+
+    if fields and not numfields.Matches(fields[-1]):
+      mlens[-1] = 0
+
   for line in data:
     args = []
     if line is None:
       line = ['-' for _ in fields]
-    for idx in xrange(len(fields)):
+    for idx in range(len(fields)):
       if separator is None:
         args.append(mlens[idx])
       args.append(line[idx])
@@ -1385,7 +1685,7 @@ def ParseTimespec(value):
   if value[-1] not in suffix_map:
     try:
       value = int(value)
-    except ValueError:
+    except (TypeError, ValueError):
       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
   else:
     multiplier = suffix_map[value[-1]]
@@ -1395,7 +1695,7 @@ def ParseTimespec(value):
                                  " suffix passed)")
     try:
       value = int(value) * multiplier
-    except ValueError:
+    except (TypeError, ValueError):
       raise errors.OpPrereqError("Invalid time specification '%s'" % value)
   return value
 
@@ -1422,7 +1722,7 @@ def GetOnlineNodes(nodes, cl=None, nowarn=False):
                          use_locking=False)
   offline = [row[0] for row in result if row[1]]
   if offline and not nowarn:
-    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
+    ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
   return [row[0] for row in result if not row[1]]
 
 
@@ -1475,13 +1775,14 @@ class JobExecutor(object):
   GetResults() calls.
 
   """
-  def __init__(self, cl=None, verbose=True):
+  def __init__(self, cl=None, verbose=True, opts=None):
     self.queue = []
     if cl is None:
       cl = GetClient()
     self.cl = cl
     self.verbose = verbose
     self.jobs = []
+    self.opts = opts
 
   def QueueJob(self, name, *ops):
     """Record a job for later submit.
@@ -1489,6 +1790,7 @@ class JobExecutor(object):
     @type name: string
     @param name: a description of the job, will be used in WaitJobSet
     """
+    SetGenericOpcodeOpts(ops, self.opts)
     self.queue.append((name, ops))
 
   def SubmitPending(self):
@@ -1514,7 +1816,7 @@ class JobExecutor(object):
     if self.verbose:
       ok_jobs = [row[1] for row in self.jobs if row[0]]
       if ok_jobs:
-        ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
+        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
     for submit_status, jid, name in self.jobs:
       if not submit_status:
         ToStderr("Failed to submit job for %s: %s", name, jid)