Commit 50d54091 authored by Michael Hanselmann
Browse files

Merge branch 'devel-2.3'



* devel-2.3:
  jqueue: Keep jobs in “waitlock” while returning to queue
  Improve jqueue unittests
  Update manpages to display version 2.3

Conflicts:
	man/ganeti-cleaner.sgml: Removed
	man/ganeti-confd.sgml: Removed
	man/ganeti-masterd.sgml: Removed
	man/ganeti-noded.sgml: Removed
	man/ganeti-os-interface.sgml: Removed
	man/ganeti-rapi.sgml: Removed
	man/ganeti-watcher.sgml: Removed
	man/ganeti.sgml: Removed
	man/gnt-backup.sgml: Removed
	man/gnt-cluster.sgml: Removed
	man/gnt-debug.sgml: Removed
	man/gnt-instance.sgml: Removed
	man/gnt-job.sgml: Removed
	man/gnt-node.sgml: Removed
	man/gnt-os.sgml: Removed
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parents abefdcff 5fd6b694
......@@ -895,13 +895,28 @@ class _JobProcessor(object):
"""
assert op in job.ops
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK)
update = False
op.status = constants.OP_STATUS_WAITLOCK
op.result = None
op.start_timestamp = TimeStampNow()
if op.status == constants.OP_STATUS_QUEUED:
op.status = constants.OP_STATUS_WAITLOCK
update = True
if op.start_timestamp is None:
op.start_timestamp = TimeStampNow()
update = True
if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp
update = True
assert op.status == constants.OP_STATUS_WAITLOCK
return update
def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
......@@ -929,7 +944,8 @@ class _JobProcessor(object):
if op.status == constants.OP_STATUS_CANCELING:
return (constants.OP_STATUS_CANCELING, None)
return (constants.OP_STATUS_QUEUED, None)
# Stay in waitlock while trying to re-acquire lock
return (constants.OP_STATUS_WAITLOCK, None)
except CancelJob:
logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING
......@@ -964,6 +980,7 @@ class _JobProcessor(object):
# Is a previous opcode still pending?
if job.cur_opctx:
opctx = job.cur_opctx
job.cur_opctx = None
else:
if __debug__ and _nextop_fn:
_nextop_fn()
......@@ -974,7 +991,7 @@ class _JobProcessor(object):
# Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_CANCELED)
for i in job.ops[opctx.index:])
for i in job.ops[opctx.index + 1:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
......@@ -985,13 +1002,13 @@ class _JobProcessor(object):
if op.status != constants.OP_STATUS_CANCELED:
# Prepare to start opcode
self._MarkWaitlock(job, op)
if self._MarkWaitlock(job, op):
# Write to disk
queue.UpdateJobUnlocked(job)
assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
# Write to disk
queue.UpdateJobUnlocked(job)
assert job.start_timestamp and op.start_timestamp
logging.info("%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
......@@ -1005,7 +1022,7 @@ class _JobProcessor(object):
op.status = op_status
op.result = op_result
if op.status == constants.OP_STATUS_QUEUED:
if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time
assert not op.end_timestamp
else:
......@@ -1018,10 +1035,12 @@ class _JobProcessor(object):
else:
assert op.status in constants.OPS_FINALIZED
if op.status == constants.OP_STATUS_QUEUED:
if op.status == constants.OP_STATUS_WAITLOCK:
finalize = False
opctx.CheckPriorityIncrease()
if opctx.CheckPriorityIncrease():
# Priority was changed, need to update on-disk file
queue.UpdateJobUnlocked(job)
# Keep around for another round
job.cur_opctx = opctx
......@@ -1030,9 +1049,7 @@ class _JobProcessor(object):
op.priority >= constants.OP_PRIO_HIGHEST)
# In no case must the status be finalized here
assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
queue.UpdateJobUnlocked(job)
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
else:
# Ensure all opcodes so far have been successful
......
......@@ -427,10 +427,14 @@ class TestQueuedJob(unittest.TestCase):
class _FakeQueueForProc:
def __init__(self):
self._acquired = False
self._updates = []
def IsAcquired(self):
return self._acquired
def GetNextUpdate(self):
return self._updates.pop(0)
def acquire(self, shared=0):
assert shared == 1
self._acquired = True
......@@ -439,18 +443,21 @@ class _FakeQueueForProc:
assert self._acquired
self._acquired = False
def UpdateJobUnlocked(self, job, replicate=None):
# TODO: Ensure job is updated at the correct places
pass
def UpdateJobUnlocked(self, job, replicate=True):
assert self._acquired, "Lock not acquired while updating job"
self._updates.append((job, bool(replicate)))
class _FakeExecOpCodeForProc:
def __init__(self, before_start, after_start):
def __init__(self, queue, before_start, after_start):
self._queue = queue
self._before_start = before_start
self._after_start = after_start
def __call__(self, op, cbs, timeout=None, priority=None):
assert isinstance(op, opcodes.OpTestDummy)
assert not self._queue.IsAcquired(), \
"Queue lock not released when executing opcode"
if self._before_start:
self._before_start(timeout, priority)
......@@ -460,6 +467,9 @@ class _FakeExecOpCodeForProc:
if self._after_start:
self._after_start(op, cbs)
# Check again after the callbacks
assert not self._queue.IsAcquired()
if op.fail:
raise errors.OpExecError("Error requested (%s)" % op.result)
......@@ -507,21 +517,31 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
job = self._CreateJob(queue, job_id, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
self.assertFalse(job.cur_opctx)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
# Job is running, cancelling shouldn't be possible
(success, _) = job.Cancel()
self.assertFalse(success)
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx == len(ops) - 1:
# Last opcode
self.assert_(result)
......@@ -532,6 +552,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
......@@ -568,10 +590,18 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Create job
job = self._CreateJob(queue, job_id, ops)
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for idx in range(len(ops)):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
# queued to waitlock
self.assertEqual(queue.GetNextUpdate(), (job, True))
# waitlock to running
self.assertEqual(queue.GetNextUpdate(), (job, True))
# Opcode result
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if idx in (failfrom, len(ops) - 1):
# Last opcode
......@@ -582,6 +612,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
# Check job status
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_ERROR)
self.assertEqual(job.GetInfo(["id"]), [job_id])
......@@ -631,10 +663,12 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
(success, _) = job.Cancel()
self.assert_(success)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
for op in job.ops))
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
jqueue._JobProcessor(queue, opexec, job)()
# Check result
......@@ -661,6 +695,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
......@@ -670,14 +706,20 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELING
for op in job.ops))
self.assertRaises(IndexError, queue.GetNextUpdate)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
jqueue._JobProcessor(queue, opexec, job)()
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
# Check result
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
......@@ -719,7 +761,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _AfterStart(op, cbs):
self.fail("Should not reach this")
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
self.assert_(jqueue._JobProcessor(queue, opexec, job)())
......@@ -747,7 +789,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Run one opcode
self.assertFalse(jqueue._JobProcessor(queue, opexec, job)())
......@@ -783,7 +825,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# program was restarted
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for job_id, successcount in [(30697, 1), (2552, 4), (12489, 9)]:
ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
......@@ -816,7 +858,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
for remaining in reversed(range(len(job.ops) - successcount)):
result = jqueue._JobProcessor(queue, opexec, job)()
......@@ -854,7 +896,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
ops = [opcodes.OpTestDummy(result="result", fail=False)]
queue = _FakeQueueForProc()
opexec = _FakeExecOpCodeForProc(None, None)
opexec = _FakeExecOpCodeForProc(queue, None, None)
# Create job
job = self._CreateJob(queue, 9571, ops)
......@@ -894,10 +936,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
job = self._CreateJob(queue, 29386, ops)
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
......@@ -905,15 +951,22 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
"too", "many", "arguments")
for (log_type, msg) in op.messages:
self.assertRaises(IndexError, queue.GetNextUpdate)
if log_type:
cbs.Feedback(log_type, msg)
else:
cbs.Feedback(msg)
# Check for job update without replication
self.assertEqual(queue.GetNextUpdate(), (job, False))
self.assertRaises(IndexError, queue.GetNextUpdate)
opexec = _FakeExecOpCodeForProc(_BeforeStart, _AfterStart)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
for remaining in reversed(range(len(job.ops))):
self.assertRaises(IndexError, queue.GetNextUpdate)
result = jqueue._JobProcessor(queue, opexec, job)()
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
if remaining == 0:
# Last opcode
......@@ -924,6 +977,8 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assertRaises(IndexError, queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["opresult"]),
[[op.input.result for op in job.ops]])
......@@ -996,12 +1051,19 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.retries = 0
self.prev_tsop = None
self.prev_prio = None
self.prev_status = None
self.lock_acq_prio = None
self.gave_lock = None
self.done_lock_before_blocking = False
def _BeforeStart(self, timeout, priority):
job = self.job
# If status has changed, job must've been written
if self.prev_status != self.job.ops[self.curop].status:
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
......@@ -1012,6 +1074,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(priority, job.ops[self.curop].priority)
self.gave_lock = True
self.lock_acq_prio = priority
if (self.curop == 3 and
job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
......@@ -1035,6 +1098,10 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def _AfterStart(self, op, cbs):
job = self.job
# Setting to "running" requires an update
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
......@@ -1045,6 +1112,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
def _NextOpcode(self):
self.curop = self.opcounter.next()
self.prev_prio = self.job.ops[self.curop].priority
self.prev_status = self.job.ops[self.curop].status
def _NewTimeoutStrategy(self):
job = self.job
......@@ -1110,30 +1178,62 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.opcounter = itertools.count(0)
opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
opexec = _FakeExecOpCodeForProc(self.queue, self._BeforeStart,
self._AfterStart)
tsf = self._NewTimeoutStrategy
self.assertFalse(self.done_lock_before_blocking)
for i in itertools.count(0):
while True:
proc = jqueue._JobProcessor(self.queue, opexec, job,
_timeout_strategy_factory=tsf)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
if self.curop is not None:
self.prev_status = self.job.ops[self.curop].status
self.lock_acq_prio = None
result = proc(_nextop_fn=self._NextOpcode)
assert self.curop is not None
if result or self.gave_lock:
# Got lock and/or job is done, result must've been written
self.assertFalse(job.cur_opctx)
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(self.lock_acq_prio, job.ops[self.curop].priority)
self.assert_(job.ops[self.curop].exec_timestamp)
if result:
self.assertFalse(job.cur_opctx)
break
self.assertFalse(result)
if self.curop == 0:
self.assertEqual(job.ops[self.curop].start_timestamp,
job.start_timestamp)
if self.gave_lock:
self.assertFalse(job.cur_opctx)
# Opcode finished, but job not yet done
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
else:
# Did not get locks
self.assert_(job.cur_opctx)
self.assertEqual(job.cur_opctx._timeout_strategy._fn,
self.timeout_strategy.NextAttempt)
self.assertFalse(job.ops[self.curop].exec_timestamp)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
# If priority has changed since acquiring locks, the job must've been
# updated
if self.lock_acq_prio != job.ops[self.curop].priority:
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(job.start_timestamp)
self.assertFalse(job.end_timestamp)
......@@ -1142,6 +1242,7 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(self.opcounter.next(), len(job.ops))
self.assert_(self.done_lock_before_blocking)
self.assertRaises(IndexError, self.queue.GetNextUpdate)
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
self.assertEqual(job.GetInfo(["opresult"]),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment