Remove a TODO
[ganeti-local] / lib / cli.py
index 9a40c48..1932a00 100644 (file)
@@ -37,6 +37,7 @@ from ganeti import luxi
 from ganeti import ssconf
 from ganeti import rpc
 from ganeti import ssh
+from ganeti import compat
 
 from optparse import (OptionParser, TitledHelpFormatter,
                       Option, OptionValueError)
@@ -117,6 +118,7 @@ __all__ = [
   "REBOOT_TYPE_OPT",
   "REMOVE_INSTANCE_OPT",
   "REMOVE_UIDS_OPT",
+  "ROMAN_OPT",
   "SECONDARY_IP_OPT",
   "SELECT_OS_OPT",
   "SEP_OPT",
@@ -976,6 +978,12 @@ REMOVE_UIDS_OPT = cli_option("--remove-uids", default=None,
                                    " ranges separated by commas, to be"
                                    " removed from the user-id pool"))
 
+ROMAN_OPT = cli_option("--roman",
+                       dest="roman_integers", default=False,
+                       action="store_true",
+                       help="Use roman numbers for positive integers")
+
+
 
 def _ParseArgs(argv, commands, aliases):
   """Parser for the command line arguments.
@@ -1249,41 +1257,31 @@ def SendJob(ops, cl=None):
   return job_id
 
 
-def PollJob(job_id, cl=None, feedback_fn=None):
-  """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+  """Generic job-polling function.
 
-  @type job_id: job identified
-  @param job_id: the job to poll for results
-  @type cl: luxi.Client
-  @param cl: the luxi client to use for communicating with the master;
-             if None, a new client will be created
+  @type job_id: number
+  @param job_id: Job ID
+  @type cbs: Instance of L{JobPollCbBase}
+  @param cbs: Data callbacks
+  @type report_cbs: Instance of L{JobPollReportCbBase}
+  @param report_cbs: Reporting callbacks
 
   """
-  if cl is None:
-    cl = GetClient()
-
   prev_job_info = None
   prev_logmsg_serial = None
 
   status = None
 
-  notified_queued = False
-  notified_waitlock = False
-
   while True:
-    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
-                                     prev_logmsg_serial)
+    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                      prev_logmsg_serial)
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
-    elif result == constants.JOB_NOTCHANGED:
-      if status is not None and not callable(feedback_fn):
-        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
-          ToStderr("Job %s is waiting in queue", job_id)
-          notified_queued = True
-        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
-          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
-          notified_waitlock = True
+
+    if result == constants.JOB_NOTCHANGED:
+      report_cbs.ReportNotChanged(job_id, status)
 
       # Wait again
       continue
@@ -1294,12 +1292,9 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     if log_entries:
       for log_entry in log_entries:
-        (serial, timestamp, _, message) = log_entry
-        if callable(feedback_fn):
-          feedback_fn(log_entry[1:])
-        else:
-          encoded = utils.SafeEncode(message)
-          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+        (serial, timestamp, log_type, message) = log_entry
+        report_cbs.ReportLogMessage(job_id, serial, timestamp,
+                                    log_type, message)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
@@ -1311,30 +1306,189 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     prev_job_info = job_info
 
-  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
   status, opstatus, result = jobs[0]
+
   if status == constants.JOB_STATUS_SUCCESS:
     return result
-  elif status in (constants.JOB_STATUS_CANCELING,
-                  constants.JOB_STATUS_CANCELED):
+
+  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
     raise errors.OpExecError("Job was canceled")
+
+  has_ok = False
+  for idx, (status, msg) in enumerate(zip(opstatus, result)):
+    if status == constants.OP_STATUS_SUCCESS:
+      has_ok = True
+    elif status == constants.OP_STATUS_ERROR:
+      errors.MaybeRaise(msg)
+
+      if has_ok:
+        raise errors.OpExecError("partial failure (opcode %d): %s" %
+                                 (idx, msg))
+
+      raise errors.OpExecError(str(msg))
+
+  # default failure mode
+  raise errors.OpExecError(result)
+
+
+class JobPollCbBase:
+  """Base class for L{GenericPollJob} callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    raise NotImplementedError()
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    @type job_ids: list of numbers
+    @param job_ids: Job IDs
+    @type fields: list of strings
+    @param fields: Fields
+
+    """
+    raise NotImplementedError()
+
+
+class JobPollReportCbBase:
+  """Base class for L{GenericPollJob} reporting callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    raise NotImplementedError()
+
+  def ReportNotChanged(self, job_id, status):
+    """Called for if a job hasn't changed in a while.
+
+    @type job_id: number
+    @param job_id: Job ID
+    @type status: string or None
+    @param status: Job status if available
+
+    """
+    raise NotImplementedError()
+
+
+class _LuxiJobPollCb(JobPollCbBase):
+  def __init__(self, cl):
+    """Initializes this class.
+
+    """
+    JobPollCbBase.__init__(self)
+    self.cl = cl
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    return self.cl.WaitForJobChangeOnce(job_id, fields,
+                                        prev_job_info, prev_log_serial)
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    """
+    return self.cl.QueryJobs(job_ids, fields)
+
+
+class FeedbackFnJobPollReportCb(JobPollReportCbBase):
+  def __init__(self, feedback_fn):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.feedback_fn = feedback_fn
+
+    assert callable(feedback_fn)
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    self.feedback_fn((timestamp, log_type, log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    # Ignore
+
+
+class StdioJobPollReportCb(JobPollReportCbBase):
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.notified_queued = False
+    self.notified_waitlock = False
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
+             utils.SafeEncode(log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    if status is None:
+      return
+
+    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
+      ToStderr("Job %s is waiting in queue", job_id)
+      self.notified_queued = True
+
+    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
+      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+      self.notified_waitlock = True
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+  """Function to poll for the result of a job.
+
+  @type job_id: job identified
+  @param job_id: the job to poll for results
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  if feedback_fn:
+    reporter = FeedbackFnJobPollReportCb(feedback_fn)
   else:
-    has_ok = False
-    for idx, (status, msg) in enumerate(zip(opstatus, result)):
-      if status == constants.OP_STATUS_SUCCESS:
-        has_ok = True
-      elif status == constants.OP_STATUS_ERROR:
-        errors.MaybeRaise(msg)
-        if has_ok:
-          raise errors.OpExecError("partial failure (opcode %d): %s" %
-                                   (idx, msg))
-        else:
-          raise errors.OpExecError(str(msg))
-    # default failure mode
-    raise errors.OpExecError(result)
+    reporter = StdioJobPollReportCb()
+
+  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
 
 
 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):
@@ -2094,11 +2248,18 @@ class JobExecutor(object):
     SetGenericOpcodeOpts(ops, self.opts)
     self.queue.append((name, ops))
 
-  def SubmitPending(self):
+  def SubmitPending(self, each=False):
     """Submit all pending jobs.
 
     """
-    results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
+    if each:
+      results = []
+      for row in self.queue:
+        # SubmitJob will remove the success status, but raise an exception if
+        # the submission fails, so we'll notice that anyway.
+        results.append([True, self.cl.SubmitJob(row[1])])
+    else:
+      results = self.cl.SubmitManyJobs([row[1] for row in self.queue])
     for (idx, ((status, data), (name, _))) in enumerate(zip(results,
                                                             self.queue)):
       self.jobs.append((idx, status, data, name))
@@ -2143,7 +2304,7 @@ class JobExecutor(object):
         ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))
 
     # first, remove any non-submitted jobs
-    self.jobs, failures = utils.partition(self.jobs, lambda x: x[1])
+    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
     for idx, _, jid, name in failures:
       ToStderr("Failed to submit job for %s: %s", name, jid)
       results.append((idx, False, jid))