class _JobFileChangesWaiter(object):
- def __init__(self, filename):
+ def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
"""Initializes this class.
@type filename: string
@raises errors.InotifyError: if the notifier cannot be setup
"""
- self._wm = pyinotify.WatchManager()
+ self._wm = _inotify_wm_cls()
self._inotify_handler = \
asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
self._notifier = \
class _JobChangesWaiter(object):
- def __init__(self, filename):
+ def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
"""Initializes this class.
@type filename: string
"""
self._filewaiter = None
self._filename = filename
+ self._waiter_cls = _waiter_cls
def Wait(self, timeout):
"""Waits for a job to change.
# If this point is reached, return immediately and let caller check the job
# file again in case there were changes since the last check. This avoids a
# race condition.
- self._filewaiter = _JobFileChangesWaiter(self._filename)
+ self._filewaiter = self._waiter_cls(self._filename)
return True
return result
def __call__(self, filename, job_load_fn,
- fields, prev_job_info, prev_log_serial, timeout):
+ fields, prev_job_info, prev_log_serial, timeout,
+ _waiter_cls=_JobChangesWaiter):
"""Waits for changes on a job.
@type filename: string
counter = itertools.count()
try:
check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
- waiter = _JobChangesWaiter(filename)
+ waiter = _waiter_cls(filename)
try:
return utils.Retry(compat.partial(self._CheckForChanges,
counter, job_load_fn, check_fn),
wait_fn=waiter.Wait)
finally:
waiter.Close()
- except (errors.InotifyError, errors.JobLost):
+ except errors.JobLost:
return None
except utils.RetryTimeout:
return constants.JOB_NOTCHANGED
import random
import operator
+try:
+ # pylint: disable=E0611
+ from pyinotify import pyinotify
+except ImportError:
+ import pyinotify
+
from ganeti import constants
from ganeti import utils
from ganeti import errors
self._EnsureNotifierClosed(waiter._filewaiter._notifier)
+class _FailingWatchManager(pyinotify.WatchManager):
+ """Subclass of L{pyinotify.WatchManager} which always fails to register.
+
+ """
+ def add_watch(self, filename, mask):
+ assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
+ pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
+
+ return {
+ filename: -1,
+ }
+
+
class TestWaitForJobChangesHelper(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.assert_(wfjc(self.filename, self._LoadLostJob,
["status"], None, None, 1.0) is None)
+ def testNonExistentFile(self):
+ wfjc = jqueue._WaitForJobChangesHelper()
+
+ filename = utils.PathJoin(self.tmpdir, "does-not-exist")
+ self.assertFalse(os.path.exists(filename))
+
+ result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
+ _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
+ _waiter_cls=NotImplemented))
+ self.assertTrue(result is None)
+
+ def testInotifyError(self):
+ jobfile_waiter_cls = \
+ compat.partial(jqueue._JobFileChangesWaiter,
+ _inotify_wm_cls=_FailingWatchManager)
+
+ jobchange_waiter_cls = \
+ compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
+
+ wfjc = jqueue._WaitForJobChangesHelper()
+
+ # Test if failing to watch a job file (e.g. due to
+ # fs.inotify.max_user_watches being too low) raises errors.InotifyError
+ self.assertRaises(errors.InotifyError, wfjc,
+ self.filename, self._LoadWaitingJob,
+ ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
+ _waiter_cls=jobchange_waiter_cls)
+
class TestEncodeOpError(unittest.TestCase):
def test(self):