import time
import weakref
+try:
+ # pylint: disable-msg=E0611
+ from pyinotify import pyinotify
+except ImportError:
+ import pyinotify
+
+from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
self._job.lock_status = msg
+class _WaitForJobChangesHelper(object):
+ """Helper class using initofy to wait for changes in a job file.
+
+ This class takes a previous job status and serial, and alerts the client when
+ the current job status has changed.
+
+ @type job_id: string
+ @ivar job_id: id of the job we're watching
+ @type prev_job_info: string
+ @ivar prev_job_info: previous job info, as passed by the luxi client
+ @type prev_log_serial: string
+ @ivar prev_log_serial: previous job serial, as passed by the luxi client
+ @type queue: L{JobQueue}
+ @ivar queue: job queue (used for a few utility functions)
+ @type job_path: string
+ @ivar job_path: absolute path of the job file
+ @type wm: pyinotify.WatchManager (or None)
+ @ivar wm: inotify watch manager to watch for changes
+ @type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
+ @ivar inotify_handler: single file event handler, used for watching
+ @type notifier: pyinotify.Notifier
+ @ivar notifier: inotify single-threaded notifier, used for watching
+
+ """
+
+ def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
+ self.job_id = job_id
+ self.fields = fields
+ self.prev_job_info = prev_job_info
+ self.prev_log_serial = prev_log_serial
+ self.queue = queue
+ # pylint: disable-msg=W0212
+ self.job_path = self.queue._GetJobPath(self.job_id)
+ self.wm = None
+ self.inotify_handler = None
+ self.notifier = None
+
+ def _SetupInotify(self):
+ """Create the inotify
+
+ @raises errors.InotifyError: if the notifier cannot be setup
+
+ """
+ if self.wm:
+ return
+ self.wm = pyinotify.WatchManager()
+ self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
+ self.OnInotify,
+ self.job_path)
+ self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
+ self.inotify_handler.enable()
+
+ def _LoadDiskStatus(self):
+ job = self.queue.SafeLoadJobFromDisk(self.job_id)
+ if not job:
+ raise errors.JobLost()
+ self.job_status = job.CalcStatus()
+
+ job_info = job.GetInfo(self.fields)
+ log_entries = job.GetLogEntries(self.prev_log_serial)
+ # Serializing and deserializing data can cause type changes (e.g. from
+ # tuple to list) or precision loss. We're doing it here so that we get
+ # the same modifications as the data received from the client. Without
+ # this, the comparison afterwards might fail without the data being
+ # significantly different.
+ # TODO: we just deserialized from disk, investigate how to make sure that
+ # the job info and log entries are compatible to avoid this further step.
+ self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
+ self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
+
+ def _CheckForChanges(self):
+ self._LoadDiskStatus()
+ # Don't even try to wait if the job is no longer running, there will be
+ # no changes.
+ if (self.job_status not in (constants.JOB_STATUS_QUEUED,
+ constants.JOB_STATUS_RUNNING,
+ constants.JOB_STATUS_WAITLOCK) or
+ self.prev_job_info != self.job_info or
+ (self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
+ logging.debug("Job %s changed", self.job_id)
+ return (self.job_info, self.log_entries)
+
+ raise utils.RetryAgain()
+
+ def OnInotify(self, notifier_enabled):
+ if not notifier_enabled:
+ self.inotify_handler.enable()
+
+ def WaitFn(self, timeout):
+ self._SetupInotify()
+ if self.notifier.check_events(timeout*1000):
+ self.notifier.read_events()
+ self.notifier.process_events()
+
+ def WaitForChanges(self, timeout):
+ try:
+ return utils.Retry(self._CheckForChanges,
+ utils.RETRY_REMAINING_TIME,
+ timeout,
+ wait_fn=self.WaitFn)
+ except (errors.InotifyError, errors.JobLost):
+ return None
+ except utils.RetryTimeout:
+ return constants.JOB_NOTCHANGED
+
+ def Close(self):
+ if self.wm:
+ self.notifier.stop()
+
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
# Notify waiters about potential changes
job.change.notifyAll()
- @utils.LockedMethod
- @_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
timeout):
"""Waits for changes in a job.
as such by the clients
"""
- job = self._LoadJobUnlocked(job_id)
- if not job:
- logging.debug("Job %s not found", job_id)
- return None
-
- def _CheckForChanges():
- logging.debug("Waiting for changes in job %s", job_id)
-
- status = job.CalcStatus()
- job_info = job.GetInfo(fields)
- log_entries = job.GetLogEntries(prev_log_serial)
-
- # Serializing and deserializing data can cause type changes (e.g. from
- # tuple to list) or precision loss. We're doing it here so that we get
- # the same modifications as the data received from the client. Without
- # this, the comparison afterwards might fail without the data being
- # significantly different.
- job_info = serializer.LoadJson(serializer.DumpJson(job_info))
- log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
-
- # Don't even try to wait if the job is no longer running, there will be
- # no changes.
- if (status not in (constants.JOB_STATUS_QUEUED,
- constants.JOB_STATUS_RUNNING,
- constants.JOB_STATUS_WAITLOCK) or
- prev_job_info != job_info or
- (log_entries and prev_log_serial != log_entries[0][0])):
- logging.debug("Job %s changed", job_id)
- return (job_info, log_entries)
-
- raise utils.RetryAgain()
-
+ helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
+ prev_log_serial, self)
try:
- # Setting wait function to release the queue lock while waiting
- return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
- wait_fn=job.change.wait)
- except utils.RetryTimeout:
- return constants.JOB_NOTCHANGED
+ return helper.WaitForChanges(timeout)
+ finally:
+ helper.Close()
@utils.LockedMethod
@_RequireOpenQueue