import sys
import textwrap
import os.path
-import copy
import time
import logging
from cStringIO import StringIO
# Command line options
"ALLOCATABLE_OPT",
"ALL_OPT",
+ "AUTO_PROMOTE_OPT",
"AUTO_REPLACE_OPT",
"BACKEND_OPT",
"CLEANUP_OPT",
"DISK_OPT",
"DISK_TEMPLATE_OPT",
"DRAINED_OPT",
+ "EARLY_RELEASE_OPT",
"ENABLED_HV_OPT",
"ERROR_CODES_OPT",
"FIELDS_OPT",
"FILESTORE_DIR_OPT",
"FILESTORE_DRIVER_OPT",
+ "FORCE_OPT",
+ "FORCE_VARIANT_OPT",
"GLOBAL_FILEDIR_OPT",
"HVLIST_OPT",
"HVOPTS_OPT",
"IALLOCATOR_OPT",
"IGNORE_CONSIST_OPT",
"IGNORE_FAILURES_OPT",
+ "IGNORE_SECONDARIES_OPT",
"IGNORE_SIZE_OPT",
- "FORCE_OPT",
"MAC_PREFIX_OPT",
"MASTER_NETDEV_OPT",
"MC_OPT",
"NODE_PLACEMENT_OPT",
"NOHDR_OPT",
"NOIPCHECK_OPT",
+ "NONAMECHECK_OPT",
"NOLVM_STORAGE_OPT",
"NOMODIFY_ETCHOSTS_OPT",
+ "NOMODIFY_SSH_SETUP_OPT",
"NONICS_OPT",
"NONLIVE_OPT",
"NONPLUS1_OPT",
+ "NOSHUTDOWN_OPT",
"NOSTART_OPT",
"NOSSH_KEYCHECK_OPT",
"NOVOTING_OPT",
"SELECT_OS_OPT",
"SEP_OPT",
"SHOWCMD_OPT",
+ "SHUTDOWN_TIMEOUT_OPT",
"SINGLE_NODE_OPT",
"SRC_DIR_OPT",
"SRC_NODE_OPT",
"STATIC_OPT",
"SYNC_OPT",
"TAG_SRC_OPT",
+ "TIMEOUT_OPT",
"USEUNITS_OPT",
"VERBOSE_OPT",
"VG_NAME_OPT",
"YES_DOIT_OPT",
# Generic functions for CLI programs
"GenericMain",
+ "GenericInstanceCreate",
"GetClient",
"GetOnlineNodes",
"JobExecutor",
"ARGS_NONE",
"ARGS_ONE_INSTANCE",
"ARGS_ONE_NODE",
+ "ARGS_ONE_OS",
"ArgChoice",
"ArgCommand",
"ArgFile",
"ArgInstance",
"ArgJobId",
"ArgNode",
+ "ArgOs",
"ArgSuggest",
"ArgUnknown",
"OPT_COMPL_INST_ADD_NODES",
"OPT_COMPL_ONE_OS",
"cli_option",
"SplitNodeOption",
+ "CalculateOSNames",
]
NO_PREFIX = "no_"
class _Argument:
- def __init__(self, min=0, max=None):
+ def __init__(self, min=0, max=None): # pylint: disable-msg=W0622
self.min = min
self.max = max
Value can be any of the ones passed to the constructor.
"""
+ # pylint: disable-msg=W0622
def __init__(self, min=0, max=None, choices=None):
_Argument.__init__(self, min=min, max=max)
self.choices = choices
"""
+class ArgOs(_Argument):
+  """OS argument.
+
+  Marker subclass for operating-system name arguments; it adds no
+  behaviour on top of the generic L{_Argument} base class.
+
+  """
+
+
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
-
+ARGS_ONE_OS = [ArgOs(min=1, max=1)]
def _ExtractTagsObject(opts, args):
"""
kind, name = _ExtractTagsObject(opts, args)
- op = opcodes.OpGetTags(kind=kind, name=name)
- result = SubmitOpCode(op)
+ cl = GetClient()
+ result = cl.QueryTags(kind, name)
result = list(result)
result.sort()
for tag in result:
SubmitOpCode(op)
-def check_unit(option, opt, value):
+def check_unit(option, opt, value): # pylint: disable-msg=W0613
"""OptParsers custom converter for units.
"""
"""
kv_dict = {}
if data:
- for elem in data.split(","):
+ for elem in utils.UnescapeAndSplit(data, sep=","):
if "=" in elem:
key, val = elem.split("=", 1)
else:
return kv_dict
-def check_ident_key_val(option, opt, value):
+def check_ident_key_val(option, opt, value): # pylint: disable-msg=W0613
"""Custom parser for ident:key=val,key=val options.
This will store the parsed values as a tuple (ident, {key: val}). As such,
return retval
-def check_key_val(option, opt, value):
+def check_key_val(option, opt, value): # pylint: disable-msg=W0613
"""Custom parser class for key=val,key=val options.
This will store the parsed values as a dict {key: val}.
_YESNO = ("yes", "no")
_YORNO = "yes|no"
-DEBUG_OPT = cli_option("-d", "--debug", default=False,
- action="store_true",
- help="Turn debugging on")
+DEBUG_OPT = cli_option("-d", "--debug", default=0, action="count",
+ help="Increase debugging level")
NOHDR_OPT = cli_option("--no-headers", default=False,
action="store_true", dest="no_headers",
metavar="<os>",
completion_suggest=OPT_COMPL_ONE_OS)
+FORCE_VARIANT_OPT = cli_option("--force-variant", dest="force_variant",
+ action="store_true", default=False,
+ help="Force an unknown variant")
+
BACKEND_OPT = cli_option("-B", "--backend-parameters", dest="beparams",
type="keyval", default={},
help="Backend parameters")
help="Don't check that the instance's IP"
" is alive")
+NONAMECHECK_OPT = cli_option("--no-name-check", dest="name_check",
+ default=True, action="store_false",
+ help="Don't check that the instance's name"
+ " is resolvable")
+
NET_OPT = cli_option("--net",
help="NIC parameters", default=[],
dest="nics", action="append", type="identkeyval")
help="Replace the disk(s) on the secondary"
" node (only for the drbd template)")
+AUTO_PROMOTE_OPT = cli_option("--auto-promote", dest="auto_promote",
+ default=False, action="store_true",
+ help="Lock all nodes and auto-promote as needed"
+ " to MC status")
+
AUTO_REPLACE_OPT = cli_option("-a", "--auto", dest="auto",
default=False, action="store_true",
help="Automatically replace faulty disks"
help="Don't modify /etc/hosts",
action="store_false", default=True)
+NOMODIFY_SSH_SETUP_OPT = cli_option("--no-ssh-init", dest="modify_ssh_setup",
+ help="Don't initialize SSH keys",
+ action="store_false", default=True)
+
ERROR_CODES_OPT = cli_option("--error-codes", dest="error_codes",
help="Enable parseable error messages",
action="store_true", default=False)
metavar="<REBOOT>",
choices=list(constants.REBOOT_TYPES))
+IGNORE_SECONDARIES_OPT = cli_option("--ignore-secondaries",
+ dest="ignore_secondaries",
+ default=False, action="store_true",
+ help="Ignore errors from secondaries")
+
+NOSHUTDOWN_OPT = cli_option("--noshutdown", dest="shutdown",
+ action="store_false", default=True,
+ help="Don't shutdown the instance (unsafe)")
+
+TIMEOUT_OPT = cli_option("--timeout", dest="timeout", type="int",
+ default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+ help="Maximum time to wait")
+
+SHUTDOWN_TIMEOUT_OPT = cli_option("--shutdown-timeout",
+ dest="shutdown_timeout", type="int",
+ default=constants.DEFAULT_SHUTDOWN_TIMEOUT,
+ help="Maximum time to wait for instance shutdown")
+
+EARLY_RELEASE_OPT = cli_option("--early-release",
+ dest="early_release", default=False,
+ action="store_true",
+ help="Release the locks on the secondary"
+ " node(s) early")
+
def _ParseArgs(argv, commands, aliases):
"""Parser for the command line arguments.
cmd = aliases[cmd]
func, args_def, parser_opts, usage, description = commands[cmd]
- parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT],
+ parser = OptionParser(option_list=parser_opts + [_DRY_RUN_OPT, DEBUG_OPT],
description=description,
formatter=TitledHelpFormatter(),
usage="%%prog %s %s" % (cmd, usage))
return (value, None)
+def CalculateOSNames(os_name, os_variants):
+  """Calculates all the names an OS can be called, according to its variants.
+
+  @type os_name: string
+  @param os_name: base name of the os
+  @type os_variants: list or None
+  @param os_variants: list of supported variants
+  @rtype: list
+  @return: list of valid names
+
+  """
+  if os_variants:
+    # variant names follow the "<os>+<variant>" convention
+    return ['%s+%s' % (os_name, v) for v in os_variants]
+  else:
+    # no variants: the OS is only known under its plain name
+    return [os_name]
+
+
def UsesRPC(fn):
def wrapper(*args, **kwargs):
rpc.Init()
prev_job_info = None
prev_logmsg_serial = None
+ status = None
+
+ notified_queued = False
+ notified_waitlock = False
+
while True:
- result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
- prev_logmsg_serial)
+ result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
+ elif result == constants.JOB_NOTCHANGED:
+ if status is not None and not callable(feedback_fn):
+ if status == constants.JOB_STATUS_QUEUED and not notified_queued:
+ ToStderr("Job %s is waiting in queue", job_id)
+ notified_queued = True
+ elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
+ ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+ notified_waitlock = True
+
+ # Wait again
+ continue
# Split result, a tuple of (field values, log entries)
(job_info, log_entries) = result
raise errors.OpExecError(result)
-def SubmitOpCode(op, cl=None, feedback_fn=None):
+def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
"""Legacy function to submit an opcode.
This is just a simple wrapper over the construction of the processor
if cl is None:
cl = GetClient()
+ SetGenericOpcodeOpts([op], opts)
+
job_id = SendJob([op], cl)
op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
whether to just send the job and print its identifier. It is used in
order to simplify the implementation of the '--submit' option.
- It will also add the dry-run parameter from the options passed, if true.
+ It will also process the opcodes if we're sending them via SendJob
+ (otherwise SubmitOpCode does it).
"""
- if opts and opts.dry_run:
- op.dry_run = opts.dry_run
if opts and opts.submit_only:
- job_id = SendJob([op], cl=cl)
+ job = [op]
+ SetGenericOpcodeOpts(job, opts)
+ job_id = SendJob(job, cl=cl)
raise JobSubmittedException(job_id)
else:
- return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+ return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)
+
+
+def SetGenericOpcodeOpts(opcode_list, options):
+  """Processor for generic options.
+
+  This function updates the given opcodes based on generic command
+  line options (like debug, dry-run, etc.).
+
+  @param opcode_list: list of opcodes
+  @param options: command line options or None
+  @return: None (in-place modification)
+
+  """
+  if not options:
+    # nothing to propagate (caller had no parsed options)
+    return
+  for op in opcode_list:
+    op.dry_run = options.dry_run
+    # --debug is a "count" option, so this is an integer level
+    op.debug_level = options.debug
def GetClient():
try:
client = luxi.Client()
except luxi.NoMasterError:
- master, myself = ssconf.GetMasterAndMyself()
+ ss = ssconf.SimpleStore()
+
+ # Try to read ssconf file
+ try:
+ ss.GetMasterNode()
+ except errors.ConfigurationError:
+ raise errors.OpPrereqError("Cluster not initialized or this machine is"
+ " not part of a cluster")
+
+ master, myself = ssconf.GetMasterAndMyself(ss=ss)
if master != myself:
raise errors.OpPrereqError("This is not the master node, please connect"
" to node '%s' and rerun the command" %
master)
- else:
- raise
+ raise
return client
msg = "Failure: can't resolve hostname '%s'"
obuf.write(msg % err.args[0])
elif isinstance(err, errors.OpPrereqError):
- obuf.write("Failure: prerequisites not met for this"
- " operation:\n%s" % msg)
+ if len(err.args) == 2:
+ obuf.write("Failure: prerequisites not met for this"
+ " operation:\nerror type: %s, error details:\n%s" %
+ (err.args[1], err.args[0]))
+ else:
+ obuf.write("Failure: prerequisites not met for this"
+ " operation:\n%s" % msg)
elif isinstance(err, errors.OpExecError):
obuf.write("Failure: command execution error:\n%s" % msg)
elif isinstance(err, errors.TagError):
return result
+def GenericInstanceCreate(mode, opts, args):
+ """Add an instance to the cluster via either creation or import.
+
+ @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
+ @param opts: the command line options selected by the user
+ @type args: list
+ @param args: should contain only one element, the new instance name
+ @rtype: int
+ @return: the desired exit code
+
+ """
+ instance = args[0]
+
+ (pnode, snode) = SplitNodeOption(opts.node)
+
+ hypervisor = None
+ hvparams = {}
+ if opts.hypervisor:
+ hypervisor, hvparams = opts.hypervisor
+
+ if opts.nics:
+ try:
+ nic_max = max(int(nidx[0]) + 1 for nidx in opts.nics)
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err))
+ nics = [{}] * nic_max
+ for nidx, ndict in opts.nics:
+ nidx = int(nidx)
+ if not isinstance(ndict, dict):
+ msg = "Invalid nic/%d value: expected dict, got %s" % (nidx, ndict)
+ raise errors.OpPrereqError(msg)
+ nics[nidx] = ndict
+ elif opts.no_nics:
+ # no nics
+ nics = []
+ else:
+ # default of one nic, all auto
+ nics = [{}]
+
+ if opts.disk_template == constants.DT_DISKLESS:
+ if opts.disks or opts.sd_size is not None:
+ raise errors.OpPrereqError("Diskless instance but disk"
+ " information passed")
+ disks = []
+ else:
+ if not opts.disks and not opts.sd_size:
+ raise errors.OpPrereqError("No disk information specified")
+ if opts.disks and opts.sd_size is not None:
+ raise errors.OpPrereqError("Please use either the '--disk' or"
+ " '-s' option")
+ if opts.sd_size is not None:
+ opts.disks = [(0, {"size": opts.sd_size})]
+ try:
+ disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err))
+ disks = [{}] * disk_max
+ for didx, ddict in opts.disks:
+ didx = int(didx)
+ if not isinstance(ddict, dict):
+ msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
+ raise errors.OpPrereqError(msg)
+ elif "size" not in ddict:
+ raise errors.OpPrereqError("Missing size for disk %d" % didx)
+ try:
+ ddict["size"] = utils.ParseUnit(ddict["size"])
+ except ValueError, err:
+ raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
+ (didx, err))
+ disks[didx] = ddict
+
+ utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_TYPES)
+ utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
+
+ if mode == constants.INSTANCE_CREATE:
+ start = opts.start
+ os_type = opts.os
+ src_node = None
+ src_path = None
+ elif mode == constants.INSTANCE_IMPORT:
+ start = False
+ os_type = None
+ src_node = opts.src_node
+ src_path = opts.src_dir
+ else:
+ raise errors.ProgrammerError("Invalid creation mode %s" % mode)
+
+ op = opcodes.OpCreateInstance(instance_name=instance,
+ disks=disks,
+ disk_template=opts.disk_template,
+ nics=nics,
+ pnode=pnode, snode=snode,
+ ip_check=opts.ip_check,
+ name_check=opts.name_check,
+ wait_for_sync=opts.wait_for_sync,
+ file_storage_dir=opts.file_storage_dir,
+ file_driver=opts.file_driver,
+ iallocator=opts.iallocator,
+ hypervisor=hypervisor,
+ hvparams=hvparams,
+ beparams=opts.beparams,
+ mode=mode,
+ start=start,
+ os_type=os_type,
+ src_node=src_node,
+ src_path=src_path)
+
+ SubmitOrSend(op, opts)
+ return 0
+
+
def GenerateTable(headers, fields, separator, data,
numfields=None, unitfields=None,
units=None):
if unitfields is None:
unitfields = []
- numfields = utils.FieldSet(*numfields)
- unitfields = utils.FieldSet(*unitfields)
+ numfields = utils.FieldSet(*numfields) # pylint: disable-msg=W0142
+ unitfields = utils.FieldSet(*unitfields) # pylint: disable-msg=W0142
format_fields = []
for field in fields:
if unitfields.Matches(fields[idx]):
try:
val = int(val)
- except ValueError:
+ except (TypeError, ValueError):
pass
else:
val = row[idx] = utils.FormatUnit(val, units)
args.append(hdr)
result.append(format % tuple(args))
+ if separator is None:
+ assert len(mlens) == len(fields)
+
+ if fields and not numfields.Matches(fields[-1]):
+ mlens[-1] = 0
+
for line in data:
args = []
if line is None:
line = ['-' for _ in fields]
- for idx in xrange(len(fields)):
+ for idx in range(len(fields)):
if separator is None:
args.append(mlens[idx])
args.append(line[idx])
if value[-1] not in suffix_map:
try:
value = int(value)
- except ValueError:
+ except (TypeError, ValueError):
raise errors.OpPrereqError("Invalid time specification '%s'" % value)
else:
multiplier = suffix_map[value[-1]]
" suffix passed)")
try:
value = int(value) * multiplier
- except ValueError:
+ except (TypeError, ValueError):
raise errors.OpPrereqError("Invalid time specification '%s'" % value)
return value
use_locking=False)
offline = [row[0] for row in result if row[1]]
if offline and not nowarn:
- ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
+ ToStderr("Note: skipping offline node(s): %s" % utils.CommaJoin(offline))
return [row[0] for row in result if not row[1]]
GetResults() calls.
"""
- def __init__(self, cl=None, verbose=True):
+ def __init__(self, cl=None, verbose=True, opts=None):
self.queue = []
if cl is None:
cl = GetClient()
self.cl = cl
self.verbose = verbose
self.jobs = []
+ self.opts = opts
def QueueJob(self, name, *ops):
"""Record a job for later submit.
@type name: string
@param name: a description of the job, will be used in WaitJobSet
"""
+ SetGenericOpcodeOpts(ops, self.opts)
self.queue.append((name, ops))
def SubmitPending(self):
"""
results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
- for ((status, data), (name, _)) in zip(results, self.queue):
- self.jobs.append((status, data, name))
+ for (idx, ((status, data), (name, _))) in enumerate(zip(results,
+ self.queue)):
+ self.jobs.append((idx, status, data, name))
+
+  def _ChooseJob(self):
+    """Choose a non-waiting/queued job to poll next.
+
+    Queries the status of all recorded jobs and removes and returns the
+    first one that has progressed past the queued/waiting/canceling
+    states; if all jobs are still waiting, the oldest entry is removed
+    and returned instead.
+
+    @return: one (idx, status, job_id, name) entry, removed from
+        self.jobs
+
+    """
+    assert self.jobs, "_ChooseJob called with empty job list"
+
+    # i[2] is the job id (see the tuples appended in SubmitPending)
+    result = self.cl.QueryJobs([i[2] for i in self.jobs], ["status"])
+    assert result
+
+    for job_data, status in zip(self.jobs, result):
+      if status[0] in (constants.JOB_STATUS_QUEUED,
+                       constants.JOB_STATUS_WAITLOCK,
+                       constants.JOB_STATUS_CANCELING):
+        # job is still waiting
+        continue
+      # good candidate found
+      self.jobs.remove(job_data)
+      return job_data
+
+    # no job found
+    return self.jobs.pop(0)
def GetResults(self):
"""Wait for and return the results of all jobs.
self.SubmitPending()
results = []
if self.verbose:
- ok_jobs = [row[1] for row in self.jobs if row[0]]
+ ok_jobs = [row[2] for row in self.jobs if row[1]]
if ok_jobs:
- ToStdout("Submitted jobs %s", ", ".join(ok_jobs))
- for submit_status, jid, name in self.jobs:
- if not submit_status:
+ ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
+
+ # first, remove any non-submitted jobs
+ self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+ for idx, _, jid, name in failures:
ToStderr("Failed to submit job for %s: %s", name, jid)
- results.append((False, jid))
- continue
- if self.verbose:
- ToStdout("Waiting for job %s for %s...", jid, name)
+ results.append((idx, False, jid))
+
+ while self.jobs:
+ (idx, _, jid, name) = self._ChooseJob()
+ ToStdout("Waiting for job %s for %s...", jid, name)
try:
job_result = PollJob(jid, cl=self.cl)
success = True
# the error message will always be shown, verbose or not
ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
- results.append((success, job_result))
+ results.append((idx, success, job_result))
+
+ # sort based on the index, then drop it
+ results.sort()
+ results = [i[1:] for i in results]
+
return results
def WaitOrShow(self, wait):