Commit 6c2549d6 authored by Guido Trotter's avatar Guido Trotter
Browse files

Parallelize WaitForJobChanges



As for QueryJobs we rely on file updates rather than condition
notification to acquire job changes. In order to do that we use the
pyinotify module to watch files. This might make the client a bit slower
(pending planned improvements, such as subscription-based
WaitForJobChanges) but detaches it from the job execution.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent b3855790
......@@ -37,6 +37,13 @@ import re
import time
import weakref
try:
# pylint: disable-msg=E0611
from pyinotify import pyinotify
except ImportError:
import pyinotify
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
......@@ -473,6 +480,116 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
self._job.lock_status = msg
class _WaitForJobChangesHelper(object):
"""Helper class using initofy to wait for changes in a job file.
This class takes a previous job status and serial, and alerts the client when
the current job status has changed.
@type job_id: string
@ivar job_id: id of the job we're watching
@type prev_job_info: string
@ivar prev_job_info: previous job info, as passed by the luxi client
@type prev_log_serial: string
@ivar prev_log_serial: previous job serial, as passed by the luxi client
@type queue: L{JobQueue}
@ivar queue: job queue (used for a few utility functions)
@type job_path: string
@ivar job_path: absolute path of the job file
@type wm: pyinotify.WatchManager (or None)
@ivar wm: inotify watch manager to watch for changes
@type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
@ivar inotify_handler: single file event handler, used for watching
@type notifier: pyinotify.Notifier
@ivar notifier: inotify single-threaded notifier, used for watching
"""
def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
self.job_id = job_id
self.fields = fields
self.prev_job_info = prev_job_info
self.prev_log_serial = prev_log_serial
self.queue = queue
# pylint: disable-msg=W0212
self.job_path = self.queue._GetJobPath(self.job_id)
self.wm = None
self.inotify_handler = None
self.notifier = None
def _SetupInotify(self):
"""Create the inotify
@raises errors.InotifyError: if the notifier cannot be setup
"""
if self.wm:
return
self.wm = pyinotify.WatchManager()
self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
self.OnInotify,
self.job_path)
self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
self.inotify_handler.enable()
def _LoadDiskStatus(self):
job = self.queue.SafeLoadJobFromDisk(self.job_id)
if not job:
raise errors.JobLost()
self.job_status = job.CalcStatus()
job_info = job.GetInfo(self.fields)
log_entries = job.GetLogEntries(self.prev_log_serial)
# Serializing and deserializing data can cause type changes (e.g. from
# tuple to list) or precision loss. We're doing it here so that we get
# the same modifications as the data received from the client. Without
# this, the comparison afterwards might fail without the data being
# significantly different.
# TODO: we just deserialized from disk, investigate how to make sure that
# the job info and log entries are compatible to avoid this further step.
self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
def _CheckForChanges(self):
self._LoadDiskStatus()
# Don't even try to wait if the job is no longer running, there will be
# no changes.
if (self.job_status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK) or
self.prev_job_info != self.job_info or
(self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
logging.debug("Job %s changed", self.job_id)
return (self.job_info, self.log_entries)
raise utils.RetryAgain()
def OnInotify(self, notifier_enabled):
if not notifier_enabled:
self.inotify_handler.enable()
def WaitFn(self, timeout):
self._SetupInotify()
if self.notifier.check_events(timeout*1000):
self.notifier.read_events()
self.notifier.process_events()
def WaitForChanges(self, timeout):
try:
return utils.Retry(self._CheckForChanges,
utils.RETRY_REMAINING_TIME,
timeout,
wait_fn=self.WaitFn)
except (errors.InotifyError, errors.JobLost):
return None
except utils.RetryTimeout:
return constants.JOB_NOTCHANGED
def Close(self):
if self.wm:
self.notifier.stop()
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -1183,8 +1300,6 @@ class JobQueue(object):
# Notify waiters about potential changes
job.change.notifyAll()
@utils.LockedMethod
@_RequireOpenQueue
def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
timeout):
"""Waits for changes in a job.
......@@ -1209,44 +1324,12 @@ class JobQueue(object):
as such by the clients
"""
job = self._LoadJobUnlocked(job_id)
if not job:
logging.debug("Job %s not found", job_id)
return None
def _CheckForChanges():
logging.debug("Waiting for changes in job %s", job_id)
status = job.CalcStatus()
job_info = job.GetInfo(fields)
log_entries = job.GetLogEntries(prev_log_serial)
# Serializing and deserializing data can cause type changes (e.g. from
# tuple to list) or precision loss. We're doing it here so that we get
# the same modifications as the data received from the client. Without
# this, the comparison afterwards might fail without the data being
# significantly different.
job_info = serializer.LoadJson(serializer.DumpJson(job_info))
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
# Don't even try to wait if the job is no longer running, there will be
# no changes.
if (status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK) or
prev_job_info != job_info or
(log_entries and prev_log_serial != log_entries[0][0])):
logging.debug("Job %s changed", job_id)
return (job_info, log_entries)
raise utils.RetryAgain()
helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
prev_log_serial, self)
try:
# Setting wait function to release the queue lock while waiting
return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
wait_fn=job.change.wait)
except utils.RetryTimeout:
return constants.JOB_NOTCHANGED
return helper.WaitForChanges(timeout)
finally:
helper.Close()
@utils.LockedMethod
@_RequireOpenQueue
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment