import os.path
import copy
import time
+import logging
from cStringIO import StringIO
from ganeti import utils
-from ganeti import logger
from ganeti import errors
from ganeti import constants
from ganeti import opcodes
__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
"SubmitOpCode", "GetClient",
- "cli_option", "GenerateTable", "AskUser",
+ "cli_option", "ikv_option", "keyval_option",
+ "GenerateTable", "AskUser",
"ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
"USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
"ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
"FormatError", "SplitNodeOption", "SubmitOrSend",
+ "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
+ "ValidateBeParams",
+ "ToStderr", "ToStdout",
]
TYPE_CHECKER["unit"] = check_unit
+def _SplitKeyVal(opt, data):
+ """Convert a KeyVal string into a dict.
+
+ This function will convert a key=val[,...] string into a dict. Empty
+ values will be converted specially: keys which have the prefix 'no_'
+ will have the value=False and the prefix stripped, the others will
+ have value=True.
+
+ @type opt: string
+ @param opt: a string holding the option name for which we process the
+ data, used in building error messages
+ @type data: string
+ @param data: a string of the format key=val,key=val,...
+ @rtype: dict
+ @return: {key=val, key=val}
+ @raises errors.ParameterError: if there are duplicate keys
+
+ """
+ NO_PREFIX = "no_"
+ UN_PREFIX = "-"
+ kv_dict = {}
+ for elem in data.split(","):
+ if "=" in elem:
+ key, val = elem.split("=", 1)
+ else:
+ if elem.startswith(NO_PREFIX):
+ key, val = elem[len(NO_PREFIX):], False
+ elif elem.startswith(UN_PREFIX):
+ key, val = elem[len(UN_PREFIX):], None
+ else:
+ key, val = elem, True
+ if key in kv_dict:
+ raise errors.ParameterError("Duplicate key '%s' in option %s" %
+ (key, opt))
+ kv_dict[key] = val
+ return kv_dict
+
+
+def check_ident_key_val(option, opt, value):
+ """Custom parser for the IdentKeyVal option type.
+
+ """
+ if ":" not in value:
+ retval = (value, {})
+ else:
+ ident, rest = value.split(":", 1)
+ kv_dict = _SplitKeyVal(opt, rest)
+ retval = (ident, kv_dict)
+ return retval
+
+
+class IdentKeyValOption(Option):
+ """Custom option class for ident:key=val,key=val options.
+
+ This will store the parsed values as a tuple (ident, {key: val}). As
+ such, multiple uses of this option via action=append is possible.
+
+ """
+ TYPES = Option.TYPES + ("identkeyval",)
+ TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
+ TYPE_CHECKER["identkeyval"] = check_ident_key_val
+
+
+def check_key_val(option, opt, value):
+ """Custom parser for the KeyVal option type.
+
+ """
+ return _SplitKeyVal(opt, value)
+
+
+class KeyValOption(Option):
+ """Custom option class for key=val,key=val options.
+
+ This will store the parsed values as a dict {key: val}.
+
+ """
+ TYPES = Option.TYPES + ("keyval",)
+ TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
+ TYPE_CHECKER["keyval"] = check_key_val
+
+
# optparse.py sets make_option, so we do it for our own option class, too
cli_option = CliOption
+ikv_option = IdentKeyValOption
+keyval_option = KeyValOption
def _ParseArgs(argv, commands, aliases):
return (value, None)
+def ValidateBeParams(bep):
+ """Parse and check the given beparams.
+
+ The function will update in-place the given dictionary.
+
+ @type bep: dict
+ @param bep: input beparams
+ @raise errors.ParameterError: if the input values are not OK
+ @raise errors.UnitParseError: if the input values are not OK
+
+ """
+ if constants.BE_MEMORY in bep:
+ bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
+
+ if constants.BE_VCPUS in bep:
+ try:
+ bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
+ except ValueError:
+ raise errors.ParameterError("Invalid number of VCPUs")
+
+
def AskUser(text, choices=None):
"""Ask the user a question.
return answer
+class JobSubmittedException(Exception):
+ """Job was submitted, client should exit.
+
+ This exception has one argument, the ID of the job that was
+ submitted. The handler should print this ID.
+
+ This is not an error, just a structured way to exit from clients.
+
+ """
+
+
def SendJob(ops, cl=None):
"""Function to submit an opcode without waiting for the results.
return job_id
-def PollJob(job_id, cl=None):
+def PollJob(job_id, cl=None, feedback_fn=None):
"""Function to poll for the result of a job.
@type job_id: job identified
if cl is None:
cl = GetClient()
- lastmsg = None
+ prev_job_info = None
+ prev_logmsg_serial = None
+
while True:
- jobs = cl.QueryJobs([job_id], ["status", "ticker"])
- if not jobs:
+ result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+ prev_logmsg_serial)
+ if not result:
# job not found, go away!
raise errors.JobLost("Job with id %s lost" % job_id)
+ # Split result, a tuple of (field values, log entries)
+ (job_info, log_entries) = result
+ (status, ) = job_info
+
+ if log_entries:
+ for log_entry in log_entries:
+ (serial, timestamp, _, message) = log_entry
+ if callable(feedback_fn):
+ feedback_fn(log_entry[1:])
+ else:
+ print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+ prev_logmsg_serial = max(prev_logmsg_serial, serial)
+
# TODO: Handle canceled and archived jobs
- status = jobs[0][0]
- if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+ elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
break
- msg = jobs[0][1]
- if msg is not None and msg != lastmsg:
- if callable(feedback_fn):
- feedback_fn(msg)
- else:
- print "%s %s" % (time.ctime(msg[0]), msg[2])
- lastmsg = msg
- time.sleep(1)
+
+ prev_job_info = job_info
jobs = cl.QueryJobs([job_id], ["status", "opresult"])
if not jobs:
status, result = jobs[0]
if status == constants.JOB_STATUS_SUCCESS:
- return result[0]
+ return result
else:
raise errors.OpExecError(result)
job_id = SendJob([op], cl)
- return PollJob(job_id, cl)
+ op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
+
+ return op_results[0]
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
"""
if opts and opts.submit_only:
- print SendJob([op], cl=cl)
- sys.exit(0)
+ job_id = SendJob([op], cl=cl)
+ raise JobSubmittedException(job_id)
else:
return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
msg = str(err)
if isinstance(err, errors.ConfigurationError):
txt = "Corrupt configuration file: %s" % msg
- logger.Error(txt)
+ logging.error(txt)
obuf.write(txt + "\n")
obuf.write("Aborting.")
retcode = 2
obuf.write("Failure: command execution error:\n%s" % msg)
elif isinstance(err, errors.TagError):
obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
+ elif isinstance(err, errors.JobQueueDrainError):
+ obuf.write("Failure: the job queue is marked for drain and doesn't"
+ " accept new requests\n")
elif isinstance(err, errors.GenericError):
obuf.write("Unhandled Ganeti error: %s" % msg)
elif isinstance(err, luxi.NoMasterError):
obuf.write("Cannot communicate with the master daemon.\nIs it running"
- " and listening on '%s'?" % err.args[0])
+ " and listening for connections?")
elif isinstance(err, luxi.TimeoutError):
obuf.write("Timeout while talking to the master daemon. Error:\n"
"%s" % msg)
elif isinstance(err, luxi.ProtocolError):
obuf.write("Unhandled protocol error while talking to the master daemon:\n"
"%s" % msg)
+ elif isinstance(err, JobSubmittedException):
+ obuf.write("JobID: %s\n" % err.args[0])
+ retcode = 0
else:
obuf.write("Unhandled exception: %s" % msg)
return retcode, obuf.getvalue().rstrip('\n')
for key, val in override.iteritems():
setattr(options, key, val)
- logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
- stderr_logging=True, program=binary)
+ utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
+ stderr_logging=True, program=binary)
utils.debug = options.debug
if old_cmdline:
- logger.Info("run with arguments '%s'" % old_cmdline)
+ logging.info("run with arguments '%s'", old_cmdline)
else:
- logger.Info("run with no arguments")
+ logging.info("run with no arguments")
try:
result = func(options, args)
except (errors.GenericError, luxi.ProtocolError), err:
result, err_msg = FormatError(err)
- logger.ToStderr(err_msg)
+ logging.exception("Error durring command processing")
+ ToStderr(err_msg)
return result
result.append(format % tuple(args))
return result
+
+
+def FormatTimestamp(ts):
+ """Formats a given timestamp.
+
+ @type ts: timestamp
+ @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
+
+ @rtype: string
+ @returns: a string with the formatted timestamp
+
+ """
+ if not isinstance (ts, (tuple, list)) or len(ts) != 2:
+ return '?'
+ sec, usec = ts
+ return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
+
+
+def ParseTimespec(value):
+ """Parse a time specification.
+
+ The following suffixed will be recognized:
+
+ - s: seconds
+ - m: minutes
+ - h: hours
+ - d: day
+ - w: weeks
+
+ Without any suffix, the value will be taken to be in seconds.
+
+ """
+ value = str(value)
+ if not value:
+ raise errors.OpPrereqError("Empty time specification passed")
+ suffix_map = {
+ 's': 1,
+ 'm': 60,
+ 'h': 3600,
+ 'd': 86400,
+ 'w': 604800,
+ }
+ if value[-1] not in suffix_map:
+ try:
+ value = int(value)
+ except ValueError:
+ raise errors.OpPrereqError("Invalid time specification '%s'" % value)
+ else:
+ multiplier = suffix_map[value[-1]]
+ value = value[:-1]
+ if not value: # no data left after stripping the suffix
+ raise errors.OpPrereqError("Invalid time specification (only"
+ " suffix passed)")
+ try:
+ value = int(value) * multiplier
+ except ValueError:
+ raise errors.OpPrereqError("Invalid time specification '%s'" % value)
+ return value
+
+
+def _ToStream(stream, txt, *args):
+ """Write a message to a stream, bypassing the logging system
+
+ @type stream: file object
+ @param stream: the file to which we should write
+ @type txt: str
+ @param txt: the message
+
+ """
+ if args:
+ args = tuple(args)
+ stream.write(txt % args)
+ else:
+ stream.write(txt)
+ stream.write('\n')
+ stream.flush()
+
+
+def ToStdout(txt, *args):
+ """Write a message to stdout only, bypassing the logging system
+
+ This is just a wrapper over _ToStream.
+
+ @type txt: str
+ @param txt: the message
+
+ """
+ _ToStream(sys.stdout, txt, *args)
+
+
+def ToStderr(txt, *args):
+ """Write a message to stderr only, bypassing the logging system
+
+ This is just a wrapper over _ToStream.
+
+ @type txt: str
+ @param txt: the message
+
+ """
+ _ToStream(sys.stderr, txt, *args)