Revision 383477e9

b/lib/jqueue.py
689 689

  
690 690

  
691 691
class _JobFileChangesWaiter(object):
692
  def __init__(self, filename):
692
  def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager):
693 693
    """Initializes this class.
694 694

  
695 695
    @type filename: string
......
697 697
    @raises errors.InotifyError: if the notifier cannot be setup
698 698

  
699 699
    """
700
    self._wm = pyinotify.WatchManager()
700
    self._wm = _inotify_wm_cls()
701 701
    self._inotify_handler = \
702 702
      asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
703 703
    self._notifier = \
......
739 739

  
740 740

  
741 741
class _JobChangesWaiter(object):
742
  def __init__(self, filename):
742
  def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter):
743 743
    """Initializes this class.
744 744

  
745 745
    @type filename: string
......
748 748
    """
749 749
    self._filewaiter = None
750 750
    self._filename = filename
751
    self._waiter_cls = _waiter_cls
751 752

  
752 753
  def Wait(self, timeout):
753 754
    """Waits for a job to change.
......
764 765
    # If this point is reached, return immediately and let caller check the job
765 766
    # file again in case there were changes since the last check. This avoids a
766 767
    # race condition.
767
    self._filewaiter = _JobFileChangesWaiter(self._filename)
768
    self._filewaiter = self._waiter_cls(self._filename)
768 769

  
769 770
    return True
770 771

  
......
802 803
    return result
803 804

  
804 805
  def __call__(self, filename, job_load_fn,
805
               fields, prev_job_info, prev_log_serial, timeout):
806
               fields, prev_job_info, prev_log_serial, timeout,
807
               _waiter_cls=_JobChangesWaiter):
806 808
    """Waits for changes on a job.
807 809

  
808 810
    @type filename: string
......
822 824
    counter = itertools.count()
823 825
    try:
824 826
      check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
825
      waiter = _JobChangesWaiter(filename)
827
      waiter = _waiter_cls(filename)
826 828
      try:
827 829
        return utils.Retry(compat.partial(self._CheckForChanges,
828 830
                                          counter, job_load_fn, check_fn),
......
830 832
                           wait_fn=waiter.Wait)
831 833
      finally:
832 834
        waiter.Close()
833
    except (errors.InotifyError, errors.JobLost):
835
    except errors.JobLost:
834 836
      return None
835 837
    except utils.RetryTimeout:
836 838
      return constants.JOB_NOTCHANGED
b/test/ganeti.jqueue_unittest.py
31 31
import random
32 32
import operator
33 33

  
34
try:
35
  # pylint: disable=E0611
36
  from pyinotify import pyinotify
37
except ImportError:
38
  import pyinotify
39

  
34 40
from ganeti import constants
35 41
from ganeti import utils
36 42
from ganeti import errors
......
195 201
    self._EnsureNotifierClosed(waiter._filewaiter._notifier)
196 202

  
197 203

  
204
class _FailingWatchManager(pyinotify.WatchManager):
205
  """Subclass of L{pyinotify.WatchManager} which always fails to register.
206

  
207
  """
208
  def add_watch(self, filename, mask):
209
    assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] |
210
                    pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"])
211

  
212
    return {
213
      filename: -1,
214
      }
215

  
216

  
198 217
class TestWaitForJobChangesHelper(unittest.TestCase):
199 218
  def setUp(self):
200 219
    self.tmpdir = tempfile.mkdtemp()
......
228 247
    self.assert_(wfjc(self.filename, self._LoadLostJob,
229 248
                      ["status"], None, None, 1.0) is None)
230 249

  
250
  def testNonExistentFile(self):
251
    wfjc = jqueue._WaitForJobChangesHelper()
252

  
253
    filename = utils.PathJoin(self.tmpdir, "does-not-exist")
254
    self.assertFalse(os.path.exists(filename))
255

  
256
    result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0,
257
                  _waiter_cls=compat.partial(jqueue._JobChangesWaiter,
258
                                             _waiter_cls=NotImplemented))
259
    self.assertTrue(result is None)
260

  
261
  def testInotifyError(self):
262
    jobfile_waiter_cls = \
263
      compat.partial(jqueue._JobFileChangesWaiter,
264
                     _inotify_wm_cls=_FailingWatchManager)
265

  
266
    jobchange_waiter_cls = \
267
      compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls)
268

  
269
    wfjc = jqueue._WaitForJobChangesHelper()
270

  
271
    # Test if failing to watch a job file (e.g. due to
272
    # fs.inotify.max_user_watches being too low) raises errors.InotifyError
273
    self.assertRaises(errors.InotifyError, wfjc,
274
                      self.filename, self._LoadWaitingJob,
275
                      ["status"], [constants.JOB_STATUS_WAITING], None, 1.0,
276
                      _waiter_cls=jobchange_waiter_cls)
277

  
231 278

  
232 279
class TestEncodeOpError(unittest.TestCase):
233 280
  def test(self):

Also available in: Unified diff