Commit fcb21ad7 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Export job dependencies through lock monitor



This makes them visible to the user. Example:

$ gnt-debug locks -o name,pending
Name    Pending
job/890 job:891,892
job/892 job:894
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 4c03b2b5
......@@ -1340,8 +1340,6 @@ class _JobDependencyManager:
CONTINUE,
WRONGSTATUS) = range(1, 6)
# TODO: Export waiter information to lock monitor
def __init__(self, getstatus_fn, enqueue_fn):
"""Initializes this class.
......@@ -1352,6 +1350,22 @@ class _JobDependencyManager:
self._waiters = {}
self._lock = locking.SharedLock("JobDepMgr")
@locking.ssynchronized(_LOCK, shared=1)
def GetLockInfo(self, requested): # pylint: disable-msg=W0613
"""Retrieves information about waiting jobs.
@type requested: set
@param requested: Requested information, see C{query.LQ_*}
"""
# No need to sort here, that's being done by the lock manager and query
# library. There are no priorities for notifying jobs, hence all show up as
# one item under "pending".
return [("job/%s" % job_id, None, None,
[("job", [job.id for job in waiters])])
for job_id, waiters in self._waiters.items()
if waiters]
@locking.ssynchronized(_LOCK, shared=1)
def JobWaiting(self, job):
"""Checks if a job is waiting.
......@@ -1527,6 +1541,7 @@ class JobQueue(object):
# Job dependencies
self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
self._EnqueueJobs)
self.context.glm.AddToLockMonitor(self.depmgr)
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
......
......@@ -28,6 +28,7 @@ import tempfile
import shutil
import errno
import itertools
import random
from ganeti import constants
from ganeti import utils
......@@ -36,6 +37,7 @@ from ganeti import jqueue
from ganeti import opcodes
from ganeti import compat
from ganeti import mcpu
from ganeti import query
import testutils
......@@ -1887,6 +1889,9 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/28625", None, None, [("job", [job.id])])
])
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
(result, _) = self.jdm.CheckAndRegister(job, job_id, [])
......@@ -1894,6 +1899,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireCancel(self):
job = self._FakeJob(5278)
......@@ -1909,6 +1915,9 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/9610", None, None, [("job", [job.id])])
])
self._status.append((job_id, constants.JOB_STATUS_CANCELED))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
......@@ -1916,6 +1925,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireError(self):
job = self._FakeJob(21459)
......@@ -1938,6 +1948,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testRequireMultiple(self):
dep_status = list(constants.JOBS_FINALIZED)
......@@ -1955,6 +1966,9 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertEqual(self.jdm._waiters, {
job_id: set([job]),
})
self.assertEqual(self.jdm.GetLockInfo([query.LQ_PENDING]), [
("job/14609", None, None, [("job", [job.id])])
])
self._status.append((job_id, end_status))
(result, _) = self.jdm.CheckAndRegister(job, job_id, dep_status)
......@@ -1962,6 +1976,7 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
def testNotify(self):
job = self._FakeJob(8227)
......@@ -2050,6 +2065,72 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self._status)
self.assertFalse(self._queue)
def testMultipleWaiting(self):
# Use a deterministic random generator
rnd = random.Random(21402)
job_ids = map(str, rnd.sample(range(1, 10000), 150))
waiters = dict((job_ids.pop(),
set(map(self._FakeJob,
[job_ids.pop()
for _ in range(rnd.randint(1, 20))])))
for _ in range(10))
# Ensure there are no duplicate job IDs
assert not utils.FindDuplicates(waiters.keys() +
[job.id
for jobs in waiters.values()
for job in jobs])
# Register all jobs as waiters
for job_id, job in [(job_id, job)
for (job_id, jobs) in waiters.items()
for job in jobs]:
self._status.append((job_id, constants.JOB_STATUS_QUEUED))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.WAIT)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertTrue(self.jdm.JobWaiting(job))
self.assertEqual(self.jdm._waiters, waiters)
def _MakeSet((name, mode, owner_names, pending)):
return (name, mode, owner_names,
[(pendmode, set(pend)) for (pendmode, pend) in pending])
def _CheckLockInfo():
info = self.jdm.GetLockInfo([query.LQ_PENDING])
self.assertEqual(sorted(map(_MakeSet, info)), sorted([
("job/%s" % job_id, None, None,
[("job", set([job.id for job in jobs]))])
for job_id, jobs in waiters.items()
if jobs
]))
_CheckLockInfo()
# Notify in random order
for job_id in rnd.sample(waiters, len(waiters)):
# Remove from pending waiter list
jobs = waiters.pop(job_id)
for job in jobs:
self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
(result, _) = self.jdm.CheckAndRegister(job, job_id,
[constants.JOB_STATUS_SUCCESS])
self.assertEqual(result, self.jdm.CONTINUE)
self.assertFalse(self._status)
self.assertFalse(self._queue)
self.assertFalse(self.jdm.JobWaiting(job))
_CheckLockInfo()
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
assert not waiters
def testSelfDependency(self):
job = self._FakeJob(18937)
......@@ -2068,6 +2149,7 @@ class TestJobDependencyManager(unittest.TestCase):
(result, _) = jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.ERROR)
self.assertFalse(jdm.JobWaiting(job))
self.assertFalse(jdm.GetLockInfo([query.LQ_PENDING]))
if __name__ == "__main__":
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment