Commit c0f6d0d8 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Add “writable” flag to memory objects

Basically only one instance of the job, the one being processed,
should be serialized to disk and replicated to other nodes. With
this flag assertions can be added in various places.
Signed-off-by: default avatarMichael Hanselmann <>
Reviewed-by: default avatarIustin Pop <>
parent b95479a5
......@@ -174,14 +174,15 @@ class _QueuedJob(object):
@ivar received_timestamp: the timestamp for when the job was received
@ivar start_timestmap: the timestamp for start of execution
@ivar end_timestamp: the timestamp for end of execution
@ivar writable: Whether the job is allowed to be modified
# pylint: disable-msg=W0212
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
"received_timestamp", "start_timestamp", "end_timestamp",
"__weakref__", "processor_lock"]
"__weakref__", "processor_lock", "writable"]
def __init__(self, queue, job_id, ops):
def __init__(self, queue, job_id, ops, writable):
"""Constructor for the _QueuedJob.
@type queue: L{JobQueue}
......@@ -191,6 +192,8 @@ class _QueuedJob(object):
@type ops: list
@param ops: the list of opcodes we hold, which will be encapsulated
in _QueuedOpCodes
@type writable: bool
@param writable: Whether job can be modified
if not ops:
......@@ -204,13 +207,14 @@ class _QueuedJob(object):
self.start_timestamp = None
self.end_timestamp = None
self._InitInMemory(self, writable)
def _InitInMemory(obj):
def _InitInMemory(obj, writable):
"""Initializes in-memory variables.
obj.writable = writable
obj.ops_iter = None
obj.cur_opctx = None
obj.processor_lock = threading.Lock()
......@@ -223,13 +227,15 @@ class _QueuedJob(object):
return "<%s at %#x>" % (" ".join(status), id(self))
def Restore(cls, queue, state):
def Restore(cls, queue, state, writable):
"""Restore a _QueuedJob from serialized state:
@type queue: L{JobQueue}
@param queue: to which queue the restored job belongs
@type state: dict
@param state: the serialized state
@type writable: bool
@param writable: Whether job can be modified
@rtype: _JobQueue
@return: the restored _JobQueue instance
......@@ -249,7 +255,7 @@ class _QueuedJob(object):
obj.log_serial = max(obj.log_serial, log_entry[0])
cls._InitInMemory(obj, writable)
return obj
......@@ -583,6 +589,8 @@ class _JobChangesChecker(object):
@param job: Job object
assert not job.writable, "Expected read-only job"
status = job.CalcStatus()
job_info = job.GetInfo(self._fields)
log_entries = job.GetLogEntries(self._prev_log_serial)
......@@ -1051,6 +1059,8 @@ class _JobProcessor(object):
opcount = len(job.ops)
assert job.writable, "Expected writable job"
# Don't do anything for finalized jobs
if job.CalcStatus() in constants.JOBS_FINALIZED:
return True
......@@ -1206,6 +1216,7 @@ class _JobProcessor(object):
return bool(waitjob)
assert job.writable, "Job became read-only while being processed"
......@@ -1823,6 +1834,7 @@ class JobQueue(object):
job = self._memcache.get(job_id, None)
if job:
logging.debug("Found job %s in memcache", job_id)
assert job.writable, "Found read-only job in memcache"
return job
......@@ -1841,11 +1853,13 @@ class JobQueue(object):
self._RenameFilesUnlocked([(old_path, new_path)])
return None
assert job.writable, "Job just loaded is not writable"
self._memcache[job_id] = job
logging.debug("Added job %s to the cache", job_id)
return job
def _LoadJobFromDisk(self, job_id, try_archived):
def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
"""Load the given job file from disk.
Given a job file, read, load and restore it in a _QueuedJob format.
......@@ -1858,14 +1872,15 @@ class JobQueue(object):
@return: either None or the job object
path_functions = [self._GetJobPath]
path_functions = [(self._GetJobPath, True)]
if try_archived:
path_functions.append((self._GetArchivedJobPath, False))
raw_data = None
writable_default = None
for fn in path_functions:
for (fn, writable_default) in path_functions:
filepath = fn(job_id)
logging.debug("Loading job from %s", filepath)
......@@ -1879,15 +1894,18 @@ class JobQueue(object):
if not raw_data:
return None
if writable is None:
writable = writable_default
data = serializer.LoadJson(raw_data)
job = _QueuedJob.Restore(self, data)
job = _QueuedJob.Restore(self, data, writable)
except Exception, err: # pylint: disable-msg=W0703
raise errors.JobFileCorrupted(err)
return job
def SafeLoadJobFromDisk(self, job_id, try_archived):
def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
"""Load the given job file from disk.
Given a job file, read, load and restore it in a _QueuedJob format.
......@@ -1903,7 +1921,7 @@ class JobQueue(object):
return self._LoadJobFromDisk(job_id, try_archived)
return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
except (errors.JobFileCorrupted, EnvironmentError):
logging.exception("Can't load/parse job %s", job_id)
return None
......@@ -1955,7 +1973,7 @@ class JobQueue(object):
if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
raise errors.JobQueueFull()
job = _QueuedJob(self, job_id, ops)
job = _QueuedJob(self, job_id, ops, True)
# Check priority
for idx, op in enumerate(job.ops):
......@@ -2036,7 +2054,9 @@ class JobQueue(object):
# Not using in-memory cache as doing so would require an exclusive lock
# Try to load from disk
job = self.SafeLoadJobFromDisk(job_id, True)
job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
assert not job.writable, "Got writable job"
if job:
return job.CalcStatus()
......@@ -2060,6 +2080,7 @@ class JobQueue(object):
if __debug__:
finalized = job.CalcStatus() in constants.JOBS_FINALIZED
assert (finalized ^ (job.end_timestamp is None))
assert job.writable, "Can't update read-only job"
filename = self._GetJobPath(
data = serializer.DumpJson(job.Serialize(), indent=False)
......@@ -2090,7 +2111,8 @@ class JobQueue(object):
as such by the clients
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False)
load_fn = compat.partial(self.SafeLoadJobFromDisk, job_id, False,
helper = _WaitForJobChangesHelper()
......@@ -2115,6 +2137,8 @@ class JobQueue(object):
logging.debug("Job %s not found", job_id)
return (False, "Job %s not found" % job_id)
assert job.writable, "Can't cancel read-only job"
(success, msg) = job.Cancel()
if success:
......@@ -2137,6 +2161,8 @@ class JobQueue(object):
archive_jobs = []
rename_files = []
for job in jobs:
assert job.writable, "Can't archive read-only job"
if job.CalcStatus() not in constants.JOBS_FINALIZED:
logging.debug("Job %s is not yet done",
......@@ -43,6 +43,7 @@ import testutils
class _FakeJob:
def __init__(self, job_id, status): = job_id
self.writable = False
self._status = status
self._log = []
......@@ -279,7 +280,7 @@ class TestQueuedOpCode(unittest.TestCase):
class TestQueuedJob(unittest.TestCase):
def test(self):
self.assertRaises(errors.GenericError, jqueue._QueuedJob,
None, 1, [])
None, 1, [], False)
def testDefaults(self):
job_id = 4260
......@@ -289,6 +290,7 @@ class TestQueuedJob(unittest.TestCase):
def _Check(job):
self.assertEqual(, job_id)
self.assertEqual(job.log_serial, 0)
......@@ -305,12 +307,19 @@ class TestQueuedJob(unittest.TestCase):
[[op.input.Summary() for op in job.ops]])
job1 = jqueue._QueuedJob(None, job_id, ops)
job1 = jqueue._QueuedJob(None, job_id, ops, True)
job2 = jqueue._QueuedJob.Restore(None, job1.Serialize())
job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True)
self.assertEqual(job1.Serialize(), job2.Serialize())
def testWritable(self):
job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False)
job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True)
def testPriority(self):
job_id = 4283
ops = [
......@@ -323,7 +332,7 @@ class TestQueuedJob(unittest.TestCase):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
job = jqueue._QueuedJob(None, job_id, ops)
job = jqueue._QueuedJob(None, job_id, ops, True)
self.assert_(compat.all(op.priority == constants.OP_PRIO_DEFAULT
for op in job.ops))
......@@ -409,7 +418,8 @@ class TestQueuedJob(unittest.TestCase):
def _NewJob():
job = jqueue._QueuedJob(None, 1,
[opcodes.OpTestDelay() for _ in range(10)])
[opcodes.OpTestDelay() for _ in range(10)],
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
self.assert_(compat.all(op.status == constants.OP_STATUS_QUEUED
for op in job.ops))
......@@ -550,7 +560,7 @@ class _FakeExecOpCodeForProc:
class _JobProcessorTestUtils:
def _CreateJob(self, queue, job_id, ops):
job = jqueue._QueuedJob(queue, job_id, ops)
job = jqueue._QueuedJob(queue, job_id, ops, True)
self.assertEqual(len(ops), len(job.ops))
......@@ -972,7 +982,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Serialize and restore (simulates program restart)
newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
self._TestPartial(newjob, successcount)
......@@ -1016,7 +1026,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
# ... also after being restored
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
# Calling the processor on a finished job should be a no-op
self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
self.assertRaises(IndexError, queue.GetNextUpdate)
......@@ -1117,7 +1127,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self._CheckLogMessages(job, logmsgcount)
# Serialize and restore (simulates program restart)
newjob = jqueue._QueuedJob.Restore(queue, job.Serialize())
newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True)
self._CheckLogMessages(newjob, logmsgcount)
# Check each message
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment