RAPI: Export beparams as dict. The patch also enables LUQueryInstances to accept...
[ganeti-local] / lib / cli.py
index a5d3a80..7832b29 100644 (file)
@@ -27,10 +27,10 @@ import textwrap
 import os.path
 import copy
 import time
+import logging
 from cStringIO import StringIO
 
 from ganeti import utils
-from ganeti import logger
 from ganeti import errors
 from ganeti import constants
 from ganeti import opcodes
@@ -42,11 +42,15 @@ from optparse import (OptionParser, make_option, TitledHelpFormatter,
 
 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "SubmitOpCode", "GetClient",
-           "cli_option", "GenerateTable", "AskUser",
+           "cli_option", "ikv_option", "keyval_option",
+           "GenerateTable", "AskUser",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
            "FormatError", "SplitNodeOption", "SubmitOrSend",
+           "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
+           "ValidateBeParams",
+           "ToStderr", "ToStdout",
            ]
 
 
@@ -220,8 +224,91 @@ class CliOption(Option):
   TYPE_CHECKER["unit"] = check_unit
 
 
+def _SplitKeyVal(opt, data):
+  """Convert a KeyVal string into a dict.
+
+  This function will convert a key=val[,...] string into a dict. Empty
+  values will be converted specially: keys which have the prefix 'no_'
+  will have the value=False and the prefix stripped, the others will
+  have value=True.
+
+  @type opt: string
+  @param opt: a string holding the option name for which we process the
+      data, used in building error messages
+  @type data: string
+  @param data: a string of the format key=val,key=val,...
+  @rtype: dict
+  @return: {key=val, key=val}
+  @raises errors.ParameterError: if there are duplicate keys
+
+  """
+  NO_PREFIX = "no_"
+  UN_PREFIX = "-"
+  kv_dict = {}
+  for elem in data.split(","):
+    if "=" in elem:
+      key, val = elem.split("=", 1)
+    else:
+      if elem.startswith(NO_PREFIX):
+        key, val = elem[len(NO_PREFIX):], False
+      elif elem.startswith(UN_PREFIX):
+        key, val = elem[len(UN_PREFIX):], None
+      else:
+        key, val = elem, True
+    if key in kv_dict:
+      raise errors.ParameterError("Duplicate key '%s' in option %s" %
+                                  (key, opt))
+    kv_dict[key] = val
+  return kv_dict
+
+
+def check_ident_key_val(option, opt, value):
+  """Custom parser for the IdentKeyVal option type.
+
+  """
+  if ":" not in value:
+    retval =  (value, {})
+  else:
+    ident, rest = value.split(":", 1)
+    kv_dict = _SplitKeyVal(opt, rest)
+    retval = (ident, kv_dict)
+  return retval
+
+
+class IdentKeyValOption(Option):
+  """Custom option class for ident:key=val,key=val options.
+
+  This will store the parsed values as a tuple (ident, {key: val}). As
+  such, multiple uses of this option via action=append is possible.
+
+  """
+  TYPES = Option.TYPES + ("identkeyval",)
+  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
+  TYPE_CHECKER["identkeyval"] = check_ident_key_val
+
+
+def check_key_val(option, opt, value):
+  """Custom parser for the KeyVal option type.
+
+  """
+  return _SplitKeyVal(opt, value)
+
+
+class KeyValOption(Option):
+  """Custom option class for key=val,key=val options.
+
+  This will store the parsed values as a dict {key: val}.
+
+  """
+  TYPES = Option.TYPES + ("keyval",)
+  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
+  TYPE_CHECKER["keyval"] = check_key_val
+
+
 # optparse.py sets make_option, so we do it for our own option class, too
 cli_option = CliOption
+ikv_option = IdentKeyValOption
+keyval_option = KeyValOption
 
 
 def _ParseArgs(argv, commands, aliases):
@@ -316,6 +403,27 @@ def SplitNodeOption(value):
     return (value, None)
 
 
+def ValidateBeParams(bep):
+  """Parse and check the given beparams.
+
+  The function will update in-place the given dictionary.
+
+  @type bep: dict
+  @param bep: input beparams
+  @raise errors.ParameterError: if the input values are not OK
+  @raise errors.UnitParseError: if the input values are not OK
+
+  """
+  if constants.BE_MEMORY in bep:
+    bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
+
+  if constants.BE_VCPUS in bep:
+    try:
+      bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
+    except ValueError:
+      raise errors.ParameterError("Invalid number of VCPUs")
+
+
 def AskUser(text, choices=None):
   """Ask the user a question.
 
@@ -374,6 +482,17 @@ def AskUser(text, choices=None):
   return answer
 
 
+class JobSubmittedException(Exception):
+  """Job was submitted, client should exit.
+
+  This exception has one argument, the ID of the job that was
+  submitted. The handler should print this ID.
+
+  This is not an error, just a structured way to exit from clients.
+
+  """
+
+
 def SendJob(ops, cl=None):
   """Function to submit an opcode without waiting for the results.
 
@@ -392,7 +511,7 @@ def SendJob(ops, cl=None):
   return job_id
 
 
-def PollJob(job_id, cl=None):
+def PollJob(job_id, cl=None, feedback_fn=None):
   """Function to poll for the result of a job.
 
   @type job_id: job identified
@@ -405,25 +524,34 @@ def PollJob(job_id, cl=None):
   if cl is None:
     cl = GetClient()
 
-  lastmsg = None
+  prev_job_info = None
+  prev_logmsg_serial = None
+
   while True:
-    jobs = cl.QueryJobs([job_id], ["status", "ticker"])
-    if not jobs:
+    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+                                 prev_logmsg_serial)
+    if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
 
+    # Split result, a tuple of (field values, log entries)
+    (job_info, log_entries) = result
+    (status, ) = job_info
+
+    if log_entries:
+      for log_entry in log_entries:
+        (serial, timestamp, _, message) = log_entry
+        if callable(feedback_fn):
+          feedback_fn(log_entry[1:])
+        else:
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+        prev_logmsg_serial = max(prev_logmsg_serial, serial)
+
     # TODO: Handle canceled and archived jobs
-    status = jobs[0][0]
-    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
       break
-    msg = jobs[0][1]
-    if msg is not None and msg != lastmsg:
-      if callable(feedback_fn):
-        feedback_fn(msg)
-      else:
-        print "%s %s" % (time.ctime(msg[0]), msg[2])
-    lastmsg = msg
-    time.sleep(1)
+
+    prev_job_info = job_info
 
   jobs = cl.QueryJobs([job_id], ["status", "opresult"])
   if not jobs:
@@ -431,7 +559,7 @@ def PollJob(job_id, cl=None):
 
   status, result = jobs[0]
   if status == constants.JOB_STATUS_SUCCESS:
-    return result[0]
+    return result
   else:
     raise errors.OpExecError(result)
 
@@ -449,7 +577,9 @@ def SubmitOpCode(op, cl=None, feedback_fn=None):
 
   job_id = SendJob([op], cl)
 
-  return PollJob(job_id, cl)
+  op_results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
+
+  return op_results[0]
 
 
 def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
@@ -462,8 +592,8 @@ def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
 
   """
   if opts and opts.submit_only:
-    print SendJob([op], cl=cl)
-    sys.exit(0)
+    job_id = SendJob([op], cl=cl)
+    raise JobSubmittedException(job_id)
   else:
     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
 
@@ -497,7 +627,7 @@ def FormatError(err):
   msg = str(err)
   if isinstance(err, errors.ConfigurationError):
     txt = "Corrupt configuration file: %s" % msg
-    logger.Error(txt)
+    logging.error(txt)
     obuf.write(txt + "\n")
     obuf.write("Aborting.")
     retcode = 2
@@ -526,17 +656,23 @@ def FormatError(err):
     obuf.write("Failure: command execution error:\n%s" % msg)
   elif isinstance(err, errors.TagError):
     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
+  elif isinstance(err, errors.JobQueueDrainError):
+    obuf.write("Failure: the job queue is marked for drain and doesn't"
+               " accept new requests\n")
   elif isinstance(err, errors.GenericError):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
     obuf.write("Cannot communicate with the master daemon.\nIs it running"
-               " and listening on '%s'?" % err.args[0])
+               " and listening for connections?")
   elif isinstance(err, luxi.TimeoutError):
     obuf.write("Timeout while talking to the master daemon. Error:\n"
                "%s" % msg)
   elif isinstance(err, luxi.ProtocolError):
     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
                "%s" % msg)
+  elif isinstance(err, JobSubmittedException):
+    obuf.write("JobID: %s\n" % err.args[0])
+    retcode = 0
   else:
     obuf.write("Unhandled exception: %s" % msg)
   return retcode, obuf.getvalue().rstrip('\n')
@@ -577,21 +713,22 @@ def GenericMain(commands, override=None, aliases=None):
     for key, val in override.iteritems():
       setattr(options, key, val)
 
-  logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
-                      stderr_logging=True, program=binary)
+  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
+                     stderr_logging=True, program=binary)
 
   utils.debug = options.debug
 
   if old_cmdline:
-    logger.Info("run with arguments '%s'" % old_cmdline)
+    logging.info("run with arguments '%s'", old_cmdline)
   else:
-    logger.Info("run with no arguments")
+    logging.info("run with no arguments")
 
   try:
     result = func(options, args)
   except (errors.GenericError, luxi.ProtocolError), err:
     result, err_msg = FormatError(err)
-    logger.ToStderr(err_msg)
+    logging.exception("Error durring command processing")
+    ToStderr(err_msg)
 
   return result
 
@@ -665,3 +802,103 @@ def GenerateTable(headers, fields, separator, data,
     result.append(format % tuple(args))
 
   return result
+
+
+def FormatTimestamp(ts):
+  """Formats a given timestamp.
+
+  @type ts: timestamp
+  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
+
+  @rtype: string
+  @returns: a string with the formatted timestamp
+
+  """
+  if not isinstance (ts, (tuple, list)) or len(ts) != 2:
+    return '?'
+  sec, usec = ts
+  return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
+
+
+def ParseTimespec(value):
+  """Parse a time specification.
+
+  The following suffixed will be recognized:
+
+    - s: seconds
+    - m: minutes
+    - h: hours
+    - d: day
+    - w: weeks
+
+  Without any suffix, the value will be taken to be in seconds.
+
+  """
+  value = str(value)
+  if not value:
+    raise errors.OpPrereqError("Empty time specification passed")
+  suffix_map = {
+    's': 1,
+    'm': 60,
+    'h': 3600,
+    'd': 86400,
+    'w': 604800,
+    }
+  if value[-1] not in suffix_map:
+    try:
+      value = int(value)
+    except ValueError:
+      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
+  else:
+    multiplier = suffix_map[value[-1]]
+    value = value[:-1]
+    if not value: # no data left after stripping the suffix
+      raise errors.OpPrereqError("Invalid time specification (only"
+                                 " suffix passed)")
+    try:
+      value = int(value) * multiplier
+    except ValueError:
+      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
+  return value
+
+
+def _ToStream(stream, txt, *args):
+  """Write a message to a stream, bypassing the logging system
+
+  @type stream: file object
+  @param stream: the file to which we should write
+  @type txt: str
+  @param txt: the message
+
+  """
+  if args:
+    args = tuple(args)
+    stream.write(txt % args)
+  else:
+    stream.write(txt)
+  stream.write('\n')
+  stream.flush()
+
+
+def ToStdout(txt, *args):
+  """Write a message to stdout only, bypassing the logging system
+
+  This is just a wrapper over _ToStream.
+
+  @type txt: str
+  @param txt: the message
+
+  """
+  _ToStream(sys.stdout, txt, *args)
+
+
+def ToStderr(txt, *args):
+  """Write a message to stderr only, bypassing the logging system
+
+  This is just a wrapper over _ToStream.
+
+  @type txt: str
+  @param txt: the message
+
+  """
+  _ToStream(sys.stderr, txt, *args)