Parallelize LUCreateInstance
[ganeti-local] / lib / cli.py
index 5a966bc..b97edff 100644 (file)
@@ -47,6 +47,7 @@ __all__ = ["DEBUG_OPT", "NOHDR_OPT", "SEP_OPT", "GenericMain",
            "USEUNITS_OPT", "FIELDS_OPT", "FORCE_OPT", "SUBMIT_OPT",
            "ListTags", "AddTags", "RemoveTags", "TAG_SRC_OPT",
            "FormatError", "SplitNodeOption", "SubmitOrSend",
+           "JobSubmittedException",
            ]
 
 
@@ -374,6 +375,17 @@ def AskUser(text, choices=None):
   return answer
 
 
+class JobSubmittedException(Exception):
+  """Job was submitted, client should exit.
+
+  This exception has one argument, the ID of the job that was
+  submitted. The handler should print this ID.
+
+  This is not an error, just a structured way to exit from clients.
+
+  """
+
+
 def SendJob(ops, cl=None):
   """Function to submit an opcode without waiting for the results.
 
@@ -405,25 +417,34 @@ def PollJob(job_id, cl=None, feedback_fn=None):
   if cl is None:
     cl = GetClient()
 
-  state = None
-  lastmsg = None
+  prev_job_info = None
+  prev_logmsg_serial = None
+
   while True:
-    state = cl.WaitForJobChange(job_id, ["status", "ticker"], state)
-    if not state:
+    result = cl.WaitForJobChange(job_id, ["status"], prev_job_info,
+                                 prev_logmsg_serial)
+    if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
 
+    # Split result, a tuple of (field values, log entries)
+    (job_info, log_entries) = result
+    (status, ) = job_info
+
+    if log_entries:
+      for log_entry in log_entries:
+        (serial, timestamp, _, message) = log_entry
+        if callable(feedback_fn):
+          feedback_fn(log_entry[1:])
+        else:
+          print "%s %s" % (time.ctime(utils.MergeTime(timestamp)), message)
+        prev_logmsg_serial = max(prev_logmsg_serial, serial)
+
     # TODO: Handle canceled and archived jobs
-    status = state[0]
-    if status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
+    elif status in (constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR):
       break
-    msg = state[1]
-    if msg is not None and msg != lastmsg:
-      if callable(feedback_fn):
-        feedback_fn(msg)
-      else:
-        print "%s %s" % (time.ctime(utils.MergeTime(msg[0])), msg[2])
-    lastmsg = msg
+
+    prev_job_info = job_info
 
   jobs = cl.QueryJobs([job_id], ["status", "opresult"])
   if not jobs:
@@ -462,8 +483,8 @@ def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
 
   """
   if opts and opts.submit_only:
-    print SendJob([op], cl=cl)
-    sys.exit(0)
+    job_id = SendJob([op], cl=cl)
+    raise JobSubmittedException(job_id)
   else:
     return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn)
 
@@ -530,13 +551,16 @@ def FormatError(err):
     obuf.write("Unhandled Ganeti error: %s" % msg)
   elif isinstance(err, luxi.NoMasterError):
     obuf.write("Cannot communicate with the master daemon.\nIs it running"
-               " and listening on '%s'?" % err.args[0])
+               " and listening for connections?")
   elif isinstance(err, luxi.TimeoutError):
     obuf.write("Timeout while talking to the master daemon. Error:\n"
                "%s" % msg)
   elif isinstance(err, luxi.ProtocolError):
     obuf.write("Unhandled protocol error while talking to the master daemon:\n"
                "%s" % msg)
+  elif isinstance(err, JobSubmittedException):
+    obuf.write("JobID: %s\n" % err.args[0])
+    retcode = 0
   else:
     obuf.write("Unhandled exception: %s" % msg)
   return retcode, obuf.getvalue().rstrip('\n')