Add a forgotten comment about overriding a method
[ganeti-local] / lib / cli.py
index 8de292c..f598320 100644 (file)
@@ -1223,41 +1223,31 @@ def SendJob(ops, cl=None):
   return job_id
 
 
-def PollJob(job_id, cl=None, feedback_fn=None):
-  """Function to poll for the result of a job.
+def GenericPollJob(job_id, cbs, report_cbs):
+  """Generic job-polling function.
 
-  @type job_id: job identified
-  @param job_id: the job to poll for results
-  @type cl: luxi.Client
-  @param cl: the luxi client to use for communicating with the master;
-             if None, a new client will be created
+  @type job_id: number
+  @param job_id: Job ID
+  @type cbs: Instance of L{JobPollCbBase}
+  @param cbs: Data callbacks
+  @type report_cbs: Instance of L{JobPollReportCbBase}
+  @param report_cbs: Reporting callbacks
 
   """
-  if cl is None:
-    cl = GetClient()
-
   prev_job_info = None
   prev_logmsg_serial = None
 
   status = None
 
-  notified_queued = False
-  notified_waitlock = False
-
   while True:
-    result = cl.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
-                                     prev_logmsg_serial)
+    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
+                                      prev_logmsg_serial)
     if not result:
       # job not found, go away!
       raise errors.JobLost("Job with id %s lost" % job_id)
-    elif result == constants.JOB_NOTCHANGED:
-      if status is not None and not callable(feedback_fn):
-        if status == constants.JOB_STATUS_QUEUED and not notified_queued:
-          ToStderr("Job %s is waiting in queue", job_id)
-          notified_queued = True
-        elif status == constants.JOB_STATUS_WAITLOCK and not notified_waitlock:
-          ToStderr("Job %s is trying to acquire all necessary locks", job_id)
-          notified_waitlock = True
+
+    if result == constants.JOB_NOTCHANGED:
+      report_cbs.ReportNotChanged(job_id, status)
 
       # Wait again
       continue
@@ -1268,12 +1258,9 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     if log_entries:
       for log_entry in log_entries:
-        (serial, timestamp, _, message) = log_entry
-        if callable(feedback_fn):
-          feedback_fn(log_entry[1:])
-        else:
-          encoded = utils.SafeEncode(message)
-          ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)), encoded)
+        (serial, timestamp, log_type, message) = log_entry
+        report_cbs.ReportLogMessage(job_id, serial, timestamp,
+                                    log_type, message)
         prev_logmsg_serial = max(prev_logmsg_serial, serial)
 
     # TODO: Handle canceled and archived jobs
@@ -1285,30 +1272,189 @@ def PollJob(job_id, cl=None, feedback_fn=None):
 
     prev_job_info = job_info
 
-  jobs = cl.QueryJobs([job_id], ["status", "opstatus", "opresult"])
+  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
   if not jobs:
     raise errors.JobLost("Job with id %s lost" % job_id)
 
   status, opstatus, result = jobs[0]
+
   if status == constants.JOB_STATUS_SUCCESS:
     return result
-  elif status in (constants.JOB_STATUS_CANCELING,
-                  constants.JOB_STATUS_CANCELED):
+
+  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
     raise errors.OpExecError("Job was canceled")
+
+  has_ok = False
+  for idx, (status, msg) in enumerate(zip(opstatus, result)):
+    if status == constants.OP_STATUS_SUCCESS:
+      has_ok = True
+    elif status == constants.OP_STATUS_ERROR:
+      errors.MaybeRaise(msg)
+
+      if has_ok:
+        raise errors.OpExecError("partial failure (opcode %d): %s" %
+                                 (idx, msg))
+
+      raise errors.OpExecError(str(msg))
+
+  # default failure mode
+  raise errors.OpExecError(result)
+
+
+class JobPollCbBase:
+  """Base class for L{GenericPollJob} callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    raise NotImplementedError()
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    @type job_ids: list of numbers
+    @param job_ids: Job IDs
+    @type fields: list of strings
+    @param fields: Fields
+
+    """
+    raise NotImplementedError()
+
+
+class JobPollReportCbBase:
+  """Base class for L{GenericPollJob} reporting callbacks.
+
+  """
+  def __init__(self):
+    """Initializes this class.
+
+    """
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    raise NotImplementedError()
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    @type job_id: number
+    @param job_id: Job ID
+    @type status: string or None
+    @param status: Job status if available
+
+    """
+    raise NotImplementedError()
+
+
+class _LuxiJobPollCb(JobPollCbBase):
+  def __init__(self, cl):
+    """Initializes this class.
+
+    """
+    JobPollCbBase.__init__(self)
+    self.cl = cl
+
+  def WaitForJobChangeOnce(self, job_id, fields,
+                           prev_job_info, prev_log_serial):
+    """Waits for changes on a job.
+
+    """
+    return self.cl.WaitForJobChangeOnce(job_id, fields,
+                                        prev_job_info, prev_log_serial)
+
+  def QueryJobs(self, job_ids, fields):
+    """Returns the selected fields for the selected job IDs.
+
+    """
+    return self.cl.QueryJobs(job_ids, fields)
+
+
+class FeedbackFnJobPollReportCb(JobPollReportCbBase):
+  def __init__(self, feedback_fn):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.feedback_fn = feedback_fn
+
+    assert callable(feedback_fn)
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    self.feedback_fn((timestamp, log_type, log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    # Ignore
+
+
+class StdioJobPollReportCb(JobPollReportCbBase):
+  def __init__(self):
+    """Initializes this class.
+
+    """
+    JobPollReportCbBase.__init__(self)
+
+    self.notified_queued = False
+    self.notified_waitlock = False
+
+  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
+    """Handles a log message.
+
+    """
+    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
+             utils.SafeEncode(log_msg))
+
+  def ReportNotChanged(self, job_id, status):
+    """Called if a job hasn't changed in a while.
+
+    """
+    if status is None:
+      return
+
+    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
+      ToStderr("Job %s is waiting in queue", job_id)
+      self.notified_queued = True
+
+    elif status == constants.JOB_STATUS_WAITLOCK and not self.notified_waitlock:
+      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
+      self.notified_waitlock = True
+
+
+def PollJob(job_id, cl=None, feedback_fn=None):
+  """Function to poll for the result of a job.
+
+  @type job_id: job identifier
+  @param job_id: the job to poll for results
+  @type cl: luxi.Client
+  @param cl: the luxi client to use for communicating with the master;
+             if None, a new client will be created
+
+  """
+  if cl is None:
+    cl = GetClient()
+
+  if feedback_fn:
+    reporter = FeedbackFnJobPollReportCb(feedback_fn)
   else:
-    has_ok = False
-    for idx, (status, msg) in enumerate(zip(opstatus, result)):
-      if status == constants.OP_STATUS_SUCCESS:
-        has_ok = True
-      elif status == constants.OP_STATUS_ERROR:
-        errors.MaybeRaise(msg)
-        if has_ok:
-          raise errors.OpExecError("partial failure (opcode %d): %s" %
-                                   (idx, msg))
-        else:
-          raise errors.OpExecError(str(msg))
-    # default failure mode
-    raise errors.OpExecError(result)
+    reporter = StdioJobPollReportCb()
+
+  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
 
 
 def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None):