From b247c6fca8c673282a0210cf90e5a0921c24fabc Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Fri, 8 Jul 2011 23:49:03 +0200
Subject: [PATCH] jqueue: Implement submitting multiple jobs with dependencies
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

With this change users of the “SubmitManyJobs” interface can use
relative job dependencies. Relative job IDs in dependencies are resolved
before handing the job off to the workerpool.

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 lib/ht.py                       |   8 +++
 lib/jqueue.py                   | 101 ++++++++++++++++++++++++++++----
 lib/opcodes.py                  |  33 +++++++++--
 test/ganeti.ht_unittest.py      |   9 +++
 test/ganeti.opcodes_unittest.py |  32 ++++++++++
 5 files changed, 164 insertions(+), 19 deletions(-)

diff --git a/lib/ht.py b/lib/ht.py
index 14b710e9c..ff9d6da24 100644
--- a/lib/ht.py
+++ b/lib/ht.py
@@ -22,6 +22,7 @@
 """Module implementing the parameter types code."""
 
 import re
+import operator
 
 from ganeti import compat
 from ganeti import utils
@@ -297,6 +298,10 @@ TPositiveInt = \
 TStrictPositiveInt = \
   TAnd(TInt, WithDesc("GreaterThanZero")(lambda v: v > 0))
 
+#: a strictly negative integer (0 > value)
+TStrictNegativeInt = \
+  TAnd(TInt, WithDesc("LessThanZero")(compat.partial(operator.gt, 0)))
+
 #: a positive float
 TPositiveFloat = \
   TAnd(TFloat, WithDesc("EqualGreaterZero")(lambda v: v >= 0.0))
@@ -308,6 +313,9 @@ TJobId = TOr(TPositiveInt,
 #: Number
 TNumber = TOr(TInt, TFloat)
 
+#: Relative job ID
+TRelativeJobId = WithDesc("RelativeJobId")(TStrictNegativeInt)
+
 
 def TListOf(my_type):
   """Checks if a given value is a list with all elements of the same type.
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 67691241b..3b1d61a66 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -1982,6 +1982,13 @@ class JobQueue(object):
         raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
                                   " are %s" % (idx, op.priority, allowed))
 
+      dependencies = getattr(op.input, opcodes.DEPEND_ATTR, None)
+      if not opcodes.TNoRelativeJobDependencies(dependencies):
+        raise errors.GenericError("Opcode %s has invalid dependencies, must"
+                                  " match %s: %s" %
+                                  (idx, opcodes.TNoRelativeJobDependencies,
+                                   dependencies))
+
     # Write to disk
     self.UpdateJobUnlocked(job)
 
@@ -2000,7 +2007,7 @@ class JobQueue(object):
     @see: L{_SubmitJobUnlocked}
 
     """
-    job_id = self._NewSerialsUnlocked(1)[0]
+    (job_id, ) = self._NewSerialsUnlocked(1)
     self._EnqueueJobs([self._SubmitJobUnlocked(job_id, ops)])
     return job_id
 
@@ -2012,24 +2019,92 @@ class JobQueue(object):
     @see: L{_SubmitJobUnlocked}
 
     """
-    results = []
-    added_jobs = []
     all_job_ids = self._NewSerialsUnlocked(len(jobs))
-    for job_id, ops in zip(all_job_ids, jobs):
-      try:
-        added_jobs.append(self._SubmitJobUnlocked(job_id, ops))
-        status = True
-        data = job_id
-      except errors.GenericError, err:
-        data = ("%s; opcodes %s" %
-                (err, utils.CommaJoin(op.Summary() for op in ops)))
-        status = False
-      results.append((status, data))
+
+    (results, added_jobs) = \
+      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
 
     self._EnqueueJobs(added_jobs)
 
     return results
 
+  @staticmethod
+  def _FormatSubmitError(msg, ops):
+    """Formats errors which occurred while submitting a job.
+
+    """
+    return ("%s; opcodes %s" %
+            (msg, utils.CommaJoin(op.Summary() for op in ops)))
+
+  @staticmethod
+  def _ResolveJobDependencies(resolve_fn, deps):
+    """Resolves relative job IDs in dependencies.
+
+    @type resolve_fn: callable
+    @param resolve_fn: Function to resolve a relative job ID
+    @type deps: list
+    @param deps: Dependencies
+    @rtype: tuple; (boolean, string or list)
+    @return: (True, resolved dependencies) or (False, error message)
+
+    """
+    result = []
+
+    for (dep_job_id, dep_status) in deps:
+      if ht.TRelativeJobId(dep_job_id):
+        assert ht.TInt(dep_job_id) and dep_job_id < 0
+        try:
+          job_id = resolve_fn(dep_job_id)
+        except IndexError:
+          # Abort on the first relative ID which can't be resolved
+          return (False, "Unable to resolve relative job ID %s" % dep_job_id)
+      else:
+        job_id = dep_job_id
+
+      result.append((job_id, dep_status))
+
+    return (True, result)
+
+  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
+    """Create and store multiple jobs.
+
+    @see: L{_SubmitJobUnlocked}
+
+    """
+    results = []
+    added_jobs = []
+
+    def resolve_fn(job_idx, reljobid):
+      assert reljobid < 0
+      return (previous_job_ids + job_ids[:job_idx])[reljobid]
+
+    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
+      for op in ops:
+        if getattr(op, opcodes.DEPEND_ATTR, None):
+          (status, data) = \
+            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
+                                         op.depends)
+          if not status:
+            # Abort; "break" skips the loop's else branch (no submission)
+            assert ht.TNonEmptyString(data), "No error message"
+            break
+          # Use resolved dependencies
+          op.depends = data
+      else:
+        try:
+          job = self._SubmitJobUnlocked(job_id, ops)
+        except errors.GenericError, err:
+          status = False
+          data = self._FormatSubmitError(str(err), ops)
+        else:
+          status = True
+          data = job_id
+          added_jobs.append(job)
+
+      results.append((status, data))
+
+    return (results, added_jobs)
+
   def _EnqueueJobs(self, jobs):
     """Helper function to add jobs to worker pool's queue.
 
diff --git a/lib/opcodes.py b/lib/opcodes.py
index 3f7938ba6..bb9ebe5ab 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -393,6 +393,30 @@ class BaseOpCode(object):
                                      errors.ECODE_INVAL)
 
 
+def _BuildJobDepCheck(relative):
+  """Builds check for job dependencies (L{DEPEND_ATTR}).
+
+  @type relative: bool
+  @param relative: Whether to accept relative job IDs (negative)
+  @rtype: callable
+
+  """
+  if relative:
+    job_id = ht.TOr(ht.TJobId, ht.TRelativeJobId)
+  else:
+    job_id = ht.TJobId
+
+  job_dep = \
+    ht.TAnd(ht.TIsLength(2),
+            ht.TItems([job_id,
+                       ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))]))
+
+  return ht.TOr(ht.TNone, ht.TListOf(job_dep))
+
+
+TNoRelativeJobDependencies = _BuildJobDepCheck(False)
+
+
 class OpCode(BaseOpCode):
   """Abstract OpCode.
 
@@ -416,17 +440,14 @@ class OpCode(BaseOpCode):
   # pylint: disable-msg=E1101
   # as OP_ID is dynamically defined
   WITH_LU = True
-  _T_JOB_DEP = \
-    ht.TAnd(ht.TIsLength(2),
-            ht.TItems([ht.TJobId,
-                       ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))]))
   OP_PARAMS = [
     ("dry_run", None, ht.TMaybeBool, "Run checks only, don't execute"),
     ("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"),
     ("priority", constants.OP_PRIO_DEFAULT,
      ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"),
-    (DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
-     "Job dependencies"),
+    (DEPEND_ATTR, None, _BuildJobDepCheck(True),
+     "Job dependencies; if used through ``SubmitManyJobs`` relative (negative)"
+     " job IDs can be used"),
     ]
 
   def __getstate__(self):
diff --git a/test/ganeti.ht_unittest.py b/test/ganeti.ht_unittest.py
index 567c53c24..59d97576d 100755
--- a/test/ganeti.ht_unittest.py
+++ b/test/ganeti.ht_unittest.py
@@ -250,6 +250,15 @@ class TestTypeChecks(unittest.TestCase):
               None, [], {}, object()]:
       self.assertFalse(ht.TJobId(i))
 
+  def testRelativeJobId(self):
+    for i in [-1, -93, -4395]:
+      self.assertTrue(ht.TRelativeJobId(i))
+      self.assertFalse(ht.TRelativeJobId(str(i)))
+
+    for i in [0, 1, 2, 10, 9289, "", "0", "-1", "-999"]:
+      self.assertFalse(ht.TRelativeJobId(i))
+      self.assertFalse(ht.TRelativeJobId(str(i)))
+
   def testItems(self):
     self.assertRaises(AssertionError, ht.TItems, [])
 
diff --git a/test/ganeti.opcodes_unittest.py b/test/ganeti.opcodes_unittest.py
index d78b6aca3..d6bdfb413 100755
--- a/test/ganeti.opcodes_unittest.py
+++ b/test/ganeti.opcodes_unittest.py
@@ -273,5 +273,37 @@ class TestOpcodes(unittest.TestCase):
     self.assertEqual(op.debug_level, 123)
 
 
+class TestOpcodeDepends(unittest.TestCase):
+  def test(self):
+    check_relative = opcodes._BuildJobDepCheck(True)
+    check_norelative = opcodes.TNoRelativeJobDependencies
+
+    for fn in [check_relative, check_norelative]:
+      self.assertTrue(fn(None))
+      self.assertTrue(fn([]))
+      self.assertTrue(fn([(1, [])]))
+      self.assertTrue(fn([(719833, [])]))
+      self.assertTrue(fn([("24879", [])]))
+      self.assertTrue(fn([(2028, [constants.JOB_STATUS_ERROR])]))
+      self.assertTrue(fn([
+        (2028, [constants.JOB_STATUS_ERROR]),
+        (18750, []),
+        (5063, [constants.JOB_STATUS_SUCCESS, constants.JOB_STATUS_ERROR]),
+        ]))
+
+      self.assertFalse(fn(1))
+      self.assertFalse(fn([(9, )]))
+      self.assertFalse(fn([(15194, constants.JOB_STATUS_ERROR)]))
+
+    for i in [
+      [(-1, [])],
+      [(-27740, [constants.JOB_STATUS_CANCELED, constants.JOB_STATUS_ERROR]),
+       (-1, [constants.JOB_STATUS_ERROR]),
+       (9921, [])],
+      ]:
+      self.assertTrue(check_relative(i))
+      self.assertFalse(check_norelative(i))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
GitLab