Parallelize LUCreateInstance
[ganeti-local] / lib / cli.py
index befcb2c..b97edff 100644 (file)
@@ -32,21 +32,22 @@ from cStringIO import StringIO
 from ganeti import utils
 from ganeti import logger
 from ganeti import errors
-from ganeti import mcpu
 from ganeti import constants
 from ganeti import opcodes
 from ganeti import luxi
+from ganeti import ssconf
 
 from optparse import (OptionParser, make_option, TitledHelpFormatter,
-                      Option, OptionValueError, SUPPRESS_HELP)
+                      Option, OptionValueError)
 
 __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
-           "SubmitOpCode",
+           "SubmitOpCode", "GetClient",
            "cli_option", "GenerateTable", "AskUser",
            "ARGS_NONE", "ARGS_FIXED", "ARGS_ATLEAST", "ARGS_ANY", "ARGS_ONE",
-           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT",
+           "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
-           "FormatError", "SplitNodeOption"
+           "FormatError", "SplitNodeOption", "SubmitOrSend",
+           "JobSubmittedException",
            ]
 
 
@@ -180,6 +181,11 @@ FORCE_OPT = make_option("-f", "--force", dest="force", action="store_true",
 TAG_SRC_OPT = make_option("--from", dest="tags_source",
                           default=None, help="File with tag names")
 
+SUBMIT_OPT = make_option("--submit", dest="submit_only",
+                         default=False, action="store_true",
+                         help="Submit the job and return the job ID, but"
+                         " don't wait for the job to finish")
+
 
 def ARGS_FIXED(val):
   """Macro-like function denoting a fixed number of arguments"""
@@ -369,32 +375,78 @@ def AskUser(text, choices=None):
   return answer
 
 
-def SubmitOpCode(op, proc=None, feedback_fn=None):
-  """Legacy function to submit an opcode.
+class JobSubmittedException(Exception):
+  """Job was submitted, client should exit.
 
-  This is just a simple wrapper over the construction of the processor
-  instance. It should be extended to better handle feedback and
-  interaction functions.
+  This exception has one argument, the ID of the job that was
+  submitted. The handler should print this ID.
+
+  This is not an error, just a structured way to exit from clients.
+
+  """
+
+
+def SendJob(ops, cl=None):
+  """Function to submit an opcode without waiting for the results.
+
+  @type ops: list
+  @param ops: list of opcodes
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
 
   """
-  # TODO: Fix feedback_fn situation.
-  cl = luxi.Client()
+  if cl is None:
+    cl = GetClient()
 
-  job_id = cl.SubmitJob([op])
+  job_id = cl.SubmitJob(ops)
+
+  return job_id
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+  """Function to poll for the result of a job.
+
+  @type job_id: job identified
+  @param job_id: the job to poll for results
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  prev_job_info = None
+  prev_logmsg_serial = None
 
   while True:
-    jobs = cl.QueryJobs([job_id], ["status"])
-    if not jobs:
+    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+                                 prev_logmsg_serial)
+    if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
 
+    # Split result, a tuple of (field values, log entries)
+    (job_info, log_entries) = result
+    (status, ) = job_info
+
+    if log_entries:
+      for log_entry in log_entries:
+        (serial, timestamp, _, message) = log_entry
+        if callable(feedback_fn):
+          feedback_fn(log_entry[1:])
+        else:
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+        prev_logmsg_serial = max(prev_logmsg_serial, serial)
+
     # TODO: Handle canceled and archived jobs
-    status = jobs[0][0]
-    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
       break
-    time.sleep(1)
 
-  jobs = cl.QueryJobs([job_id], ["status", "result"])
+    prev_job_info = job_info
+
+  jobs = cl.QueryJobs([job_id], ["status", "opresult"])
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
@@ -405,6 +457,53 @@ def SubmitOpCode(op, proc=None, feedback_fn=None):
     raise errors.OpExecError(result)
 
 
+def SubmitOpCode(op, cl=None, feedback_fn=None):
+  """Legacy function to submit an opcode.
+
+  This is just a simple wrapper over the construction of the processor
+  instance. It should be extended to better handle feedback and
+  interaction functions.
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  job_id = SendJob([op], cl)
+
+  return PollJob(job_id, cl=cl, feedback_fn=feedback_fn)
+
+
+def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
+  """Wrapper around SubmitOpCode or SendJob.
+
+  This function will decide, based on the 'opts' parameter, whether to
+  submit and wait for the result of the opcode (and return it), or
+  whether to just send the job and print its identifier. It is used in
+  order to simplify the implementation of the '--submit' option.
+
+  """
+  if opts and opts.submit_only:
+    job_id = SendJob([op], cl=cl)
+    raise JobSubmittedException(job_id)
+  else:
+    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
+
+
+def GetClient():
+  # TODO: Cache object?
+  try:
+    client = luxi.Client()
+  except luxi.NoMasterError:
+    master, myself = ssconf.GetMasterAndMyself()
+    if master != myself:
+      raise errors.OpPrereqError("This is not the master node, please connect"
+                                 " to node '%s' and rerun the command" %
+                                 master)
+    else:
+      raise
+  return client
+
+
 def FormatError(err):
   """Return a formatted error message for a given error.
 
@@ -452,13 +551,16 @@ def FormatError(err):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
     obuf.write("Cannot communicate with the master daemon.\nIs it running"
-               " and listening on '%s'?" % err.args[0])
+               " and listening for connections?")
   elif isinstance(err, luxi.TimeoutError):
     obuf.write("Timeout while talking to the master daemon. Error:\n"
                "%s" % msg)
   elif isinstance(err, luxi.ProtocolError):
     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
                "%s" % msg)
+  elif isinstance(err, JobSubmittedException):
+    obuf.write("JobID: %s\n" % err.args[0])
+    retcode = 0
   else:
     obuf.write("Unhandled exception: %s" % msg)
   return retcode, obuf.getvalue().rstrip('\n')
@@ -499,7 +601,8 @@ def GenericMain(commands, override=None, aliases=None):
     for key, val in override.iteritems():
       setattr(options, key, val)
 
-  logger.SetupLogging(program=binary, debug=options.debug)
+  logger.SetupLogging(constants.LOG_COMMANDS, debug=options.debug,
+                      stderr_logging=True, program=binary)
 
   utils.debug = options.debug