Commit 75d81fc8 authored by Michael Hanselmann
Browse files

Fix locking issue with job dependencies



When jobs waiting for a dependency are notified, they're re-added to the
queue. This would require owning the queue lock in exclusive mode, but
since the function doing so is called from within the job/opcode
processor, it only holds the lock in shared mode.

This patch changes the result of the processor from a boolean to a
status value (integer). This way the caller can be notified about
actions to take, including notifying waiting jobs. The function adding
jobs to the queue can now acquire the lock in exclusive mode.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent f8a4adfa
......@@ -863,6 +863,10 @@ class _OpExecContext:
class _JobProcessor(object):
(DEFER,
WAITDEP,
FINISHED) = range(1, 4)
def __init__(self, queue, opexec_fn, job,
_timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
"""Initializes this class.
......@@ -1050,9 +1054,9 @@ class _JobProcessor(object):
"""Continues execution of a job.
@param _nextop_fn: Callback function for tests
@rtype: bool
@return: True if job is finished, False if processor needs to be called
again
@return: C{FINISHED} if job is fully processed, C{DEFER} if the job should
be deferred and C{WAITDEP} if the dependency manager
(L{_JobDependencyManager}) will re-schedule the job when appropriate
"""
queue = self.queue
......@@ -1068,7 +1072,7 @@ class _JobProcessor(object):
# Don't do anything for finalized jobs
if job.CalcStatus() in constants.JOBS_FINALIZED:
return True
return self.FINISHED
# Is a previous opcode still pending?
if job.cur_opctx:
......@@ -1213,13 +1217,14 @@ class _JobProcessor(object):
if finalize:
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
# TODO: Check locking
queue.depmgr.NotifyWaiters(job.id)
return True
return self.FINISHED
assert not waitjob or queue.depmgr.JobWaiting(job)
return bool(waitjob)
if waitjob:
return self.WAITDEP
else:
return self.DEFER
finally:
assert job.writable, "Job became read-only while being processed"
queue.release()
......@@ -1265,10 +1270,24 @@ class _JobQueueWorker(workerpool.BaseWorker):
wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
proc.ExecOpCode)
if not _JobProcessor(queue, wrap_execop_fn, job)():
result = _JobProcessor(queue, wrap_execop_fn, job)()
if result == _JobProcessor.FINISHED:
# Notify waiting jobs
queue.depmgr.NotifyWaiters(job.id)
elif result == _JobProcessor.DEFER:
# Schedule again
raise workerpool.DeferTask(priority=job.CalcPriority())
elif result == _JobProcessor.WAITDEP:
# No-op, dependency manager will re-schedule
pass
else:
raise errors.ProgrammerError("Job processor returned unknown status %s" %
(result, ))
@staticmethod
def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
"""Updates the worker thread name to include a short summary of the opcode.
......@@ -1570,7 +1589,7 @@ class JobQueue(object):
if restartjobs:
logging.info("Restarting %s jobs", len(restartjobs))
self._EnqueueJobs(restartjobs)
self._EnqueueJobsUnlocked(restartjobs)
logging.info("Job queue inspection finished")
......@@ -2015,7 +2034,7 @@ class JobQueue(object):
"""
(job_id, ) = self._NewSerialsUnlocked(1)
self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
return job_id
@locking.ssynchronized(_LOCK)
......@@ -2031,7 +2050,7 @@ class JobQueue(object):
(results, added_jobs) = \
self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
self._EnqueueJobs(added_jobs)
self._EnqueueJobsUnlocked(added_jobs)
return results
......@@ -2112,6 +2131,7 @@ class JobQueue(object):
return (results, added_jobs)
@locking.ssynchronized(_LOCK)
def _EnqueueJobs(self, jobs):
"""Helper function to add jobs to worker pool's queue.
......@@ -2119,6 +2139,16 @@ class JobQueue(object):
@param jobs: List of all jobs
"""
return self._EnqueueJobsUnlocked(jobs)
def _EnqueueJobsUnlocked(self, jobs):
"""Helper function to add jobs to worker pool's queue.
@type jobs: list
@param jobs: List of all jobs
"""
assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
self._wpool.AddManyTasks([(job, ) for job in jobs],
priority=[job.CalcPriority() for job in jobs])
......
......@@ -626,9 +626,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assert_(result)
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
......@@ -648,7 +648,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testOpcodeError(self):
......@@ -687,10 +688,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
if idx in (failfrom, len(ops) - 1):
# Last opcode
self.assert_(result)
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......@@ -726,7 +727,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testCancelWhileInQueue(self):
......@@ -757,7 +759,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Simulate processor called in workerpool
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......@@ -802,7 +805,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
for op in job.ops))
opexec = _FakeExecOpCodeForProc(queue, None, None)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......@@ -850,7 +854,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
......@@ -896,7 +901,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......@@ -925,7 +931,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Run one opcode
self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
# Job goes back to queued
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......@@ -940,7 +947,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assert_(success)
# Try processing another opcode (this will actually cancel the job)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......@@ -970,7 +978,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
for _ in range(successcount):
self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertEqual(job.GetInfo(["opstatus"]),
......@@ -1002,10 +1011,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
if remaining == 0:
# Last opcode
self.assert_(result)
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......@@ -1022,13 +1031,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testProcessorOnRunningJob(self):
......@@ -1109,10 +1120,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
if remaining == 0:
# Last opcode
self.assert_(result)
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
break
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
......@@ -1217,9 +1228,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assert_(result)
self.assertEqual(result, jqueue._JobProcessor.FINISHED)
else:
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
......@@ -1244,7 +1255,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._GenericCheckJob(job)
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependency(self):
......@@ -1331,7 +1343,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
if attempt < 5:
# Simulate waiting for other job
self.assertTrue(result)
self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertRaises(IndexError, depmgr.GetNextNotification)
......@@ -1339,15 +1351,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(job.end_timestamp)
continue
if result:
if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
......@@ -1369,7 +1380,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(depmgr.CountWaitingJobs())
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependencyCancel(self):
......@@ -1449,7 +1461,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
if attempt > 0 and attempt < 4:
# Simulate waiting for other job
self.assertTrue(result)
self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertRaises(IndexError, depmgr.GetNextNotification)
......@@ -1457,15 +1469,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(job.end_timestamp)
continue
if result:
if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
......@@ -1486,7 +1497,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(depmgr.CountPendingResults())
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
def testJobDependencyWrongstatus(self):
......@@ -1566,7 +1578,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
if attempt > 0 and attempt < 4:
# Simulate waiting for other job
self.assertTrue(result)
self.assertEqual(result, jqueue._JobProcessor.WAITDEP)
self.assertTrue(job.cur_opctx)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertRaises(IndexError, depmgr.GetNextNotification)
......@@ -1574,15 +1586,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(job.end_timestamp)
continue
if result:
if result == jqueue._JobProcessor.FINISHED:
# Last opcode
self.assertFalse(job.cur_opctx)
self.assertEqual(queue.depmgr.GetNextNotification(), job_id)
break
self.assertRaises(IndexError, depmgr.GetNextNotification)
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
......@@ -1607,7 +1618,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(depmgr.CountPendingResults())
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, queue.GetNextUpdate)
......@@ -1784,7 +1796,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
result = proc(_nextop_fn=self._NextOpcode)
assert self.curop is not None
if result or self.gave_lock:
if result == jqueue._JobProcessor.FINISHED or self.gave_lock:
# Got lock and/or job is done, result must've been written
self.assertFalse(job.cur_opctx)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
......@@ -1792,11 +1804,11 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
self.assert_(job.ops[self.curop].exec_timestamp)
if result:
if result == jqueue._JobProcessor.FINISHED:
self.assertFalse(job.cur_opctx)
break
self.assertFalse(result)
self.assertEqual(result, jqueue._JobProcessor.DEFER)
if self.curop == 0:
self.assertEqual(job.ops[self.curop].start_timestamp,
......@@ -1839,7 +1851,8 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
for op in job.ops))
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
self.assertEqual(jqueue._JobProcessor(self.queue, opexec, job)(),
jqueue._JobProcessor.FINISHED)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment