Commit 5fd6b694 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Keep jobs in “waitlock” while returning to queue



Iustin Pop reported that a job's file is updated many times while it
waits for locks held by other thread(s). After an investigation it was
concluded that the reason was a design decision for job priorities to
return jobs to the “queued” status if they couldn't acquire all locks.
Changing a jobs' status or priority requires an update to permanent
storage.

In a high-level view this is what happens:
1. Mark as waitlock
2. Write to disk as permanent storage (jobs left in this state by a
   crashing master daemon are resumed on restart)
3. Wait for lock (assume lock is held by another thread)
4. Mark as queued
5. Write to disk again
6. Return to workerpool

Another option originally discussed was to leave the job in the
“waitlock” status. Ignoring priority changes, this is what would happen:
1. If not in waitlock
1.1. Assert state == queued
1.2. Mark as waitlock
1.3. Set start_timestamp
1.4. Write to disk as permanent storage
3. Wait for locks (assume lock is held by another thread)
4. Leave in waitlock
5. Return to workerpool

Now let's assume the lock is released by the other thread:
[…]
3. Wait for locks and get them
4. Assert state == waitlock
5. Set state to running
6. Set exec_timestamp
7. Write to disk

As this change reduces the number of writes from two per lock acquire
attempt to two per opcode and one per priority increase (as happens
after 24 acquire attempts (see mcpu._CalculateLockAttemptTimeouts) until
the highest priority is reached), here's the patch to implement it.
Unittests are updated.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent ebb2a2a3
...@@ -895,13 +895,28 @@ class _JobProcessor(object): ...@@ -895,13 +895,28 @@ class _JobProcessor(object):
""" """
assert op in job.ops assert op in job.ops
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK)
update = False
op.status = constants.OP_STATUS_WAITLOCK
op.result = None op.result = None
op.start_timestamp = TimeStampNow()
if op.status == constants.OP_STATUS_QUEUED:
op.status = constants.OP_STATUS_WAITLOCK
update = True
if op.start_timestamp is None:
op.start_timestamp = TimeStampNow()
update = True
if job.start_timestamp is None: if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp job.start_timestamp = op.start_timestamp
update = True
assert op.status == constants.OP_STATUS_WAITLOCK
return update
def _ExecOpCodeUnlocked(self, opctx): def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result. """Processes one opcode and returns the result.
...@@ -929,7 +944,8 @@ class _JobProcessor(object): ...@@ -929,7 +944,8 @@ class _JobProcessor(object):
if op.status == constants.OP_STATUS_CANCELING: if op.status == constants.OP_STATUS_CANCELING:
return (constants.OP_STATUS_CANCELING, None) return (constants.OP_STATUS_CANCELING, None)
return (constants.OP_STATUS_QUEUED, None) # Stay in waitlock while trying to re-acquire lock
return (constants.OP_STATUS_WAITLOCK, None)
except CancelJob: except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix) logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING assert op.status == constants.OP_STATUS_CANCELING
...@@ -964,6 +980,7 @@ class _JobProcessor(object): ...@@ -964,6 +980,7 @@ class _JobProcessor(object):
# Is a previous opcode still pending? # Is a previous opcode still pending?
if job.cur_opctx: if job.cur_opctx:
opctx = job.cur_opctx opctx = job.cur_opctx
job.cur_opctx = None
else: else:
if __debug__ and _nextop_fn: if __debug__ and _nextop_fn:
_nextop_fn() _nextop_fn()
...@@ -974,7 +991,7 @@ class _JobProcessor(object): ...@@ -974,7 +991,7 @@ class _JobProcessor(object):
# Consistency check # Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED, assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_CANCELED) constants.OP_STATUS_CANCELED)
for i in job.ops[opctx.index:]) for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED, assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK, constants.OP_STATUS_WAITLOCK,
...@@ -985,13 +1002,13 @@ class _JobProcessor(object): ...@@ -985,13 +1002,13 @@ class _JobProcessor(object):
if op.status != constants.OP_STATUS_CANCELED: if op.status != constants.OP_STATUS_CANCELED:
# Prepare to start opcode # Prepare to start opcode
self._MarkWaitlock(job, op) if self._MarkWaitlock(job, op):
# Write to disk
queue.UpdateJobUnlocked(job)
assert op.status == constants.OP_STATUS_WAITLOCK assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
assert job.start_timestamp and op.start_timestamp
# Write to disk
queue.UpdateJobUnlocked(job)
logging.info("%s: opcode %s waiting for locks", logging.info("%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary) opctx.log_prefix, opctx.summary)
...@@ -1005,7 +1022,7 @@ class _JobProcessor(object): ...@@ -1005,7 +1022,7 @@ class _JobProcessor(object):
op.status = op_status op.status = op_status
op.result = op_result op.result = op_result
if op.status == constants.OP_STATUS_QUEUED: if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time # Couldn't get locks in time
assert not op.end_timestamp assert not op.end_timestamp
else: else:
...@@ -1018,10 +1035,12 @@ class _JobProcessor(object): ...@@ -1018,10 +1035,12 @@ class _JobProcessor(object):
else: else:
assert op.status in constants.OPS_FINALIZED assert op.status in constants.OPS_FINALIZED
if op.status == constants.OP_STATUS_QUEUED: if op.status == constants.OP_STATUS_WAITLOCK:
finalize = False finalize = False
opctx.CheckPriorityIncrease() if opctx.CheckPriorityIncrease():
# Priority was changed, need to update on-disk file
queue.UpdateJobUnlocked(job)
# Keep around for another round # Keep around for another round
job.cur_opctx = opctx job.cur_opctx = opctx
...@@ -1030,9 +1049,7 @@ class _JobProcessor(object): ...@@ -1030,9 +1049,7 @@ class _JobProcessor(object):
op.priority >= constants.OP_PRIO_HIGHEST) op.priority >= constants.OP_PRIO_HIGHEST)
# In no case must the status be finalized here # In no case must the status be finalized here
assert job.CalcStatus() == constants.JOB_STATUS_QUEUED assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
queue.UpdateJobUnlocked(job)
else: else:
# Ensure all opcodes so far have been successful # Ensure all opcodes so far have been successful
......
...@@ -521,6 +521,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): ...@@ -521,6 +521,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate) self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired()) self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs): def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True)) self.assertEqual(queue.GetNextUpdate(), (job, True))
...@@ -528,6 +529,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): ...@@ -528,6 +529,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(queue.IsAcquired()) self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible # Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel() (success, _) = job.Cancel()
...@@ -1049,14 +1051,19 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): ...@@ -1049,14 +1051,19 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.retries = 0 self.retries = 0
self.prev_tsop = None self.prev_tsop = None
self.prev_prio = None self.prev_prio = None
self.prev_status = None
self.lock_acq_prio = None
self.gave_lock = None self.gave_lock = None
self.done_lock_before_blocking = False self.done_lock_before_blocking = False
def _BeforeStart(self, timeout, priority): def _BeforeStart(self, timeout, priority):
job = self.job job = self.job
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) # If status has changed, job must've been written
if self.prev_status != self.job.ops[self.curop].status:
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate) self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired()) self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
...@@ -1067,6 +1074,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): ...@@ -1067,6 +1074,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(priority, job.ops[self.curop].priority) self.assertEqual(priority, job.ops[self.curop].priority)
self.gave_lock = True self.gave_lock = True
self.lock_acq_prio = priority
if (self.curop == 3 and if (self.curop == 3 and
job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3): job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
...@@ -1090,8 +1098,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): ...@@ -1090,8 +1098,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def _AfterStart(self, op, cbs): def _AfterStart(self, op, cbs):
job = self.job job = self.job
# Setting to "running" requires an update
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate) self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired()) self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
...@@ -1102,6 +1112,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): ...@@ -1102,6 +1112,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def _NextOpcode(self): def _NextOpcode(self):
self.curop = self.opcounter.next() self.curop = self.opcounter.next()
self.prev_prio = self.job.ops[self.curop].priority self.prev_prio = self.job.ops[self.curop].priority
self.prev_status = self.job.ops[self.curop].status
def _NewTimeoutStrategy(self): def _NewTimeoutStrategy(self):
job = self.job job = self.job
...@@ -1173,28 +1184,56 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): ...@@ -1173,28 +1184,56 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertFalse(self.done_lock_before_blocking) self.assertFalse(self.done_lock_before_blocking)
for i in itertools.count(0): while True:
proc = jqueue._JobProcessor(self.queue, opexec, job, proc = jqueue._JobProcessor(self.queue, opexec, job,
_timeout_strategy_factory=tsf) _timeout_strategy_factory=tsf)
self.assertRaises(IndexError, self.queue.GetNextUpdate) self.assertRaises(IndexError, self.queue.GetNextUpdate)
if self.curop is not None:
self.prev_status = self.job.ops[self.curop].status
self.lock_acq_prio = None
result = proc(_nextop_fn=self._NextOpcode) result = proc(_nextop_fn=self._NextOpcode)
self.assertEqual(self.queue.GetNextUpdate(), (job, True)) assert self.curop is not None
self.assertRaises(IndexError, self.queue.GetNextUpdate)
if result or self.gave_lock:
# Got lock and/or job is done, result must've been written
self.assertFalse(job.cur_opctx)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
self.assert_(job.ops[self.curop].exec_timestamp)
if result: if result:
self.assertFalse(job.cur_opctx) self.assertFalse(job.cur_opctx)
break break
self.assertFalse(result) self.assertFalse(result)
if self.curop == 0:
self.assertEqual(job.ops[self.curop].start_timestamp,
job.start_timestamp)
if self.gave_lock: if self.gave_lock:
self.assertFalse(job.cur_opctx) # Opcode finished, but job not yet done
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
else: else:
# Did not get locks
self.assert_(job.cur_opctx) self.assert_(job.cur_opctx)
self.assertEqual(job.cur_opctx._timeout_strategy._fn, self.assertEqual(job.cur_opctx._timeout_strategy._fn,
self.timeout_strategy.NextAttempt) self.timeout_strategy.NextAttempt)
self.assertFalse(job.ops[self.curop].exec_timestamp)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
# If priority has changed since acquiring locks, the job must've been
# updated
if self.lock_acq_prio != job.ops[self.curop].priority:
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp) self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp) self.assertFalse(job.end_timestamp)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment