Rename the device type constants
[ganeti-local] / lib / cli.py
index 71a9953..ce1277d 100644 (file)
@@ -27,27 +27,31 @@ import textwrap
 import os.path
 import copy
 import time
 import os.path
 import copy
 import time
+import logging
 from cStringIO import StringIO
 
 from ganeti import utils
 from cStringIO import StringIO
 
 from ganeti import utils
-from ganeti import logger
 from ganeti import errors
 from ganeti import constants
 from ganeti import opcodes
 from ganeti import luxi
 from ganeti import ssconf
 from ganeti import errors
 from ganeti import constants
 from ganeti import opcodes
 from ganeti import luxi
 from ganeti import ssconf
+from ganeti import rpc
 
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
                       Option, OptionValueError)
 
 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "SubmitOpCode", "GetClient",
 
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
                       Option, OptionValueError)
 
 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "SubmitOpCode", "GetClient",
-           "cli_option", "GenerateTable", "AskUser",
+           "cli_option", "ikv_option", "keyval_option",
+           "GenerateTable", "AskUser",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
            "FormatError", "SplitNodeOption", "SubmitOrSend",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
            "FormatError", "SplitNodeOption", "SubmitOrSend",
-           "JobSubmittedException", "FormatTimestamp",
+           "JobSubmittedException", "FormatTimestamp", "ParseTimespec",
+           "ValidateBeParams", "ToStderr", "ToStdout", "UsesRPC",
+           "GetOnlineNodes", "JobExecutor", "SYNC_OPT",
            ]
 
 
            ]
 
 
@@ -166,9 +170,9 @@ SEP_OPT = make_option("--separator", default=None,
                       help="Separator between output fields"
                       " (defaults to one space)")
 
                       help="Separator between output fields"
                       " (defaults to one space)")
 
-USEUNITS_OPT = make_option("--human-readable", default=False,
-                           action="store_true", dest="human_readable",
-                           help="Print sizes in human readable format")
+USEUNITS_OPT = make_option("--units", default=None,
+                           dest="units", choices=('h', 'm', 'g', 't'),
+                           help="Specify units for output (one of hmgt)")
 
 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
                          type="string", help="Comma separated list of"
 
 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
                          type="string", help="Comma separated list of"
@@ -186,6 +190,11 @@ SUBMIT_OPT = make_option("--submit", dest="submit_only",
                          help="Submit the job and return the job ID, but"
                          " don't wait for the job to finish")
 
                          help="Submit the job and return the job ID, but"
                          " don't wait for the job to finish")
 
+SYNC_OPT = make_option("--sync", dest="do_locking",
+                       default=False, action="store_true",
+                       help="Grab locks while doing the queries"
+                       " in order to ensure more consistent results")
+
 
 def ARGS_FIXED(val):
   """Macro-like function denoting a fixed number of arguments"""
 
 def ARGS_FIXED(val):
   """Macro-like function denoting a fixed number of arguments"""
@@ -221,20 +230,103 @@ class CliOption(Option):
   TYPE_CHECKER["unit"] = check_unit
 
 
   TYPE_CHECKER["unit"] = check_unit
 
 
+def _SplitKeyVal(opt, data):
+  """Convert a KeyVal string into a dict.
+
+  This function will convert a key=val[,...] string into a dict. Empty
+  values will be converted specially: keys which have the prefix 'no_'
+  will have the value=False and the prefix stripped, the others will
+  have value=True.
+
+  @type opt: string
+  @param opt: a string holding the option name for which we process the
+      data, used in building error messages
+  @type data: string
+  @param data: a string of the format key=val,key=val,...
+  @rtype: dict
+  @return: {key=val, key=val}
+  @raises errors.ParameterError: if there are duplicate keys
+
+  """
+  NO_PREFIX = "no_"
+  UN_PREFIX = "-"
+  kv_dict = {}
+  for elem in data.split(","):
+    if "=" in elem:
+      key, val = elem.split("=", 1)
+    else:
+      if elem.startswith(NO_PREFIX):
+        key, val = elem[len(NO_PREFIX):], False
+      elif elem.startswith(UN_PREFIX):
+        key, val = elem[len(UN_PREFIX):], None
+      else:
+        key, val = elem, True
+    if key in kv_dict:
+      raise errors.ParameterError("Duplicate key '%s' in option %s" %
+                                  (key, opt))
+    kv_dict[key] = val
+  return kv_dict
+
+
+def check_ident_key_val(option, opt, value):
+  """Custom parser for the IdentKeyVal option type.
+
+  """
+  if ":" not in value:
+    retval =  (value, {})
+  else:
+    ident, rest = value.split(":", 1)
+    kv_dict = _SplitKeyVal(opt, rest)
+    retval = (ident, kv_dict)
+  return retval
+
+
+class IdentKeyValOption(Option):
+  """Custom option class for ident:key=val,key=val options.
+
+  This will store the parsed values as a tuple (ident, {key: val}). As
+  such, multiple uses of this option via action=append is possible.
+
+  """
+  TYPES = Option.TYPES + ("identkeyval",)
+  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
+  TYPE_CHECKER["identkeyval"] = check_ident_key_val
+
+
+def check_key_val(option, opt, value):
+  """Custom parser for the KeyVal option type.
+
+  """
+  return _SplitKeyVal(opt, value)
+
+
+class KeyValOption(Option):
+  """Custom option class for key=val,key=val options.
+
+  This will store the parsed values as a dict {key: val}.
+
+  """
+  TYPES = Option.TYPES + ("keyval",)
+  TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
+  TYPE_CHECKER["keyval"] = check_key_val
+
+
 # optparse.py sets make_option, so we do it for our own option class, too
 cli_option = CliOption
 # optparse.py sets make_option, so we do it for our own option class, too
 cli_option = CliOption
+ikv_option = IdentKeyValOption
+keyval_option = KeyValOption
 
 
 def _ParseArgs(argv, commands, aliases):
 
 
 def _ParseArgs(argv, commands, aliases):
-  """Parses the command line and return the function which must be
-  executed together with its arguments
+  """Parser for the command line arguments.
 
 
-  Arguments:
-    argv: the command line
+  This function parses the arguements and returns the function which
+  must be executed together with its (modified) arguments.
 
 
-    commands: dictionary with special contents, see the design doc for
-    cmdline handling
-    aliases: dictionary with command aliases {'alias': 'target, ...}
+  @param argv: the command line
+  @param commands: dictionary with special contents, see the design
+      doc for cmdline handling
+  @param aliases: dictionary with command aliases {'alias': 'target, ...}
 
   """
   if len(argv) == 0:
 
   """
   if len(argv) == 0:
@@ -317,20 +409,50 @@ def SplitNodeOption(value):
     return (value, None)
 
 
     return (value, None)
 
 
+def ValidateBeParams(bep):
+  """Parse and check the given beparams.
+
+  The function will update in-place the given dictionary.
+
+  @type bep: dict
+  @param bep: input beparams
+  @raise errors.ParameterError: if the input values are not OK
+  @raise errors.UnitParseError: if the input values are not OK
+
+  """
+  if constants.BE_MEMORY in bep:
+    bep[constants.BE_MEMORY] = utils.ParseUnit(bep[constants.BE_MEMORY])
+
+  if constants.BE_VCPUS in bep:
+    try:
+      bep[constants.BE_VCPUS] = int(bep[constants.BE_VCPUS])
+    except ValueError:
+      raise errors.ParameterError("Invalid number of VCPUs")
+
+
+def UsesRPC(fn):
+  def wrapper(*args, **kwargs):
+    rpc.Init()
+    try:
+      return fn(*args, **kwargs)
+    finally:
+      rpc.Shutdown()
+  return wrapper
+
+
 def AskUser(text, choices=None):
   """Ask the user a question.
 
 def AskUser(text, choices=None):
   """Ask the user a question.
 
-  Args:
-    text - the question to ask.
+  @param text: the question to ask
 
 
-    choices - list with elements tuples (input_char, return_value,
-    description); if not given, it will default to: [('y', True,
-    'Perform the operation'), ('n', False, 'Do no do the operation')];
-    note that the '?' char is reserved for help
+  @param choices: list with elements tuples (input_char, return_value,
+      description); if not given, it will default to: [('y', True,
+      'Perform the operation'), ('n', False, 'Do no do the operation')];
+      note that the '?' char is reserved for help
 
 
-  Returns: one of the return values from the choices list; if input is
-  not possible (i.e. not running with a tty, we return the last entry
-  from the list
+  @return: one of the return values from the choices list; if input is
+      not possible (i.e. not running with a tty, we return the last
+      entry from the list
 
   """
   if choices is None:
 
   """
   if choices is None:
@@ -437,23 +559,41 @@ def PollJob(job_id, cl=None, feedback_fn=None):
         if callable(feedback_fn):
           feedback_fn(log_entry[1:])
         else:
         if callable(feedback_fn):
           feedback_fn(log_entry[1:])
         else:
-          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+          encoded = utils.SafeEncode(message)
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), encoded)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
-    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+    elif status in (constants.JOB_STATUS_SUCCESS,
+                    constants.JOB_STATUS_ERROR,
+                    constants.JOB_STATUS_CANCELING,
+                    constants.JOB_STATUS_CANCELED):
       break
 
     prev_job_info = job_info
 
       break
 
     prev_job_info = job_info
 
-  jobs = cl.QueryJobs([job_id], ["status", "opresult"])
+  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
-  status, result = jobs[0]
+  status, opstatus, result = jobs[0]
   if status == constants.JOB_STATUS_SUCCESS:
     return result
   if status == constants.JOB_STATUS_SUCCESS:
     return result
+  elif status in (constants.JOB_STATUS_CANCELING,
+                  constants.JOB_STATUS_CANCELED):
+    raise errors.OpExecError("Job was canceled")
   else:
   else:
+    has_ok = False
+    for idx, (status, msg) in enumerate(zip(opstatus, result)):
+      if status == constants.OP_STATUS_SUCCESS:
+        has_ok = True
+      elif status == constants.OP_STATUS_ERROR:
+        if has_ok:
+          raise errors.OpExecError("partial failure (opcode %d): %s" %
+                                   (idx, msg))
+        else:
+          raise errors.OpExecError(str(msg))
+    # default failure mode
     raise errors.OpExecError(result)
 
 
     raise errors.OpExecError(result)
 
 
@@ -520,7 +660,7 @@ def FormatError(err):
   msg = str(err)
   if isinstance(err, errors.ConfigurationError):
     txt = "Corrupt configuration file: %s" % msg
   msg = str(err)
   if isinstance(err, errors.ConfigurationError):
     txt = "Corrupt configuration file: %s" % msg
-    logger.Error(txt)
+    logging.error(txt)
     obuf.write(txt + "\n")
     obuf.write("Aborting.")
     retcode = 2
     obuf.write(txt + "\n")
     obuf.write("Aborting.")
     retcode = 2
@@ -549,6 +689,12 @@ def FormatError(err):
     obuf.write("Failure: command execution error:\n%s" % msg)
   elif isinstance(err, errors.TagError):
     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
     obuf.write("Failure: command execution error:\n%s" % msg)
   elif isinstance(err, errors.TagError):
     obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
+  elif isinstance(err, errors.JobQueueDrainError):
+    obuf.write("Failure: the job queue is marked for drain and doesn't"
+               " accept new requests\n")
+  elif isinstance(err, errors.JobQueueFull):
+    obuf.write("Failure: the job queue is full and doesn't accept new"
+               " job submissions until old jobs are archived\n")
   elif isinstance(err, errors.GenericError):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
   elif isinstance(err, errors.GenericError):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
@@ -603,51 +749,80 @@ def GenericMain(commands, override=None, aliases=None):
     for key, val in override.iteritems():
       setattr(options, key, val)
 
     for key, val in override.iteritems():
       setattr(options, key, val)
 
-  logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
-                      stderr_logging=True, program=binary)
+  utils.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
+                     stderr_logging=True, program=binary)
 
   utils.debug = options.debug
 
   if old_cmdline:
 
   utils.debug = options.debug
 
   if old_cmdline:
-    logger.Info("run with arguments '%s'" % old_cmdline)
+    logging.info("run with arguments '%s'", old_cmdline)
   else:
   else:
-    logger.Info("run with no arguments")
+    logging.info("run with no arguments")
 
   try:
     result = func(options, args)
 
   try:
     result = func(options, args)
-  except (errors.GenericError, luxi.ProtocolError), err:
+  except (errors.GenericError, luxi.ProtocolError,
+          JobSubmittedException), err:
     result, err_msg = FormatError(err)
     result, err_msg = FormatError(err)
