Commit 989a8bee authored by Michael Hanselmann's avatar Michael Hanselmann

jqueue: Factorize code waiting for job changes

By splitting the _WaitForJobChangesHelper class into multiple smaller
classes, we gain in several places:

- Simpler code, less interaction between functions and variables
- Easy to unittest (close to 100% coverage)
- Waiting for job changes has no direct knowledge of queue anymore (it
  doesn't references queue functions anymore, especially not private ones)
- Activate inotify only if there was no change at the beginning (and
  checking again right away to avoid race conditions)
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarGuido Trotter <ultrotter@google.com>
parent 54dc4fdb
......@@ -377,6 +377,7 @@ python_tests = \
test/ganeti.hooks_unittest.py \
test/ganeti.http_unittest.py \
test/ganeti.impexpd_unittest.py \
test/ganeti.jqueue_unittest.py \
test/ganeti.locking_unittest.py \
test/ganeti.luxi_unittest.py \
test/ganeti.masterd.instance_unittest.py \
......
......@@ -785,6 +785,11 @@ JOB_STATUS_RUNNING = "running"
JOB_STATUS_CANCELED = "canceled"
JOB_STATUS_SUCCESS = "success"
JOB_STATUS_ERROR = "error"
JOBS_FINALIZED = frozenset([
JOB_STATUS_CANCELED,
JOB_STATUS_SUCCESS,
JOB_STATUS_ERROR,
])
# OpCode status
# not yet finalized
......
......@@ -54,6 +54,7 @@ from ganeti import utils
from ganeti import jstore
from ganeti import rpc
from ganeti import netutils
from ganeti import compat
JOBQUEUE_THREADS = 25
......@@ -481,65 +482,33 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
self._job.lock_status = msg
class _WaitForJobChangesHelper(object):
"""Helper class using initofy to wait for changes in a job file.
This class takes a previous job status and serial, and alerts the client when
the current job status has changed.
class _JobChangesChecker(object):
def __init__(self, fields, prev_job_info, prev_log_serial):
"""Initializes this class.
@type job_id: string
@ivar job_id: id of the job we're watching
@type prev_job_info: string
@ivar prev_job_info: previous job info, as passed by the luxi client
@type prev_log_serial: string
@ivar prev_log_serial: previous job serial, as passed by the luxi client
@type queue: L{JobQueue}
@ivar queue: job queue (used for a few utility functions)
@type job_path: string
@ivar job_path: absolute path of the job file
@type wm: pyinotify.WatchManager (or None)
@ivar wm: inotify watch manager to watch for changes
@type inotify_handler: L{asyncnotifier.SingleFileEventHandler}
@ivar inotify_handler: single file event handler, used for watching
@type notifier: pyinotify.Notifier
@ivar notifier: inotify single-threaded notifier, used for watching
@type fields: list of strings
@param fields: Fields requested by LUXI client
@type prev_job_info: string
@param prev_job_info: previous job info, as passed by the LUXI client
@type prev_log_serial: string
@param prev_log_serial: previous job serial, as passed by the LUXI client
"""
def __init__(self, job_id, fields, prev_job_info, prev_log_serial, queue):
self.job_id = job_id
self.fields = fields
self.prev_job_info = prev_job_info
self.prev_log_serial = prev_log_serial
self.queue = queue
# pylint: disable-msg=W0212
self.job_path = self.queue._GetJobPath(self.job_id)
self.wm = None
self.inotify_handler = None
self.notifier = None
"""
self._fields = fields
self._prev_job_info = prev_job_info
self._prev_log_serial = prev_log_serial
def _SetupInotify(self):
"""Create the inotify
def __call__(self, job):
"""Checks whether job has changed.
@raises errors.InotifyError: if the notifier cannot be setup
@type job: L{_QueuedJob}
@param job: Job object
"""
if self.wm:
return
self.wm = pyinotify.WatchManager()
self.inotify_handler = asyncnotifier.SingleFileEventHandler(self.wm,
self.OnInotify,
self.job_path)
self.notifier = pyinotify.Notifier(self.wm, self.inotify_handler)
self.inotify_handler.enable()
def _LoadDiskStatus(self):
job = self.queue.SafeLoadJobFromDisk(self.job_id)
if not job:
raise errors.JobLost()
self.job_status = job.CalcStatus()
status = job.CalcStatus()
job_info = job.GetInfo(self._fields)
log_entries = job.GetLogEntries(self._prev_log_serial)
job_info = job.GetInfo(self.fields)
log_entries = job.GetLogEntries(self.prev_log_serial)
# Serializing and deserializing data can cause type changes (e.g. from
# tuple to list) or precision loss. We're doing it here so that we get
# the same modifications as the data received from the client. Without
......@@ -547,49 +516,164 @@ class _WaitForJobChangesHelper(object):
# significantly different.
# TODO: we just deserialized from disk, investigate how to make sure that
# the job info and log entries are compatible to avoid this further step.
self.job_info = serializer.LoadJson(serializer.DumpJson(job_info))
self.log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
# TODO: Doing something like in testutils.py:UnifyValueType might be more
# efficient, though floats will be tricky
job_info = serializer.LoadJson(serializer.DumpJson(job_info))
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
def _CheckForChanges(self):
self._LoadDiskStatus()
# Don't even try to wait if the job is no longer running, there will be
# no changes.
if (self.job_status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK) or
self.prev_job_info != self.job_info or
(self.log_entries and self.prev_log_serial != self.log_entries[0][0])):
logging.debug("Job %s changed", self.job_id)
return (self.job_info, self.log_entries)
if (status not in (constants.JOB_STATUS_QUEUED,
constants.JOB_STATUS_RUNNING,
constants.JOB_STATUS_WAITLOCK) or
job_info != self._prev_job_info or
(log_entries and self._prev_log_serial != log_entries[0][0])):
logging.debug("Job %s changed", job.id)
return (job_info, log_entries)
raise utils.RetryAgain()
return None
class _JobFileChangesWaiter(object):
def __init__(self, filename):
"""Initializes this class.
@type filename: string
@param filename: Path to job file
@raises errors.InotifyError: if the notifier cannot be setup
def OnInotify(self, notifier_enabled):
"""
self._wm = pyinotify.WatchManager()
self._inotify_handler = \
asyncnotifier.SingleFileEventHandler(self._wm, self._OnInotify, filename)
self._notifier = \
pyinotify.Notifier(self._wm, default_proc_fun=self._inotify_handler)
try:
self._inotify_handler.enable()
except Exception:
# pyinotify doesn't close file descriptors automatically
self._notifier.stop()
raise
def _OnInotify(self, notifier_enabled):
"""Callback for inotify.
"""
if not notifier_enabled:
self.inotify_handler.enable()
self._inotify_handler.enable()
def Wait(self, timeout):
"""Waits for the job file to change.
@type timeout: float
@param timeout: Timeout in seconds
@return: Whether there have been events
"""
assert timeout >= 0
have_events = self._notifier.check_events(timeout * 1000)
if have_events:
self._notifier.read_events()
self._notifier.process_events()
return have_events
def Close(self):
"""Closes underlying notifier and its file descriptor.
"""
self._notifier.stop()
class _JobChangesWaiter(object):
def __init__(self, filename):
"""Initializes this class.
@type filename: string
@param filename: Path to job file
"""
self._filewaiter = None
self._filename = filename
def WaitFn(self, timeout):
self._SetupInotify()
if self.notifier.check_events(timeout*1000):
self.notifier.read_events()
self.notifier.process_events()
def Wait(self, timeout):
"""Waits for a job to change.
def WaitForChanges(self, timeout):
self._SetupInotify()
@type timeout: float
@param timeout: Timeout in seconds
@return: Whether there have been events
"""
if self._filewaiter:
return self._filewaiter.Wait(timeout)
# Lazy setup: Avoid inotify setup cost when job file has already changed.
# If this point is reached, return immediately and let caller check the job
# file again in case there were changes since the last check. This avoids a
# race condition.
self._filewaiter = _JobFileChangesWaiter(self._filename)
return True
def Close(self):
"""Closes underlying waiter.
"""
if self._filewaiter:
self._filewaiter.Close()
class _WaitForJobChangesHelper(object):
"""Helper class using inotify to wait for changes in a job file.
This class takes a previous job status and serial, and alerts the client when
the current job status has changed.
"""
@staticmethod
def _CheckForChanges(job_load_fn, check_fn):
job = job_load_fn()
if not job:
raise errors.JobLost()
result = check_fn(job)
if result is None:
raise utils.RetryAgain()
return result
def __call__(self, filename, job_load_fn,
fields, prev_job_info, prev_log_serial, timeout):
"""Waits for changes on a job.
@type filename: string
@param filename: File on which to wait for changes
@type job_load_fn: callable
@param job_load_fn: Function to load job
@type fields: list of strings
@param fields: Which fields to check for changes
@type prev_job_info: list or None
@param prev_job_info: Last job information returned
@type prev_log_serial: int
@param prev_log_serial: Last job message serial number
@type timeout: float
@param timeout: maximum time to wait in seconds
"""
try:
return utils.Retry(self._CheckForChanges,
utils.RETRY_REMAINING_TIME,
timeout,
wait_fn=self.WaitFn)
check_fn = _JobChangesChecker(fields, prev_job_info, prev_log_serial)
waiter = _JobChangesWaiter(filename)
try:
return utils.Retry(compat.partial(self._CheckForChanges,
job_load_fn, check_fn),
utils.RETRY_REMAINING_TIME, timeout,
wait_fn=waiter.Wait)
finally:
waiter.Close()
except (errors.InotifyError, errors.JobLost):
return None
except utils.RetryTimeout:
return constants.JOB_NOTCHANGED
def Close(self):
if self.wm:
self.notifier.stop()
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -1314,7 +1398,7 @@ class JobQueue(object):
@type prev_log_serial: int
@param prev_log_serial: Last job message serial number
@type timeout: float
@param timeout: maximum time to wait
@param timeout: maximum time to wait in seconds
@rtype: tuple (job info, log entries)
@return: a tuple of the job information as required via
the fields parameter, and the log entries as a list
......@@ -1325,12 +1409,12 @@ class JobQueue(object):
as such by the clients
"""
helper = _WaitForJobChangesHelper(job_id, fields, prev_job_info,
prev_log_serial, self)
try:
return helper.WaitForChanges(timeout)
finally:
helper.Close()
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id)
helper = _WaitForJobChangesHelper()
return helper(self._GetJobPath(job_id), load_fn,
fields, prev_job_info, prev_log_serial, timeout)
@locking.ssynchronized(_LOCK)
@_RequireOpenQueue
......@@ -1380,9 +1464,7 @@ class JobQueue(object):
archive_jobs = []
rename_files = []
for job in jobs:
if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
constants.JOB_STATUS_SUCCESS,
constants.JOB_STATUS_ERROR):
if job.CalcStatus() not in constants.JOBS_FINALIZED:
logging.debug("Job %s is not yet done", job.id)
continue
......
#!/usr/bin/python
#
# Copyright (C) 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.jqueue"""
import os
import sys
import unittest
import tempfile
import shutil
import errno
from ganeti import constants
from ganeti import utils
from ganeti import errors
from ganeti import jqueue
import testutils
class _FakeJob:
def __init__(self, job_id, status):
self.id = job_id
self._status = status
self._log = []
def SetStatus(self, status):
self._status = status
def AddLogEntry(self, msg):
self._log.append((len(self._log), msg))
def CalcStatus(self):
return self._status
def GetInfo(self, fields):
result = []
for name in fields:
if name == "status":
result.append(self._status)
else:
raise Exception("Unknown field")
return result
def GetLogEntries(self, newer_than):
assert newer_than is None or newer_than >= 0
if newer_than is None:
return self._log
return self._log[newer_than:]
class TestJobChangesChecker(unittest.TestCase):
def testStatus(self):
job = _FakeJob(9094, constants.JOB_STATUS_QUEUED)
checker = jqueue._JobChangesChecker(["status"], None, None)
self.assertEqual(checker(job), ([constants.JOB_STATUS_QUEUED], []))
job.SetStatus(constants.JOB_STATUS_RUNNING)
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
job.SetStatus(constants.JOB_STATUS_SUCCESS)
self.assertEqual(checker(job), ([constants.JOB_STATUS_SUCCESS], []))
# job.id is used by checker
self.assertEqual(job.id, 9094)
def testStatusWithPrev(self):
job = _FakeJob(12807, constants.JOB_STATUS_QUEUED)
checker = jqueue._JobChangesChecker(["status"],
[constants.JOB_STATUS_QUEUED], None)
self.assert_(checker(job) is None)
job.SetStatus(constants.JOB_STATUS_RUNNING)
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
def testFinalStatus(self):
for status in constants.JOBS_FINALIZED:
job = _FakeJob(2178711, status)
checker = jqueue._JobChangesChecker(["status"], [status], None)
# There won't be any changes in this status, hence it should signal
# a change immediately
self.assertEqual(checker(job), ([status], []))
def testLog(self):
job = _FakeJob(9094, constants.JOB_STATUS_RUNNING)
checker = jqueue._JobChangesChecker(["status"], None, None)
self.assertEqual(checker(job), ([constants.JOB_STATUS_RUNNING], []))
job.AddLogEntry("Hello World")
(job_info, log_entries) = checker(job)
self.assertEqual(job_info, [constants.JOB_STATUS_RUNNING])
self.assertEqual(log_entries, [[0, "Hello World"]])
checker2 = jqueue._JobChangesChecker(["status"], job_info, len(log_entries))
self.assert_(checker2(job) is None)
job.AddLogEntry("Foo Bar")
job.SetStatus(constants.JOB_STATUS_ERROR)
(job_info, log_entries) = checker2(job)
self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
self.assertEqual(log_entries, [[1, "Foo Bar"]])
checker3 = jqueue._JobChangesChecker(["status"], None, None)
(job_info, log_entries) = checker3(job)
self.assertEqual(job_info, [constants.JOB_STATUS_ERROR])
self.assertEqual(log_entries, [[0, "Hello World"], [1, "Foo Bar"]])
class TestJobChangesWaiter(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.filename = utils.PathJoin(self.tmpdir, "job-1")
utils.WriteFile(self.filename, data="")
def tearDown(self):
shutil.rmtree(self.tmpdir)
def _EnsureNotifierClosed(self, notifier):
try:
os.fstat(notifier._fd)
except EnvironmentError, err:
self.assertEqual(err.errno, errno.EBADF)
else:
self.fail("File descriptor wasn't closed")
def testClose(self):
for wait in [False, True]:
waiter = jqueue._JobFileChangesWaiter(self.filename)
try:
if wait:
waiter.Wait(0.001)
finally:
waiter.Close()
# Ensure file descriptor was closed
self._EnsureNotifierClosed(waiter._notifier)
def testChangingFile(self):
waiter = jqueue._JobFileChangesWaiter(self.filename)
try:
self.assertFalse(waiter.Wait(0.1))
utils.WriteFile(self.filename, data="changed")
self.assert_(waiter.Wait(60))
finally:
waiter.Close()
self._EnsureNotifierClosed(waiter._notifier)
def testChangingFile2(self):
waiter = jqueue._JobChangesWaiter(self.filename)
try:
self.assertFalse(waiter._filewaiter)
self.assert_(waiter.Wait(0.1))
self.assert_(waiter._filewaiter)
# File waiter is now used, but there have been no changes
self.assertFalse(waiter.Wait(0.1))
utils.WriteFile(self.filename, data="changed")
self.assert_(waiter.Wait(60))
finally:
waiter.Close()
self._EnsureNotifierClosed(waiter._filewaiter._notifier)
class TestWaitForJobChangesHelper(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.filename = utils.PathJoin(self.tmpdir, "job-2614226563")
utils.WriteFile(self.filename, data="")
def tearDown(self):
shutil.rmtree(self.tmpdir)
def _LoadWaitingJob(self):
return _FakeJob(2614226563, constants.JOB_STATUS_WAITLOCK)
def _LoadLostJob(self):
return None
def testNoChanges(self):
wfjc = jqueue._WaitForJobChangesHelper()
# No change
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob, ["status"],
[constants.JOB_STATUS_WAITLOCK], None, 0.1),
constants.JOB_NOTCHANGED)
# No previous information
self.assertEqual(wfjc(self.filename, self._LoadWaitingJob,
["status"], None, None, 1.0),
([constants.JOB_STATUS_WAITLOCK], []))
def testLostJob(self):
wfjc = jqueue._WaitForJobChangesHelper()
self.assert_(wfjc(self.filename, self._LoadLostJob,
["status"], None, None, 1.0) is None)
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment