Commit b247c6fc authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

jqueue: Implement submitting multiple jobs with dependencies



With this change users of the “SubmitManyJobs” interface can use
relative job dependencies. Relative job IDs in dependencies are resolved
before handing the job off to the workerpool.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent 7bb60c2d
......@@ -22,6 +22,7 @@
"""Module implementing the parameter types code."""
import re
import operator
from ganeti import compat
from ganeti import utils
......@@ -297,6 +298,10 @@ TPositiveInt = \
TStrictPositiveInt = \
TAnd(TInt, WithDesc("GreaterThanZero")(lambda v: v > 0))
#: a strictly negative integer (0 > value)
TStrictNegativeInt = \
TAnd(TInt, WithDesc("LessThanZero")(compat.partial(operator.gt, 0)))
#: a positive float
TPositiveFloat = \
TAnd(TFloat, WithDesc("EqualGreaterZero")(lambda v: v >= 0.0))
......@@ -308,6 +313,9 @@ TJobId = TOr(TPositiveInt,
#: Number
TNumber = TOr(TInt, TFloat)
#: Relative job ID
TRelativeJobId = WithDesc("RelativeJobId")(TStrictNegativeInt)
def TListOf(my_type):
"""Checks if a given value is a list with all elements of the same type.
......
......@@ -1982,6 +1982,13 @@ class JobQueue(object):
raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
" are %s" % (idx, op.priority, allowed))
dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
if not opcodes.TNoRelativeJobDependencies(dependencies):
raise errors.GenericError("Opcode %s has invalid dependencies, must"
" match %s: %s" %
(idx, opcodes.TNoRelativeJobDependencies,
dependencies))
# Write to disk
self.UpdateJobUnlocked(job)
......@@ -2000,7 +2007,7 @@ class JobQueue(object):
@see: L{_SubmitJobUnlocked}
"""
job_id = self._NewSerialsUnlocked(1)[0]
(job_id, ) = self._NewSerialsUnlocked(1)
self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
return job_id
......@@ -2012,24 +2019,92 @@ class JobQueue(object):
@see: L{_SubmitJobUnlocked}
"""
results = []
added_jobs = []
all_job_ids = self._NewSerialsUnlocked(len(jobs))
for job_id, ops in zip(all_job_ids, jobs):
try:
added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
status = True
data = job_id
except errors.GenericError, err:
data = ("%s; opcodes %s" %
(err, utils.CommaJoin(op.Summary() for op in ops)))
status = False
results.append((status, data))
(results, added_jobs) = \
self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
self._EnqueueJobs(added_jobs)
return results
@staticmethod
def _FormatSubmitError(msg, ops):
"""Formats errors which occurred while submitting a job.
"""
return ("%s; opcodes %s" %
(msg, utils.CommaJoin(op.Summary() for op in ops)))
@staticmethod
def _ResolveJobDependencies(resolve_fn, deps):
"""Resolves relative job IDs in dependencies.
@type resolve_fn: callable
@param resolve_fn: Function to resolve a relative job ID
@type deps: list
@param deps: Dependencies
@rtype: list
@return: Resolved dependencies
"""
result = []
for (dep_job_id, dep_status) in deps:
if ht.TRelativeJobId(dep_job_id):
assert ht.TInt(dep_job_id) and dep_job_id < 0
try:
job_id = resolve_fn(dep_job_id)
except IndexError:
# Abort
return (False, "Unable to resolve relative job ID %s" % dep_job_id)
else:
job_id = dep_job_id
result.append((job_id, dep_status))
return (True, result)
def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
"""Create and store multiple jobs.
@see: L{_SubmitJobUnlocked}
"""
results = []
added_jobs = []
def resolve_fn(job_idx, reljobid):
assert reljobid < 0
return (previous_job_ids + job_ids[:job_idx])[reljobid]
for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
for op in ops:
if getattr(op, opcodes.DEPEND_ATTR, None):
(status, data) = \
self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
op.depends)
if not status:
# Abort resolving dependencies
assert ht.TNonEmptyString(data), "No error message"
break
# Use resolved dependencies
op.depends = data
else:
try:
job = self._SubmitJobUnlocked(job_id, ops)
except errors.GenericError, err:
status = False
data = self._FormatSubmitError(str(err), ops)
else:
status = True
data = job_id
added_jobs.append(job)
results.append((status, data))
return (results, added_jobs)
def _EnqueueJobs(self, jobs):
"""Helper function to add jobs to worker pool's queue.
......
......@@ -393,6 +393,30 @@ class BaseOpCode(object):
errors.ECODE_INVAL)
def _BuildJobDepCheck(relative):
"""Builds check for job dependencies (L{DEPEND_ATTR}).
@type relative: bool
@param relative: Whether to accept relative job IDs (negative)
@rtype: callable
"""
if relative:
job_id = ht.TOr(ht.TJobId, ht.TRelativeJobId)
else:
job_id = ht.TJobId
job_dep = \
ht.TAnd(ht.TIsLength(2),
ht.TItems([job_id,
ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))]))
return ht.TOr(ht.TNone, ht.TListOf(job_dep))
TNoRelativeJobDependencies = _BuildJobDepCheck(False)
class OpCode(BaseOpCode):
"""Abstract OpCode.
......@@ -416,17 +440,14 @@ class OpCode(BaseOpCode):
# pylint: disable-msg=E1101
# as OP_ID is dynamically defined
WITH_LU = True
_T_JOB_DEP = \
ht.TAnd(ht.TIsLength(2),
ht.TItems([ht.TJobId,
ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))]))
OP_PARAMS = [
("dry_run", None, ht.TMaybeBool, "Run checks only, don't execute"),
("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"),
("priority", constants.OP_PRIO_DEFAULT,
ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"),
(DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
"Job dependencies"),
(DEPEND_ATTR, None, _BuildJobDepCheck(True),
"Job dependencies; if used through ``SubmitManyJobs`` relative (negative)"
" job IDs can be used"),
]
def __getstate__(self):
......
......@@ -250,6 +250,15 @@ class TestTypeChecks(unittest.TestCase):
None, [], {}, object()]:
self.assertFalse(ht.TJobId(i))
def testRelativeJobId(self):
for i in [-1, -93, -4395]:
self.assertTrue(ht.TRelativeJobId(i))
self.assertFalse(ht.TRelativeJobId(str(i)))
for i in [0, 1, 2, 10, 9289, "", "0", "-1", "-999"]:
self.assertFalse(ht.TRelativeJobId(i))
self.assertFalse(ht.TRelativeJobId(str(i)))
def testItems(self):
self.assertRaises(AssertionError, ht.TItems, [])
......
......@@ -273,5 +273,37 @@ class TestOpcodes(unittest.TestCase):
self.assertEqual(op.debug_level, 123)
class TestOpcodeDepends(unittest.TestCase):
def test(self):
check_relative = opcodes._BuildJobDepCheck(True)
check_norelative = opcodes.TNoRelativeJobDependencies
for fn in [check_relative, check_norelative]:
self.assertTrue(fn(None))
self.assertTrue(fn([]))
self.assertTrue(fn([(1, [])]))
self.assertTrue(fn([(719833, [])]))
self.assertTrue(fn([("24879", [])]))
self.assertTrue(fn([(2028, [constants.JOB_STATUS_ERROR])]))
self.assertTrue(fn([
(2028, [constants.JOB_STATUS_ERROR]),
(18750, []),
(5063, [constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR]),
]))
self.assertFalse(fn(1))
self.assertFalse(fn([(9, )]))
self.assertFalse(fn([(15194, constants.JOB_STATUS_ERROR)]))
for i in [
[(-1, [])],
[(-27740, [constants.JOB_STATUS_CANCELED, constants.JOB_STATUS_ERROR]),
(-1, [constants.JOB_STATUS_ERROR]),
(9921, [])],
]:
self.assertTrue(check_relative(i))
self.assertFalse(check_norelative(i))
if __name__ == "__main__":
testutils.GanetiTestProgram()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment