diff --git a/lib/jqueue.py b/lib/jqueue.py index 88e5a4a91048a052458b2b54b0c5a9f5189bf77e..f63f5e99c2e8f20ce26c5f7fec0957f7ed54b807 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -37,6 +37,13 @@ import re import time import weakref +try: + # pylint: disable-msg=E0611 + from pyinotify import pyinotify +except ImportError: + import pyinotify + +from ganeti import asyncnotifier from ganeti import constants from ganeti import serializer from ganeti import workerpool @@ -473,6 +480,116 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): self._job.lock_status = msg +class _WaitForJobChangesHelper(object): + """Helper class using initofy to wait for changes in a job file. + + This class takes a previous job status and serial, and alerts the client when + the current job status has changed. + + @type job_id: string + @ivar job_id: id of the job we're watching + @type prev_job_info: string + @ivar prev_job_info: previous job info, as passed by the luxi client + @type prev_log_serial: string + @ivar prev_log_serial: previous job serial, as passed by the luxi client + @type queue: L{JobQueue} + @ivar queue: job queue (used for a few utility functions) + @type job_path: string + @ivar job_path: absolute path of the job file + @type wm: pyinotify.WatchManager (or None) + @ivar wm: inotify watch manager to watch for changes + @type inotify_handler: L{asyncnotifier.SingleFileEventHandler} + @ivar inotify_handler: single file event handler, used for watching + @type notifier: pyinotify.Notifier + @ivar notifier: inotify single-threaded notifier, used for watching + + """ + + def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue): + self.job_id = job_id + self.fields = fields + self.prev_job_info = prev_job_info + self.prev_log_serial = prev_log_serial + self.queue = queue + # pylint: disable-msg=W0212 + self.job_path = self.queue._GetJobPath(self.job_id) + self.wm = None + self.inotify_handler = None + self.notifier = None + + def _SetupInotify(self): + """Create the inotify + + @raises errors.InotifyError: if the notifier cannot be setup + + """ + if self.wm: + return + self.wm = pyinotify.WatchManager() + self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm, + self.OnInotify, + self.job_path) + self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler) + self.inotify_handler.enable() + + def _LoadDiskStatus(self): + job = self.queue.SafeLoadJobFromDisk(self.job_id) + if not job: + raise errors.JobLost() + self.job_status = job.CalcStatus() + + job_info = job.GetInfo(self.fields) + log_entries = job.GetLogEntries(self.prev_log_serial) + # Serializing and deserializing data can cause type changes (e.g. from + # tuple to list) or precision loss. We're doing it here so that we get + # the same modifications as the data received from the client. Without + # this, the comparison afterwards might fail without the data being + # significantly different. + # TODO: we just deserialized from disk, investigate how to make sure that + # the job info and log entries are compatible to avoid this further step. + self.job_info = serializer.LoadJson(serializer.DumpJson(job_info)) + self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) + + def _CheckForChanges(self): + self._LoadDiskStatus() + # Don't even try to wait if the job is no longer running, there will be + # no changes. + if (self.job_status not in (constants.JOB_STATUS_QUEUED, + constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK) or + self.prev_job_info != self.job_info or + (self.log_entries and self.prev_log_serial != self.log_entries[0][0])): + logging.debug("Job %s changed", self.job_id) + return (self.job_info, self.log_entries) + + raise utils.RetryAgain() + + def OnInotify(self, notifier_enabled): + if not notifier_enabled: + self.inotify_handler.enable() + + def WaitFn(self, timeout): + self._SetupInotify() + if self.notifier.check_events(timeout*1000): + self.notifier.read_events() + self.notifier.process_events() + + def WaitForChanges(self, timeout): + try: + return utils.Retry(self._CheckForChanges, + utils.RETRY_REMAINING_TIME, + timeout, + wait_fn=self.WaitFn) + except (errors.InotifyError, errors.JobLost): + return None + except utils.RetryTimeout: + return constants.JOB_NOTCHANGED + + def Close(self): + if self.wm: + self.notifier.stop() + + class _JobQueueWorker(workerpool.BaseWorker): """The actual job workers. @@ -1183,8 +1300,6 @@ class JobQueue(object): # Notify waiters about potential changes job.change.notifyAll() - @utils.LockedMethod - @_RequireOpenQueue def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial, timeout): """Waits for changes in a job. @@ -1209,44 +1324,12 @@ class JobQueue(object): as such by the clients """ - job = self._LoadJobUnlocked(job_id) - if not job: - logging.debug("Job %s not found", job_id) - return None - - def _CheckForChanges(): - logging.debug("Waiting for changes in job %s", job_id) - - status = job.CalcStatus() - job_info = job.GetInfo(fields) - log_entries = job.GetLogEntries(prev_log_serial) - - # Serializing and deserializing data can cause type changes (e.g. from - # tuple to list) or precision loss. We're doing it here so that we get - # the same modifications as the data received from the client. Without - # this, the comparison afterwards might fail without the data being - # significantly different. - job_info = serializer.LoadJson(serializer.DumpJson(job_info)) - log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) - - # Don't even try to wait if the job is no longer running, there will be - # no changes. - if (status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITLOCK) or - prev_job_info != job_info or - (log_entries and prev_log_serial != log_entries[0][0])): - logging.debug("Job %s changed", job_id) - return (job_info, log_entries) - - raise utils.RetryAgain() - + helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info, + prev_log_serial, self) try: - # Setting wait function to release the queue lock while waiting - return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout, - wait_fn=job.change.wait) - except utils.RetryTimeout: - return constants.JOB_NOTCHANGED + return helper.WaitForChanges(timeout) + finally: + helper.Close() @utils.LockedMethod @_RequireOpenQueue