diff --git a/lib/ht.py b/lib/ht.py index 14b710e9c0146afc72f4cc86b280c8542cc2e25d..ff9d6da24f11fef0a7704703e19ce307dd0c446d 100644 --- a/lib/ht.py +++ b/lib/ht.py @@ -22,6 +22,7 @@ """Module implementing the parameter types code.""" import re +import operator from ganeti import compat from ganeti import utils @@ -297,6 +298,10 @@ TPositiveInt = \ TStrictPositiveInt = \ TAnd(TInt, WithDesc("GreaterThanZero")(lambda v: v > 0)) +#: a strictly negative integer (0 > value) +TStrictNegativeInt = \ + TAnd(TInt, WithDesc("LessThanZero")(compat.partial(operator.gt, 0))) + #: a positive float TPositiveFloat = \ TAnd(TFloat, WithDesc("EqualGreaterZero")(lambda v: v >= 0.0)) @@ -308,6 +313,9 @@ TJobId = TOr(TPositiveInt, #: Number TNumber = TOr(TInt, TFloat) +#: Relative job ID +TRelativeJobId = WithDesc("RelativeJobId")(TStrictNegativeInt) + def TListOf(my_type): """Checks if a given value is a list with all elements of the same type. diff --git a/lib/jqueue.py b/lib/jqueue.py index 67691241b71a84854f9f3ee77c55785728c74a72..3b1d61a6690774e260dcadc6bf68511ea34b4fea 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -1982,6 +1982,13 @@ class JobQueue(object): raise errors.GenericError("Opcode %s has invalid priority %s, allowed" " are %s" % (idx, op.priority, allowed)) + dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None) + if not opcodes.TNoRelativeJobDependencies(dependencies): + raise errors.GenericError("Opcode %s has invalid dependencies, must" + " match %s: %s" % + (idx, opcodes.TNoRelativeJobDependencies, + dependencies)) + # Write to disk self.UpdateJobUnlocked(job) @@ -2000,7 +2007,7 @@ class JobQueue(object): @see: L{_SubmitJobUnlocked} """ - job_id = self._NewSerialsUnlocked(1)[0] + (job_id, ) = self._NewSerialsUnlocked(1) self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)]) return job_id @@ -2012,24 +2019,92 @@ class JobQueue(object): @see: L{_SubmitJobUnlocked} """ - results = [] - added_jobs = [] all_job_ids = self._NewSerialsUnlocked(len(jobs)) - for job_id, ops in zip(all_job_ids, jobs): - try: - added_jobs.append(self._SubmitJobUnlocked(job_id, ops)) - status = True - data = job_id - except errors.GenericError, err: - data = ("%s; opcodes %s" % - (err, utils.CommaJoin(op.Summary() for op in ops))) - status = False - results.append((status, data)) + + (results, added_jobs) = \ + self._SubmitManyJobsUnlocked(jobs, all_job_ids, []) self._EnqueueJobs(added_jobs) return results + @staticmethod + def _FormatSubmitError(msg, ops): + """Formats errors which occurred while submitting a job. + + """ + return ("%s; opcodes %s" % + (msg, utils.CommaJoin(op.Summary() for op in ops))) + + @staticmethod + def _ResolveJobDependencies(resolve_fn, deps): + """Resolves relative job IDs in dependencies. + + @type resolve_fn: callable + @param resolve_fn: Function to resolve a relative job ID + @type deps: list + @param deps: Dependencies + @rtype: list + @return: Resolved dependencies + + """ + result = [] + + for (dep_job_id, dep_status) in deps: + if ht.TRelativeJobId(dep_job_id): + assert ht.TInt(dep_job_id) and dep_job_id < 0 + try: + job_id = resolve_fn(dep_job_id) + except IndexError: + # Abort + return (False, "Unable to resolve relative job ID %s" % dep_job_id) + else: + job_id = dep_job_id + + result.append((job_id, dep_status)) + + return (True, result) + + def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids): + """Create and store multiple jobs. + + @see: L{_SubmitJobUnlocked} + + """ + results = [] + added_jobs = [] + + def resolve_fn(job_idx, reljobid): + assert reljobid < 0 + return (previous_job_ids + job_ids[:job_idx])[reljobid] + + for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)): + for op in ops: + if getattr(op, opcodes.DEPEND_ATTR, None): + (status, data) = \ + self._ResolveJobDependencies(compat.partial(resolve_fn, idx), + op.depends) + if not status: + # Abort resolving dependencies + assert ht.TNonEmptyString(data), "No error message" + break + # Use resolved dependencies + op.depends = data + else: + try: + job = self._SubmitJobUnlocked(job_id, ops) + except errors.GenericError, err: + status = False + data = self._FormatSubmitError(str(err), ops) + else: + status = True + data = job_id + added_jobs.append(job) + + results.append((status, data)) + + return (results, added_jobs) + def _EnqueueJobs(self, jobs): """Helper function to add jobs to worker pool's queue. diff --git a/lib/opcodes.py b/lib/opcodes.py index 3f7938ba6d204fc2f5f6d44bac62bd69613c19e2..bb9ebe5ab094279ac035f5f7da6c5b135131478f 100644 --- a/lib/opcodes.py +++ b/lib/opcodes.py @@ -393,6 +393,30 @@ class BaseOpCode(object): errors.ECODE_INVAL) +def _BuildJobDepCheck(relative): + """Builds check for job dependencies (L{DEPEND_ATTR}). + + @type relative: bool + @param relative: Whether to accept relative job IDs (negative) + @rtype: callable + + """ + if relative: + job_id = ht.TOr(ht.TJobId, ht.TRelativeJobId) + else: + job_id = ht.TJobId + + job_dep = \ + ht.TAnd(ht.TIsLength(2), + ht.TItems([job_id, + ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))])) + + return ht.TOr(ht.TNone, ht.TListOf(job_dep)) + + +TNoRelativeJobDependencies = _BuildJobDepCheck(False) + + class OpCode(BaseOpCode): """Abstract OpCode. @@ -416,17 +440,14 @@ class OpCode(BaseOpCode): # pylint: disable-msg=E1101 # as OP_ID is dynamically defined WITH_LU = True - _T_JOB_DEP = \ - ht.TAnd(ht.TIsLength(2), - ht.TItems([ht.TJobId, - ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))])) OP_PARAMS = [ ("dry_run", None, ht.TMaybeBool, "Run checks only, don't execute"), ("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"), ("priority", constants.OP_PRIO_DEFAULT, ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"), - (DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)), - "Job dependencies"), + (DEPEND_ATTR, None, _BuildJobDepCheck(True), + "Job dependencies; if used through ``SubmitManyJobs`` relative (negative)" + " job IDs can be used"), ] def __getstate__(self): diff --git a/test/ganeti.ht_unittest.py b/test/ganeti.ht_unittest.py index 567c53c24db871833c9a92570e711668f5ed80b1..59d97576de59cc54a0cd5b6b758b6f03fa9e45c6 100755 --- a/test/ganeti.ht_unittest.py +++ b/test/ganeti.ht_unittest.py @@ -250,6 +250,15 @@ class TestTypeChecks(unittest.TestCase): None, [], {}, object()]: self.assertFalse(ht.TJobId(i)) + def testRelativeJobId(self): + for i in [-1, -93, -4395]: + self.assertTrue(ht.TRelativeJobId(i)) + self.assertFalse(ht.TRelativeJobId(str(i))) + + for i in [0, 1, 2, 10, 9289, "", "0", "-1", "-999"]: + self.assertFalse(ht.TRelativeJobId(i)) + self.assertFalse(ht.TRelativeJobId(str(i))) + def testItems(self): self.assertRaises(AssertionError, ht.TItems, []) diff --git a/test/ganeti.opcodes_unittest.py b/test/ganeti.opcodes_unittest.py index d78b6aca3be00f09040546188fe575d616182d8b..d6bdfb4134b1ef28893ccf559ddb282d5ea405ee 100755 --- a/test/ganeti.opcodes_unittest.py +++ b/test/ganeti.opcodes_unittest.py @@ -273,5 +273,37 @@ class TestOpcodes(unittest.TestCase): self.assertEqual(op.debug_level, 123) +class TestOpcodeDepends(unittest.TestCase): + def test(self): + check_relative = opcodes._BuildJobDepCheck(True) + check_norelative = opcodes.TNoRelativeJobDependencies + + for fn in [check_relative, check_norelative]: + self.assertTrue(fn(None)) + self.assertTrue(fn([])) + self.assertTrue(fn([(1, [])])) + self.assertTrue(fn([(719833, [])])) + self.assertTrue(fn([("24879", [])])) + self.assertTrue(fn([(2028, [constants.JOB_STATUS_ERROR])])) + self.assertTrue(fn([ + (2028, [constants.JOB_STATUS_ERROR]), + (18750, []), + (5063, [constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR]), + ])) + + self.assertFalse(fn(1)) + self.assertFalse(fn([(9, )])) + self.assertFalse(fn([(15194, constants.JOB_STATUS_ERROR)])) + + for i in [ + [(-1, [])], + [(-27740, [constants.JOB_STATUS_CANCELED, constants.JOB_STATUS_ERROR]), + (-1, [constants.JOB_STATUS_ERROR]), + (9921, [])], + ]: + self.assertTrue(check_relative(i)) + self.assertFalse(check_norelative(i)) + + if __name__ == "__main__": testutils.GanetiTestProgram()