Parallelize WaitForJobChanges
author Guido Trotter <ultrotter@google.com>
Tue, 15 Jun 2010 16:39:30 +0000 (17:39 +0100)
committer Guido Trotter <ultrotter@google.com>
Wed, 23 Jun 2010 10:32:46 +0000 (12:32 +0200)
As with QueryJobs, we rely on file updates rather than condition
notification to detect job changes. In order to do that we use the
pyinotify module to watch the job files. This might make the client a bit
slower (pending planned improvements, such as subscription-based
WaitForJobChanges) but detaches it from the job execution.

Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>

lib/jqueue.py

index 88e5a4a..f63f5e9 100644 (file)
@@ -37,6 +37,13 @@ import re
 import time
 import weakref
 
+try:
+  # pylint: disable-msg=E0611
+  from pyinotify import pyinotify
+except ImportError:
+  import pyinotify
+
+from ganeti import asyncnotifier
 from ganeti import constants
 from ganeti import serializer
 from ganeti import workerpool
@@ -473,6 +480,116 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._job.lock_status = msg
 
 
+class _WaitForJobChangesHelper(object):
+  """Helper class using inotify to wait for changes in a job file.
+
+  This class takes a previous job status and serial, and alerts the client when
+  the current job status has changed.
+
+  @type job_id: string
+  @ivar job_id: id of the job we're watching
+  @type prev_job_info: string
+  @ivar prev_job_info: previous job info, as passed by the luxi client
+  @type prev_log_serial: string
+  @ivar prev_log_serial: previous job serial, as passed by the luxi client
+  @type queue: L{JobQueue}
+  @ivar queue: job queue (used for a few utility functions)
+  @type job_path: string
+  @ivar job_path: absolute path of the job file
+  @type wm: pyinotify.WatchManager (or None)
+  @ivar wm: inotify watch manager to watch for changes
+  @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
+  @ivar inotify_handler: single file event handler, used for watching
+  @type notifier: pyinotify.Notifier
+  @ivar notifier: inotify single-threaded notifier, used for watching
+
+  """
+
+  def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
+    self.job_id = job_id
+    self.fields = fields
+    self.prev_job_info = prev_job_info
+    self.prev_log_serial = prev_log_serial
+    self.queue = queue
+    # pylint: disable-msg=W0212
+    self.job_path = self.queue._GetJobPath(self.job_id)
+    self.wm = None
+    self.inotify_handler = None
+    self.notifier = None
+
+  def _SetupInotify(self):
+    """Create the inotify watch manager, event handler and notifier.
+
+    @raises errors.InotifyError: if the notifier cannot be set up
+
+    """
+    if self.wm:
+      return
+    self.wm = pyinotify.WatchManager()
+    self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
+                                                                self.OnInotify,
+                                                                self.job_path)
+    self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
+    self.inotify_handler.enable()
+
+  def _LoadDiskStatus(self):
+    job = self.queue.SafeLoadJobFromDisk(self.job_id)
+    if not job:
+      raise errors.JobLost()
+    self.job_status = job.CalcStatus()
+
+    job_info = job.GetInfo(self.fields)
+    log_entries = job.GetLogEntries(self.prev_log_serial)
+    # Serializing and deserializing data can cause type changes (e.g. from
+    # tuple to list) or precision loss. We're doing it here so that we get
+    # the same modifications as the data received from the client. Without
+    # this, the comparison afterwards might fail without the data being
+    # significantly different.
+    # TODO: we just deserialized from disk, investigate how to make sure that
+    # the job info and log entries are compatible to avoid this further step.
+    self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+    self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+
+  def _CheckForChanges(self):
+    self._LoadDiskStatus()
+    # Don't even try to wait if the job is no longer running, there will be
+    # no changes.
+    if (self.job_status not in (constants.JOB_STATUS_QUEUED,
+                                constants.JOB_STATUS_RUNNING,
+                                constants.JOB_STATUS_WAITLOCK) or
+        self.prev_job_info != self.job_info or
+        (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
+      logging.debug("Job %s changed", self.job_id)
+      return (self.job_info, self.log_entries)
+
+    raise utils.RetryAgain()
+
+  def OnInotify(self, notifier_enabled):
+    if not notifier_enabled:
+      self.inotify_handler.enable()
+
+  def WaitFn(self, timeout):
+    self._SetupInotify()
+    if self.notifier.check_events(timeout*1000):
+      self.notifier.read_events()
+    self.notifier.process_events()
+
+  def WaitForChanges(self, timeout):
+    try:
+      return utils.Retry(self._CheckForChanges,
+                         utils.RETRY_REMAINING_TIME,
+                         timeout,
+                         wait_fn=self.WaitFn)
+    except (errors.InotifyError, errors.JobLost):
+      return None
+    except utils.RetryTimeout:
+      return constants.JOB_NOTCHANGED
+
+  def Close(self):
+    if self.wm:
+      self.notifier.stop()
+
+
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
 
@@ -1183,8 +1300,6 @@ class JobQueue(object):
     # Notify waiters about potential changes
     job.change.notifyAll()
 
-  @utils.LockedMethod
-  @_RequireOpenQueue
   def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                         timeout):
     """Waits for changes in a job.
@@ -1209,44 +1324,12 @@ class JobQueue(object):
         as such by the clients
 
     """
-    job = self._LoadJobUnlocked(job_id)
-    if not job:
-      logging.debug("Job %s not found", job_id)
-      return None
-
-    def _CheckForChanges():
-      logging.debug("Waiting for changes in job %s", job_id)
-
-      status = job.CalcStatus()
-      job_info = job.GetInfo(fields)
-      log_entries = job.GetLogEntries(prev_log_serial)
-
-      # Serializing and deserializing data can cause type changes (e.g. from
-      # tuple to list) or precision loss. We're doing it here so that we get
-      # the same modifications as the data received from the client. Without
-      # this, the comparison afterwards might fail without the data being
-      # significantly different.
-      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
-      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
-      # Don't even try to wait if the job is no longer running, there will be
-      # no changes.
-      if (status not in (constants.JOB_STATUS_QUEUED,
-                         constants.JOB_STATUS_RUNNING,
-                         constants.JOB_STATUS_WAITLOCK) or
-          prev_job_info != job_info or
-          (log_entries and prev_log_serial != log_entries[0][0])):
-        logging.debug("Job %s changed", job_id)
-        return (job_info, log_entries)
-
-      raise utils.RetryAgain()
-
+    helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
+                                      prev_log_serial, self)
     try:
-      # Setting wait function to release the queue lock while waiting
-      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
-                         wait_fn=job.change.wait)
-    except utils.RetryTimeout:
-      return constants.JOB_NOTCHANGED
+      return helper.WaitForChanges(timeout)
+    finally:
+      helper.Close()
 
   @utils.LockedMethod
   @_RequireOpenQueue