diff --git a/lib/jqueue.py b/lib/jqueue.py index b855b77b2b8abb8c72cba1d020fecdc2102b9814..9752f93064f47178c745756696607718df40af35 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -689,7 +689,7 @@ class _JobChangesChecker(object): class _JobFileChangesWaiter(object): - def __init__(self, filename): + def __init__(self, filename, _inotify_wm_cls=pyinotify.WatchManager): """Initializes this class. @type filename: string @@ -697,7 +697,7 @@ class _JobFileChangesWaiter(object): @raises errors.InotifyError: if the notifier cannot be setup """ - self._wm = pyinotify.WatchManager() + self._wm = _inotify_wm_cls() self._inotify_handler = \ asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename) self._notifier = \ @@ -739,7 +739,7 @@ class _JobFileChangesWaiter(object): class _JobChangesWaiter(object): - def __init__(self, filename): + def __init__(self, filename, _waiter_cls=_JobFileChangesWaiter): """Initializes this class. @type filename: string @@ -748,6 +748,7 @@ class _JobChangesWaiter(object): """ self._filewaiter = None self._filename = filename + self._waiter_cls = _waiter_cls def Wait(self, timeout): """Waits for a job to change. @@ -764,7 +765,7 @@ class _JobChangesWaiter(object): # If this point is reached, return immediately and let caller check the job # file again in case there were changes since the last check. This avoids a # race condition. - self._filewaiter = _JobFileChangesWaiter(self._filename) + self._filewaiter = self._waiter_cls(self._filename) return True @@ -802,7 +803,8 @@ class _WaitForJobChangesHelper(object): return result def __call__(self, filename, job_load_fn, - fields, prev_job_info, prev_log_serial, timeout): + fields, prev_job_info, prev_log_serial, timeout, + _waiter_cls=_JobChangesWaiter): """Waits for changes on a job. @type filename: string @@ -822,7 +824,7 @@ class _WaitForJobChangesHelper(object): counter = itertools.count() try: check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial) - waiter = _JobChangesWaiter(filename) + waiter = _waiter_cls(filename) try: return utils.Retry(compat.partial(self._CheckForChanges, counter, job_load_fn, check_fn), @@ -830,7 +832,7 @@ class _WaitForJobChangesHelper(object): wait_fn=waiter.Wait) finally: waiter.Close() - except (errors.InotifyError, errors.JobLost): + except errors.JobLost: return None except utils.RetryTimeout: return constants.JOB_NOTCHANGED diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 7b7567e54d46dbc1a17b0392e607fc005f7ef739..4f0b9646529d0f6324efbd00f23471caa8b5b850 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -31,6 +31,12 @@ import itertools import random import operator +try: + # pylint: disable=E0611 + from pyinotify import pyinotify +except ImportError: + import pyinotify + from ganeti import constants from ganeti import utils from ganeti import errors @@ -195,6 +201,19 @@ class TestJobChangesWaiter(unittest.TestCase): self._EnsureNotifierClosed(waiter._filewaiter._notifier) +class _FailingWatchManager(pyinotify.WatchManager): + """Subclass of L{pyinotify.WatchManager} which always fails to register. + + """ + def add_watch(self, filename, mask): + assert mask == (pyinotify.EventsCodes.ALL_FLAGS["IN_MODIFY"] | + pyinotify.EventsCodes.ALL_FLAGS["IN_IGNORED"]) + + return { + filename: -1, + } + + class TestWaitForJobChangesHelper(unittest.TestCase): def setUp(self): self.tmpdir = tempfile.mkdtemp() @@ -228,6 +247,34 @@ class TestWaitForJobChangesHelper(unittest.TestCase): self.assert_(wfjc(self.filename, self._LoadLostJob, ["status"], None, None, 1.0) is None) + def testNonExistentFile(self): + wfjc = jqueue._WaitForJobChangesHelper() + + filename = utils.PathJoin(self.tmpdir, "does-not-exist") + self.assertFalse(os.path.exists(filename)) + + result = wfjc(filename, self._LoadLostJob, ["status"], None, None, 1.0, + _waiter_cls=compat.partial(jqueue._JobChangesWaiter, + _waiter_cls=NotImplemented)) + self.assertTrue(result is None) + + def testInotifyError(self): + jobfile_waiter_cls = \ + compat.partial(jqueue._JobFileChangesWaiter, + _inotify_wm_cls=_FailingWatchManager) + + jobchange_waiter_cls = \ + compat.partial(jqueue._JobChangesWaiter, _waiter_cls=jobfile_waiter_cls) + + wfjc = jqueue._WaitForJobChangesHelper() + + # Test if failing to watch a job file (e.g. due to + # fs.inotify.max_user_watches being too low) raises errors.InotifyError + self.assertRaises(errors.InotifyError, wfjc, + self.filename, self._LoadWaitingJob, + ["status"], [constants.JOB_STATUS_WAITING], None, 1.0, + _waiter_cls=jobchange_waiter_cls) + class TestEncodeOpError(unittest.TestCase): def test(self):