diff --git a/lib/jqueue.py b/lib/jqueue.py index d16b8f701ccdeadf314537bed766b54b3f58d078..0fad78df98c90cfde39463b4b7ee1460d1c01d33 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -210,7 +210,7 @@ class _QueuedJob(object): # pylint: disable=W0212 __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", - "__weakref__", "processor_lock", "writable"] + "__weakref__", "processor_lock", "writable", "archived"] def __init__(self, queue, job_id, ops, writable): """Constructor for the _QueuedJob. @@ -236,9 +236,12 @@ class _QueuedJob(object): self.received_timestamp = TimeStampNow() self.start_timestamp = None self.end_timestamp = None + self.archived = False self._InitInMemory(self, writable) + assert not self.archived, "New jobs can not be marked as archived" + @staticmethod def _InitInMemory(obj, writable): """Initializes in-memory variables. @@ -262,7 +265,7 @@ class _QueuedJob(object): return "<%s at %#x>" % (" ".join(status), id(self)) @classmethod - def Restore(cls, queue, state, writable): + def Restore(cls, queue, state, writable, archived): """Restore a _QueuedJob from serialized state: @type queue: L{JobQueue} @@ -271,6 +274,8 @@ class _QueuedJob(object): @param state: the serialized state @type writable: bool @param writable: Whether job can be modified + @type archived: bool + @param archived: Whether job was already archived @rtype: _JobQueue @return: the restored _JobQueue instance @@ -281,6 +286,7 @@ class _QueuedJob(object): obj.received_timestamp = state.get("received_timestamp", None) obj.start_timestamp = state.get("start_timestamp", None) obj.end_timestamp = state.get("end_timestamp", None) + obj.archived = archived obj.ops = [] obj.log_serial = 0 @@ -1939,15 +1945,15 @@ class JobQueue(object): @return: either None or the job object """ - path_functions = [(self._GetJobPath, True)] + path_functions = [(self._GetJobPath, False)] if try_archived: - path_functions.append((self._GetArchivedJobPath, False)) + path_functions.append((self._GetArchivedJobPath, True)) raw_data = None - writable_default = None + archived = None - for (fn, writable_default) in path_functions: + for (fn, archived) in path_functions: filepath = fn(job_id) logging.debug("Loading job from %s", filepath) try: @@ -1962,11 +1968,11 @@ class JobQueue(object): return None if writable is None: - writable = writable_default + writable = not archived try: data = serializer.LoadJson(raw_data) - job = _QueuedJob.Restore(self, data, writable) + job = _QueuedJob.Restore(self, data, writable, archived) except Exception, err: # pylint: disable=W0703 raise errors.JobFileCorrupted(err) @@ -2229,6 +2235,7 @@ class JobQueue(object): finalized = job.CalcStatus() in constants.JOBS_FINALIZED assert (finalized ^ (job.end_timestamp is None)) assert job.writable, "Can't update read-only job" + assert not job.archived, "Can't update archived job" filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize()) @@ -2286,6 +2293,7 @@ class JobQueue(object): return (False, "Job %s not found" % job_id) assert job.writable, "Can't cancel read-only job" + assert not job.archived, "Can't cancel archived job" (success, msg) = job.Cancel() @@ -2310,6 +2318,7 @@ class JobQueue(object): rename_files = [] for job in jobs: assert job.writable, "Can't archive read-only job" + assert not job.archived, "Can't cancel archived job" if job.CalcStatus() not in constants.JOBS_FINALIZED: logging.debug("Job %s is not yet done", job.id) diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 65ded1a0c35d8340bf13c99b096fec13eb859bff..41b4c4be89ffbcc07b7b26bcd221ff7cd66ba98a 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -309,10 +309,11 @@ class TestQueuedJob(unittest.TestCase): ["unknown-field"]) self.assertEqual(job.GetInfo(["summary"]), [[op.input.Summary() for op in job.ops]]) + self.assertFalse(job.archived) job1 = jqueue._QueuedJob(None, job_id, ops, True) _Check(job1) - job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True) + job2 = jqueue._QueuedJob.Restore(None, job1.Serialize(), True, False) _Check(job2) self.assertEqual(job1.Serialize(), job2.Serialize()) @@ -323,6 +324,16 @@ class TestQueuedJob(unittest.TestCase): job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], True) self.assertTrue(job.writable) + def testArchived(self): + job = jqueue._QueuedJob(None, 1, [opcodes.OpTestDelay()], False) + self.assertFalse(job.archived) + + newjob = jqueue._QueuedJob.Restore(None, job.Serialize(), True, True) + self.assertTrue(newjob.archived) + + newjob2 = jqueue._QueuedJob.Restore(None, newjob.Serialize(), True, False) + self.assertFalse(newjob2.archived) + def testPriority(self): job_id = 4283 ops = [ @@ -994,7 +1005,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assert_(job.ops_iter) # Serialize and restore (simulates program restart) - newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True) + newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False) self.assertFalse(newjob.ops_iter) self._TestPartial(newjob, successcount) @@ -1039,7 +1050,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, queue.GetNextUpdate) # ... also after being restored - job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True) + job2 = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False) # Calling the processor on a finished job should be a no-op self.assertEqual(jqueue._JobProcessor(queue, opexec, job2)(), jqueue._JobProcessor.FINISHED) @@ -1141,7 +1152,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._CheckLogMessages(job, logmsgcount) # Serialize and restore (simulates program restart) - newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True) + newjob = jqueue._QueuedJob.Restore(queue, job.Serialize(), True, False) self._CheckLogMessages(newjob, logmsgcount) # Check each message