-    logger.ToStderr(err_msg)
+    logging.exception("Error durring command processing")
+    ToStderr(err_msg)
 
   return result
 
 
 def GenerateTable(headers, fields, separator, data,
 
   return result
 
 
 def GenerateTable(headers, fields, separator, data,
-                  numfields=None, unitfields=None):
+                  numfields=None, unitfields=None,
+                  units=None):
   """Prints a table with headers and different fields.
 
   """Prints a table with headers and different fields.
 
-  Args:
-    headers: Dict of header titles or None if no headers should be shown
-    fields: List of fields to show
-    separator: String used to separate fields or None for spaces
-    data: Data to be printed
-    numfields: List of fields to be aligned to right
-    unitfields: List of fields to be formatted as units
+  @type headers: dict
+  @param headers: dictionary mapping field names to headers for
+      the table
+  @type fields: list
+  @param fields: the field names corresponding to each row in
+      the data field
+  @param separator: the separator to be used; if this is None,
+      the default 'smart' algorithm is used which computes optimal
+      field width, otherwise just the separator is used between
+      each field
+  @type data: list
+  @param data: a list of lists, each sublist being one row to be output
+  @type numfields: list
+  @param numfields: a list with the fields that hold numeric
+      values and thus should be right-aligned
+  @type unitfields: list
+  @param unitfields: a list with the fields that hold numeric
+      values that should be formatted with the units field
+  @type units: string or None
+  @param units: the units we should use for formatting, or None for
+      automatic choice (human-readable for non-separator usage, otherwise
+      megabytes); this is a one-letter string
 
   """
 
   """
+  if units is None:
+    if separator:
+      units = "m"
+    else:
+      units = "h"
+
   if numfields is None:
     numfields = []
   if unitfields is None:
     unitfields = []
 
   if numfields is None:
     numfields = []
   if unitfields is None:
     unitfields = []
 
+  numfields = utils.FieldSet(*numfields)
+  unitfields = utils.FieldSet(*unitfields)
+
   format_fields = []
   for field in fields:
     if headers and field not in headers:
   format_fields = []
   for field in fields:
     if headers and field not in headers:
-      raise errors.ProgrammerError("Missing header description for field '%s'"
-                                   % field)
+      # FIXME: handle better unknown fields (either revert to old
+      # style of raising exception, or deal more intelligently with
+      # variable fields)
+      headers[field] = field
     if separator is not None:
       format_fields.append("%s")
     if separator is not None:
       format_fields.append("%s")
-    elif field in numfields:
+    elif numfields.Matches(field):
       format_fields.append("%*s")
     else:
       format_fields.append("%-*s")
       format_fields.append("%*s")
     else:
       format_fields.append("%-*s")
@@ -660,13 +835,13 @@ def GenerateTable(headers, fields, separator, data,
 
   for row in data:
     for idx, val in enumerate(row):
 
   for row in data:
     for idx, val in enumerate(row):
-      if fields[idx] in unitfields:
+      if unitfields.Matches(fields[idx]):
         try:
           val = int(val)
         except ValueError:
           pass
         else:
         try:
           val = int(val)
         except ValueError:
           pass
         else:
-          val = row[idx] = utils.FormatUnit(val)
+          val = row[idx] = utils.FormatUnit(val, units)
       val = row[idx] = str(val)
       if separator is None:
         mlens[idx] = max(mlens[idx], len(val))
       val = row[idx] = str(val)
       if separator is None:
         mlens[idx] = max(mlens[idx], len(val))
@@ -707,3 +882,177 @@ def FormatTimestamp(ts):
     return '?'
   sec, usec = ts
   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
     return '?'
   sec, usec = ts
   return time.strftime("%F %T", time.localtime(sec)) + ".%06d" % usec
+
+
+def ParseTimespec(value):
+  """Parse a time specification.
+
+  The following suffixed will be recognized:
+
+    - s: seconds
+    - m: minutes
+    - h: hours
+    - d: day
+    - w: weeks
+
+  Without any suffix, the value will be taken to be in seconds.
+
+  """
+  value = str(value)
+  if not value:
+    raise errors.OpPrereqError("Empty time specification passed")
+  suffix_map = {
+    's': 1,
+    'm': 60,
+    'h': 3600,
+    'd': 86400,
+    'w': 604800,
+    }
+  if value[-1] not in suffix_map:
+    try:
+      value = int(value)
+    except ValueError:
+      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
+  else:
+    multiplier = suffix_map[value[-1]]
+    value = value[:-1]
+    if not value: # no data left after stripping the suffix
+      raise errors.OpPrereqError("Invalid time specification (only"
+                                 " suffix passed)")
+    try:
+      value = int(value) * multiplier
+    except ValueError:
+      raise errors.OpPrereqError("Invalid time specification '%s'" % value)
+  return value
+
+
+def GetOnlineNodes(nodes, cl=None, nowarn=False):
+  """Returns the names of online nodes.
+
+  This function will also log a warning on stderr with the names of
+  the online nodes.
+
+  @param nodes: if not empty, use only this subset of nodes (minus the
+      offline ones)
+  @param cl: if not None, luxi client to use
+  @type nowarn: boolean
+  @param nowarn: by default, this function will output a note with the
+      offline nodes that are skipped; if this parameter is True the
+      note is not displayed
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  result = cl.QueryNodes(names=nodes, fields=["name", "offline"],
+                         use_locking=False)
+  offline = [row[0] for row in result if row[1]]
+  if offline and not nowarn:
+    ToStderr("Note: skipping offline node(s): %s" % ", ".join(offline))
+  return [row[0] for row in result if not row[1]]
+
+
+def _ToStream(stream, txt, *args):
+  """Write a message to a stream, bypassing the logging system
+
+  @type stream: file object
+  @param stream: the file to which we should write
+  @type txt: str
+  @param txt: the message
+
+  """
+  if args:
+    args = tuple(args)
+    stream.write(txt % args)
+  else:
+    stream.write(txt)
+  stream.write('\n')
+  stream.flush()
+
+
+def ToStdout(txt, *args):
+  """Write a message to stdout only, bypassing the logging system
+
+  This is just a wrapper over _ToStream.
+
+  @type txt: str
+  @param txt: the message
+
+  """
+  _ToStream(sys.stdout, txt, *args)
+
+
+def ToStderr(txt, *args):
+  """Write a message to stderr only, bypassing the logging system
+
+  This is just a wrapper over _ToStream.
+
+  @type txt: str
+  @param txt: the message
+
+  """
+  _ToStream(sys.stderr, txt, *args)
+
+
+class JobExecutor(object):
+  """Class which manages the submission and execution of multiple jobs.
+
+  Note that instances of this class should not be reused between
+  GetResults() calls.
+
+  """
+  def __init__(self, cl=None, verbose=True):
+    self.queue = []
+    if cl is None:
+      cl = GetClient()
+    self.cl = cl
+    self.verbose = verbose
+
+  def QueueJob(self, name, *ops):
+    """Submit a job for execution.
+
+    @type name: string
+    @param name: a description of the job, will be used in WaitJobSet
+    """
+    job_id = SendJob(ops, cl=self.cl)
+    self.queue.append((job_id, name))
+
+  def GetResults(self):
+    """Wait for and return the results of all jobs.
+
+    @rtype: list
+    @return: list of tuples (success, job results), in the same order
+        as the submitted jobs; if a job has failed, instead of the result
+        there will be the error message
+
+    """
+    results = []
+    if self.verbose:
+      ToStdout("Submitted jobs %s", ", ".join(row[0] for row in self.queue))
+    for jid, name in self.queue:
+      if self.verbose:
+        ToStdout("Waiting for job %s for %s...", jid, name)
+      try:
+        job_result = PollJob(jid, cl=self.cl)
+        success = True
+      except (errors.GenericError, luxi.ProtocolError), err:
+        _, job_result = FormatError(err)
+        success = False
+        # the error message will always be shown, verbose or not
+        ToStderr("Job %s for %s has failed: %s", jid, name, job_result)
+
+      results.append((success, job_result))
+    return results
+
+  def WaitOrShow(self, wait):
+    """Wait for job results or only print the job IDs.
+
+    @type wait: boolean
+    @param wait: whether to wait or not
+
+    """
+    if wait:
+      return self.GetResults()
+    else:
+      for jid, name in self.queue:
+        ToStdout("%s: %s", jid, name)