Revision 383477e9
b/lib/jqueue.py | ||
---|---|---|
689 | 689 |
|
690 | 690 |
|
691 | 691 |
class _JobFileChangesWaiter(object): |
692 |
def __init__(self, filename): |
|
692 |
def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
|
|
693 | 693 |
"""Initializes this class. |
694 | 694 |
|
695 | 695 |
@type filename: string |
... | ... | |
697 | 697 |
@raises errors.InotifyError: if the notifier cannot be setup |
698 | 698 |
|
699 | 699 |
""" |
700 |
self._wm = pyinotify.WatchManager()
|
|
700 |
self._wm = _inotify_wm_cls()
|
|
701 | 701 |
self._inotify_handler = \ |
702 | 702 |
asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) |
703 | 703 |
self._notifier = \ |
... | ... | |
739 | 739 |
|
740 | 740 |
|
741 | 741 |
class _JobChangesWaiter(object): |
742 |
def __init__(self, filename): |
|
742 |
def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
|
|
743 | 743 |
"""Initializes this class. |
744 | 744 |
|
745 | 745 |
@type filename: string |
... | ... | |
748 | 748 |
""" |
749 | 749 |
self._filewaiter = None |
750 | 750 |
self._filename = filename |
751 |
self._waiter_cls = _waiter_cls |
|
751 | 752 |
|
752 | 753 |
def Wait(self, timeout): |
753 | 754 |
"""Waits for a job to change. |
... | ... | |
764 | 765 |
# If this point is reached, return immediately and let caller check the job |
765 | 766 |
# file again in case there were changes since the last check. This avoids a |
766 | 767 |
# race condition. |
767 |
self._filewaiter = _JobFileChangesWaiter(self._filename)
|
|
768 |
self._filewaiter = self._waiter_cls(self._filename)
|
|
768 | 769 |
|
769 | 770 |
return True |
770 | 771 |
|
... | ... | |
802 | 803 |
return result |
803 | 804 |
|
804 | 805 |
def __call__(self, filename, job_load_fn, |
805 |
fields, prev_job_info, prev_log_serial, timeout): |
|
806 |
fields, prev_job_info, prev_log_serial, timeout, |
|
807 |
_waiter_cls=_JobChangesWaiter): |
|
806 | 808 |
"""Waits for changes on a job. |
807 | 809 |
|
808 | 810 |
@type filename: string |
... | ... | |
822 | 824 |
counter = itertools.count() |
823 | 825 |
try: |
824 | 826 |
check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) |
825 |
waiter = _JobChangesWaiter(filename)
|
|
827 |
waiter = _waiter_cls(filename)
|
|
826 | 828 |
try: |
827 | 829 |
return utils.Retry(compat.partial(self._CheckForChanges, |
828 | 830 |
counter, job_load_fn, check_fn), |
... | ... | |
830 | 832 |
wait_fn=waiter.Wait) |
831 | 833 |
finally: |
832 | 834 |
waiter.Close() |
833 |
except (errors.InotifyError, errors.JobLost):
|
|
835 |
except errors.JobLost:
|
|
834 | 836 |
return None |
835 | 837 |
except utils.RetryTimeout: |
836 | 838 |
return constants.JOB_NOTCHANGED |
b/test/ganeti.jqueue_unittest.py | ||
---|---|---|
31 | 31 |
import random |
32 | 32 |
import operator |
33 | 33 |
|
34 |
try: |
|
35 |
# pylint: disable=E0611 |
|
36 |
from pyinotify import pyinotify |
|
37 |
except ImportError: |
|
38 |
import pyinotify |
|
39 |
|
|
34 | 40 |
from ganeti import constants |
35 | 41 |
from ganeti import utils |
36 | 42 |
from ganeti import errors |
... | ... | |
195 | 201 |
self._EnsureNotifierClosed(waiter._filewaiter._notifier) |
196 | 202 |
|
197 | 203 |
|
204 |
class _FailingWatchManager(pyinotify.WatchManager): |
|
205 |
"""Subclass of L{pyinotify.WatchManager} which always fails to register. |
|
206 |
|
|
207 |
""" |
|
208 |
def add_watch(self, filename, mask): |
|
209 |
assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] | |
|
210 |
pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"]) |
|
211 |
|
|
212 |
return { |
|
213 |
filename: -1, |
|
214 |
} |
|
215 |
|
|
216 |
|
|
198 | 217 |
class TestWaitForJobChangesHelper(unittest.TestCase): |
199 | 218 |
def setUp(self): |
200 | 219 |
self.tmpdir = tempfile.mkdtemp() |
... | ... | |
228 | 247 |
self.assert_(wfjc(self.filename, self._LoadLostJob, |
229 | 248 |
["status"], None, None, 1.0) is None) |
230 | 249 |
|
250 |
def testNonExistentFile(self): |
|
251 |
wfjc = jqueue._WaitForJobChangesHelper() |
|
252 |
|
|
253 |
filename = utils.PathJoin(self.tmpdir, "does-not-exist") |
|
254 |
self.assertFalse(os.path.exists(filename)) |
|
255 |
|
|
256 |
result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0, |
|
257 |
_waiter_cls=compat.partial(jqueue._JobChangesWaiter, |
|
258 |
_waiter_cls=NotImplemented)) |
|
259 |
self.assertTrue(result is None) |
|
260 |
|
|
261 |
def testInotifyError(self): |
|
262 |
jobfile_waiter_cls = \ |
|
263 |
compat.partial(jqueue._JobFileChangesWaiter, |
|
264 |
_inotify_wm_cls=_FailingWatchManager) |
|
265 |
|
|
266 |
jobchange_waiter_cls = \ |
|
267 |
compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls) |
|
268 |
|
|
269 |
wfjc = jqueue._WaitForJobChangesHelper() |
|
270 |
|
|
271 |
# Test if failing to watch a job file (e.g. due to |
|
272 |
# fs.inotify.max_user_watches being too low) raises errors.InotifyError |
|
273 |
self.assertRaises(errors.InotifyError, wfjc, |
|
274 |
self.filename, self._LoadWaitingJob, |
|
275 |
["status"], [constants.JOB_STATUS_WAITING], None, 1.0, |
|
276 |
_waiter_cls=jobchange_waiter_cls) |
|
277 |
|
|
231 | 278 |
|
232 | 279 |
class TestEncodeOpError(unittest.TestCase): |
233 | 280 |
def test(self): |
Also available in: Unified diff