Parallelize LUCreateInstance
[ganeti-local] / lib / cli.py
index 9a6784d..b97edff 100644 (file)
@@ -26,24 +26,28 @@ import sys
 import textwrap
 import os.path
 import copy
 import textwrap
 import os.path
 import copy
+import time
 from cStringIO import StringIO
 
 from ganeti import utils
 from ganeti import logger
 from ganeti import errors
 from cStringIO import StringIO
 
 from ganeti import utils
 from ganeti import logger
 from ganeti import errors
-from ganeti import mcpu
 from ganeti import constants
 from ganeti import opcodes
 from ganeti import constants
 from ganeti import opcodes
+from ganeti import luxi
+from ganeti import ssconf
 
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
 
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
-                      Option, OptionValueError, SUPPRESS_HELP)
+                      Option, OptionValueError)
 
 
-__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain", "SubmitOpCode",
+__all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
+           "SubmitOpCode", "GetClient",
            "cli_option", "GenerateTable", "AskUser",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
            "cli_option", "GenerateTable", "AskUser",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
-           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
+           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
-           "FormatError",
+           "FormatError", "SplitNodeOption", "SubmitOrSend",
+           "JobSubmittedException",
            ]
 
 
            ]
 
 
@@ -167,18 +171,22 @@ USEUNITS_OPT = make_option("--human-readable", default=False,
                            help="Print sizes in human readable format")
 
 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
                            help="Print sizes in human readable format")
 
 FIELDS_OPT = make_option("-o", "--output", dest="output", action="store",
-                         type="string", help="Select output fields",
+                         type="string", help="Comma separated list of"
+                         " output fields",
                          metavar="FIELDS")
 
 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
                         default=False, help="Force the operation")
 
                          metavar="FIELDS")
 
 FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
                         default=False, help="Force the operation")
 
-_LOCK_OPT = make_option("--lock-retries", default=None,
-                        type="int", help=SUPPRESS_HELP)
-
 TAG_SRC_OPT = make_option("--from", dest="tags_source",
                           default=None, help="File with tag names")
 
 TAG_SRC_OPT = make_option("--from", dest="tags_source",
                           default=None, help="File with tag names")
 
+SUBMIT_OPT = make_option("--submit", dest="submit_only",
+                         default=False, action="store_true",
+                         help="Submit the job and return the job ID, but"
+                         " don't wait for the job to finish")
+
+
 def ARGS_FIXED(val):
   """Macro-like function denoting a fixed number of arguments"""
   return -val
 def ARGS_FIXED(val):
   """Macro-like function denoting a fixed number of arguments"""
   return -val
@@ -195,6 +203,9 @@ ARGS_ANY = ARGS_ATLEAST(0)
 
 
 def check_unit(option, opt, value):
 
 
 def check_unit(option, opt, value):
+  """OptParsers custom converter for units.
+
+  """
   try:
     return utils.ParseUnit(value)
   except errors.UnitParseError, err:
   try:
     return utils.ParseUnit(value)
   except errors.UnitParseError, err:
@@ -202,6 +213,9 @@ def check_unit(option, opt, value):
 
 
 class CliOption(Option):
 
 
 class CliOption(Option):
+  """Custom option class for optparse.
+
+  """
   TYPES = Option.TYPES + ("unit",)
   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
   TYPE_CHECKER["unit"] = check_unit
   TYPES = Option.TYPES + ("unit",)
   TYPE_CHECKER = copy.copy(Option.TYPE_CHECKER)
   TYPE_CHECKER["unit"] = check_unit
@@ -211,7 +225,7 @@ class CliOption(Option):
 cli_option = CliOption
 
 
 cli_option = CliOption
 
 
-def _ParseArgs(argv, commands):
+def _ParseArgs(argv, commands, aliases):
   """Parses the command line and return the function which must be
   executed together with its arguments
 
   """Parses the command line and return the function which must be
   executed together with its arguments
 
@@ -220,6 +234,7 @@ def _ParseArgs(argv, commands):
 
     commands: dictionary with special contents, see the design doc for
     cmdline handling
 
     commands: dictionary with special contents, see the design doc for
     cmdline handling
+    aliases: dictionary with command aliases {'alias': 'target, ...}
 
   """
   if len(argv) == 0:
 
   """
   if len(argv) == 0:
@@ -233,7 +248,8 @@ def _ParseArgs(argv, commands):
     # argument. optparse.py does it the same.
     sys.exit(0)
 
     # argument. optparse.py does it the same.
     sys.exit(0)
 
-  if len(argv) < 2 or argv[1] not in commands.keys():
+  if len(argv) < 2 or not (argv[1] in commands or
+                           argv[1] in aliases):
     # let's do a nice thing
     sortedcmds = commands.keys()
     sortedcmds.sort()
     # let's do a nice thing
     sortedcmds = commands.keys()
     sortedcmds.sort()
@@ -241,23 +257,34 @@ def _ParseArgs(argv, commands):
            "\n%(bin)s <command> --help to see details, or"
            " man %(bin)s\n" % {"bin": binary})
     # compute the max line length for cmd + usage
            "\n%(bin)s <command> --help to see details, or"
            " man %(bin)s\n" % {"bin": binary})
     # compute the max line length for cmd + usage
-    mlen = max([len(" %s %s" % (cmd, commands[cmd][3])) for cmd in commands])
+    mlen = max([len(" %s" % cmd) for cmd in commands])
     mlen = min(60, mlen) # should not get here...
     # and format a nice command list
     print "Commands:"
     for cmd in sortedcmds:
     mlen = min(60, mlen) # should not get here...
     # and format a nice command list
     print "Commands:"
     for cmd in sortedcmds:
-      cmdstr = " %s %s" % (cmd, commands[cmd][3])
+      cmdstr = " %s" % (cmd,)
       help_text = commands[cmd][4]
       help_lines = textwrap.wrap(help_text, 79-3-mlen)
       help_text = commands[cmd][4]
       help_lines = textwrap.wrap(help_text, 79-3-mlen)
-      print "%-*s - %s" % (mlen, cmdstr,
-                                          help_lines.pop(0))
+      print "%-*s - %s" % (mlen, cmdstr, help_lines.pop(0))
       for line in help_lines:
         print "%-*s   %s" % (mlen, "", line)
     print
     return None, None, None
       for line in help_lines:
         print "%-*s   %s" % (mlen, "", line)
     print
     return None, None, None
+
+  # get command, unalias it, and look it up in commands
   cmd = argv.pop(1)
   cmd = argv.pop(1)
+  if cmd in aliases:
+    if cmd in commands:
+      raise errors.ProgrammerError("Alias '%s' overrides an existing"
+                                   " command" % cmd)
+
+    if aliases[cmd] not in commands:
+      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
+                                   " command '%s'" % (cmd, aliases[cmd]))
+
+    cmd = aliases[cmd]
+
   func, nargs, parser_opts, usage, description = commands[cmd]
   func, nargs, parser_opts, usage, description = commands[cmd]
-  parser_opts.append(_LOCK_OPT)
   parser = OptionParser(option_list=parser_opts,
                         description=description,
                         formatter=TitledHelpFormatter(),
   parser = OptionParser(option_list=parser_opts,
                         description=description,
                         formatter=TitledHelpFormatter(),
@@ -280,6 +307,16 @@ def _ParseArgs(argv, commands):
   return func, options, args
 
 
   return func, options, args
 
 
+def SplitNodeOption(value):
+  """Splits the value of a --node option.
+
+  """
+  if value and ':' in value:
+    return value.split(':', 1)
+  else:
+    return (value, None)
+
+
 def AskUser(text, choices=None):
   """Ask the user a question.
 
 def AskUser(text, choices=None):
   """Ask the user a question.
 
@@ -311,7 +348,7 @@ def AskUser(text, choices=None):
     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
   text = "\n".join(new_text)
   try:
     new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
   text = "\n".join(new_text)
   try:
-    f = file("/dev/tty", "r+")
+    f = file("/dev/tty", "a+")
   except IOError:
     return answer
   try:
   except IOError:
     return answer
   try:
@@ -338,19 +375,133 @@ def AskUser(text, choices=None):
   return answer
 
 
   return answer
 
 
-def SubmitOpCode(op, proc=None, feedback_fn=None):
-  """Function to submit an opcode.
+class JobSubmittedException(Exception):
+  """Job was submitted, client should exit.
+
+  This exception has one argument, the ID of the job that was
+  submitted. The handler should print this ID.
+
+  This is not an error, just a structured way to exit from clients.
+
+  """
+
+
+def SendJob(ops, cl=None):
+  """Function to submit an opcode without waiting for the results.
+
+  @type ops: list
+  @param ops: list of opcodes
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  job_id = cl.SubmitJob(ops)
+
+  return job_id
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+  """Function to poll for the result of a job.
+
+  @type job_id: job identified
+  @param job_id: the job to poll for results
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  prev_job_info = None
+  prev_logmsg_serial = None
+
+  while True:
+    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+                                 prev_logmsg_serial)
+    if not result:
+      # job not found, go away!
+      raise errors.JobLost("Job with id %s lost" % job_id)
+
+    # Split result, a tuple of (field values, log entries)
+    (job_info, log_entries) = result
+    (status, ) = job_info
+
+    if log_entries:
+      for log_entry in log_entries:
+        (serial, timestamp, _, message) = log_entry
+        if callable(feedback_fn):
+          feedback_fn(log_entry[1:])
+        else:
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+        prev_logmsg_serial = max(prev_logmsg_serial, serial)
+
+    # TODO: Handle canceled and archived jobs
+    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+      break
+
+    prev_job_info = job_info
+
+  jobs = cl.QueryJobs([job_id], ["status", "opresult"])
+  if not jobs:
+    raise errors.JobLost("Job with id %s lost" % job_id)
+
+  status, result = jobs[0]
+  if status == constants.JOB_STATUS_SUCCESS:
+    return result[0]
+  else:
+    raise errors.OpExecError(result)
+
+
+def SubmitOpCode(op, cl=None, feedback_fn=None):
+  """Legacy function to submit an opcode.
 
   This is just a simple wrapper over the construction of the processor
   instance. It should be extended to better handle feedback and
   interaction functions.
 
   """
 
   This is just a simple wrapper over the construction of the processor
   instance. It should be extended to better handle feedback and
   interaction functions.
 
   """
-  if proc is None:
-    proc = mcpu.Processor()
-  if feedback_fn is None:
-    feedback_fn = logger.ToStdout
-  return proc.ExecOpCode(op, feedback_fn)
+  if cl is None:
+    cl = GetClient()
+
+  job_id = SendJob([op], cl)
+
+  return PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
+
+
+def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
+  """Wrapper around SubmitOpCode or SendJob.
+
+  This function will decide, based on the 'opts' parameter, whether to
+  submit and wait for the result of the opcode (and return it), or
+  whether to just send the job and print its identifier. It is used in
+  order to simplify the implementation of the '--submit' option.
+
+  """
+  if opts and opts.submit_only:
+    job_id = SendJob([op], cl=cl)
+    raise JobSubmittedException(job_id)
+  else:
+    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+
+
+def GetClient():
+  # TODO: Cache object?
+  try:
+    client = luxi.Client()
+  except luxi.NoMasterError:
+    master, myself = ssconf.GetMasterAndMyself()
+    if master != myself:
+      raise errors.OpPrereqError("This is not the master node, please connect"
+                                 " to node '%s' and rerun the command" %
+                                 master)
+    else:
+      raise
+  return client
 
 
 def FormatError(err):
 
 
 def FormatError(err):
@@ -364,10 +515,11 @@ def FormatError(err):
   """
   retcode = 1
   obuf = StringIO()
   """
   retcode = 1
   obuf = StringIO()
+  msg = str(err)
   if isinstance(err, errors.ConfigurationError):
   if isinstance(err, errors.ConfigurationError):
-    msg = "Corrupt configuration file: %s" % err
-    logger.Error(msg)
-    obuf.write(msg + "\n")
+    txt = "Corrupt configuration file: %s" % msg
+    logger.Error(txt)
+    obuf.write(txt + "\n")
     obuf.write("Aborting.")
     retcode = 2
   elif isinstance(err, errors.HooksAbort):
     obuf.write("Aborting.")
     retcode = 2
   elif isinstance(err, errors.HooksAbort):
@@ -380,7 +532,7 @@ def FormatError(err):
         obuf.write("  node: %s, script: %s (no output)\n" %
                    (node, script))
   elif isinstance(err, errors.HooksFailure):
         obuf.write("  node: %s, script: %s (no output)\n" %
                    (node, script))
   elif isinstance(err, errors.HooksFailure):
-    obuf.write("Failure: hooks general failure: %s" % str(err))
+    obuf.write("Failure: hooks general failure: %s" % msg)
   elif isinstance(err, errors.ResolverError):
     this_host = utils.HostInfo.SysName()
     if err.args[0] == this_host:
   elif isinstance(err, errors.ResolverError):
     this_host = utils.HostInfo.SysName()
     if err.args[0] == this_host:
@@ -390,19 +542,31 @@ def FormatError(err):
     obuf.write(msg % err.args[0])
   elif isinstance(err, errors.OpPrereqError):
     obuf.write("Failure: prerequisites not met for this"
     obuf.write(msg % err.args[0])
   elif isinstance(err, errors.OpPrereqError):
     obuf.write("Failure: prerequisites not met for this"
-               " operation:\n%s" % str(err))
+               " operation:\n%s" % msg)
   elif isinstance(err, errors.OpExecError):
   elif isinstance(err, errors.OpExecError):
-    obuf.write("Failure: command execution error:\n%s" % str(err))
+    obuf.write("Failure: command execution error:\n%s" % msg)
   elif isinstance(err, errors.TagError):
   elif isinstance(err, errors.TagError):
-    obuf.write("Failure: invalid tag(s) given:\n%s" % str(err))
+    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
   elif isinstance(err, errors.GenericError):
   elif isinstance(err, errors.GenericError):
-    obuf.write("Unhandled Ganeti error: %s" % str(err))
+    obuf.write("Unhandled Ganeti error: %s" % msg)
+  elif isinstance(err, luxi.NoMasterError):
+    obuf.write("Cannot communicate with the master daemon.\nIs it running"
+               " and listening for connections?")
+  elif isinstance(err, luxi.TimeoutError):
+    obuf.write("Timeout while talking to the master daemon. Error:\n"
+               "%s" % msg)
+  elif isinstance(err, luxi.ProtocolError):
+    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
+               "%s" % msg)
+  elif isinstance(err, JobSubmittedException):
+    obuf.write("JobID: %s\n" % err.args[0])
+    retcode = 0
   else:
   else:
-    obuf.write("Unhandled exception: %s" % str(err))
+    obuf.write("Unhandled exception: %s" % msg)
   return retcode, obuf.getvalue().rstrip('\n')
 
 
   return retcode, obuf.getvalue().rstrip('\n')
 
 
-def GenericMain(commands, override=None):
+def GenericMain(commands, override=None, aliases=None):
   """Generic main function for all the gnt-* commands.
 
   Arguments:
   """Generic main function for all the gnt-* commands.
 
   Arguments:
@@ -411,6 +575,7 @@ def GenericMain(commands, override=None):
     - override: if not None, we expect a dictionary with keys that will
                 override command line options; this can be used to pass
                 options from the scripts to generic functions
     - override: if not None, we expect a dictionary with keys that will
                 override command line options; this can be used to pass
                 options from the scripts to generic functions
+    - aliases: dictionary with command aliases {'alias': 'target, ...}
 
   """
   # save the program name and the entire command line for later logging
 
   """
   # save the program name and the entire command line for later logging
@@ -425,7 +590,10 @@ def GenericMain(commands, override=None):
     binary = "<unknown program>"
     old_cmdline = ""
 
     binary = "<unknown program>"
     old_cmdline = ""
 
-  func, options, args = _ParseArgs(sys.argv, commands)
+  if aliases is None:
+    aliases = {}
+
+  func, options, args = _ParseArgs(sys.argv, commands, aliases)
   if func is None: # parse error
     return 1
 
   if func is None: # parse error
     return 1
 
@@ -433,13 +601,10 @@ def GenericMain(commands, override=None):
     for key, val in override.iteritems():
       setattr(options, key, val)
 
     for key, val in override.iteritems():
       setattr(options, key, val)
 
-  logger.SetupLogging(debug=options.debug, program=binary)
+  logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
+                      stderr_logging=True, program=binary)
 
 
-  try:
-    utils.Lock('cmd', max_retries=options.lock_retries, debug=options.debug)
-  except errors.LockError, err:
-    logger.ToStderr(str(err))
-    return 1
+  utils.debug = options.debug
 
   if old_cmdline:
     logger.Info("run with arguments '%s'" % old_cmdline)
 
   if old_cmdline:
     logger.Info("run with arguments '%s'" % old_cmdline)
@@ -447,14 +612,10 @@ def GenericMain(commands, override=None):
     logger.Info("run with no arguments")
 
   try:
     logger.Info("run with no arguments")
 
   try:
-    try:
-      result = func(options, args)
-    except errors.GenericError, err:
-      result, err_msg = FormatError(err)
-      logger.ToStderr(err_msg)
-  finally:
-    utils.Unlock('cmd')
-    utils.LockCleanup()
+    result = func(options, args)
+  except (errors.GenericError, luxi.ProtocolError), err:
+    result, err_msg = FormatError(err)
+    logger.ToStderr(err_msg)
 
   return result
 
 
   return result
 
@@ -479,6 +640,9 @@ def GenerateTable(headers, fields, separator, data,
 
   format_fields = []
   for field in fields:
 
   format_fields = []
   for field in fields:
+    if headers and field not in headers:
+      raise errors.ProgrammerError("Missing header description for field '%s'"
+                                   % field)
     if separator is not None:
       format_fields.append("%s")
     elif field in numfields:
     if separator is not None:
       format_fields.append("%s")
     elif field in numfields:
@@ -501,6 +665,7 @@ def GenerateTable(headers, fields, separator, data,
           pass
         else:
           val = row[idx] = utils.FormatUnit(val)
           pass
         else:
           val = row[idx] = utils.FormatUnit(val)
+      val = row[idx] = str(val)
       if separator is None:
         mlens[idx] = max(mlens[idx], len(val))
 
       if separator is None:
         mlens[idx] = max(mlens[idx], len(val))