diff --git a/lib/cmdlib.py b/lib/cmdlib.py
index af51ec209eead156ac0988f2564e65bdec04b934..a35a54b565f74b1705e73d398bc9be54819cf82d 100644
--- a/lib/cmdlib.py
+++ b/lib/cmdlib.py
@@ -74,7 +74,36 @@ def _SupportsOob(cfg, node):
   return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
 
 
-# End types
+class ResultWithJobs:
+  """Data container for LU results with jobs.
+
+  Instances of this class returned from L{LogicalUnit.Exec} will be recognized
+  by L{mcpu.Processor._ProcessResult}. The latter will then submit the jobs
+  contained in the C{jobs} attribute and include the job IDs in the opcode
+  result.
+
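+  A minimal usage sketch from within an LU's C{Exec} method (the opcode shown
+  here is only an arbitrary example)::
+
+    # Submit one follow-up job consisting of a single opcode, and return one
+    # additional value under the "message" key
+    return ResultWithJobs([[opcodes.OpTestDelay(duration=0)]],
+                          message="follow-up job submitted")
+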
+  """
+  def __init__(self, jobs, **kwargs):
+    """Initializes this class.
+
+    Additional return values can be specified as keyword arguments.
+
+    @type jobs: list of lists of L{opcodes.OpCode}
+    @param jobs: A list of jobs, each of which is a list of opcode objects
+
+    """
+    self.jobs = jobs
+    self.other = kwargs
+
+
 class LogicalUnit(object):
   """Logical Unit base class.
 
diff --git a/lib/constants.py b/lib/constants.py
index 2ae1a43be892bf61d9bde10931177bae4f740392..973914b7ecf09685138c0b712642812579d5abc6 100644
--- a/lib/constants.py
+++ b/lib/constants.py
@@ -529,6 +529,9 @@ DISK_TRANSFER_CONNECT_TIMEOUT = 60
 # Disk index separator
 DISK_SEPARATOR = _autoconf.DISK_SEPARATOR
 
+#: Key for job IDs in opcode result
+JOB_IDS_KEY = "jobs"
+
 # runparts results
 (RUNPARTS_SKIP,
  RUNPARTS_RUN,
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 2871c6ae1ce27ac7b36fa85c3e8f442e5263596b..6b60c5e02fb3309506928ee5ff47436ae11f66fb 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -540,6 +540,15 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # Cancel here if we were asked to
     self._CheckCancel()
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{JobQueue.SubmitManyJobs}.
+
+    """
+    # Locking is done in job queue
+    return self._queue.SubmitManyJobs(jobs)
+
 
 class _JobChangesChecker(object):
   def __init__(self, fields, prev_job_info, prev_log_serial):
diff --git a/lib/mcpu.py b/lib/mcpu.py
index 863cf9aa8e9f44265aa5b50f5efa8f4b095df7a3..37588e1bdd5761d123bb208f5abddf4615734143 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -144,6 +144,14 @@ class OpExecCbBase: # pylint: disable-msg=W0232
 
     """
 
+  def SubmitManyJobs(self, jobs):
+    """Submits jobs for processing.
+
+    See L{jqueue.JobQueue.SubmitManyJobs}.
+
+    """
+    raise NotImplementedError
+
 
 def _LUNameForOpName(opname):
   """Computes the LU name for a given OpCode name.
@@ -209,6 +217,24 @@ class Processor(object):
 
     return acquired
 
+  def _ProcessResult(self, result):
+    """
+
+    """
+    if isinstance(result, cmdlib.ResultWithJobs):
+      # Submit jobs
+      job_submission = self._cbs.SubmitManyJobs(result.jobs)
+
+      # The additional return values become the opcode result dictionary
+      result = result.other
+
+      assert constants.JOB_IDS_KEY not in result, \
+        "Key '%s' found in additional return values" % constants.JOB_IDS_KEY
+
+      result[constants.JOB_IDS_KEY] = job_submission
+
+    return result
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -229,7 +255,7 @@ class Processor(object):
       return lu.dry_run_result
 
     try:
-      result = lu.Exec(self.Log)
+      result = self._ProcessResult(lu.Exec(self.Log))
       h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
       result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                 self.Log, result)
diff --git a/lib/opcodes.py b/lib/opcodes.py
index c7110467d4f566d6aa54a385feb9ac61129af23b..b09ab309de56feb296c0fa2927dbf9407344da70 100644
--- a/lib/opcodes.py
+++ b/lib/opcodes.py
@@ -1423,6 +1423,7 @@ class OpTestDummy(OpCode):
     ("result", ht.NoDefault, ht.NoType, None),
     ("messages", ht.NoDefault, ht.NoType, None),
     ("fail", ht.NoDefault, ht.NoType, None),
+    ("submit_jobs", None, ht.NoType, None),
     ]
   WITH_LU = False
 
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index 913792724dd08050b11e4741231b552b42806f91..73cf2bc17943e7862fdb8106134e33a2a482d6e9 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -428,6 +428,9 @@ class _FakeQueueForProc:
   def __init__(self):
     self._acquired = False
     self._updates = []
+    self._submitted = []
+
+    self._submit_count = itertools.count(1000)
 
   def IsAcquired(self):
     return self._acquired
@@ -435,6 +438,9 @@ class _FakeQueueForProc:
   def GetNextUpdate(self):
     return self._updates.pop(0)
 
+  def GetNextSubmittedJob(self):
+    return self._submitted.pop(0)
+
   def acquire(self, shared=0):
     assert shared == 1
     self._acquired = True
@@ -447,6 +453,12 @@ class _FakeQueueForProc:
     assert self._acquired, "Lock not acquired while updating job"
     self._updates.append((job, bool(replicate)))
 
+  def SubmitManyJobs(self, jobs):
+    assert not self._acquired, "Lock acquired while submitting jobs"
+    job_ids = [self._submit_count.next() for _ in jobs]
+    self._submitted.extend(zip(job_ids, jobs))
+    return job_ids
+
 
 class _FakeExecOpCodeForProc:
   def __init__(self, queue, before_start, after_start):
@@ -473,6 +485,11 @@
     if op.fail:
       raise errors.OpExecError("Error requested (%s)" % op.result)
 
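+    # Simulate an opcode whose execution submits follow-up jobs through the
+    # callbacks and uses the resulting job IDs as its own result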
+    if hasattr(op, "submit_jobs") and op.submit_jobs is not None:
+      return cbs.SubmitManyJobs(op.submit_jobs)
+
     return op.result
 
 
@@ -1065,6 +1080,92 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertFalse(job.GetLogEntries(count))
     self.assertFalse(job.GetLogEntries(count + 3))
 
+  def testSubmitManyJobs(self):
+    queue = _FakeQueueForProc()
+
+    job_id = 15656
+    ops = [
+      opcodes.OpTestDummy(result="Res0", fail=False,
+                          submit_jobs=[]),
+      opcodes.OpTestDummy(result="Res1", fail=False,
+                          submit_jobs=[
+                            [opcodes.OpTestDummy(result="r1j0", fail=False)],
+                            ]),
+      opcodes.OpTestDummy(result="Res2", fail=False,
+                          submit_jobs=[
+                            [opcodes.OpTestDummy(result="r2j0o0", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o1", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o2", fail=False),
+                             opcodes.OpTestDummy(result="r2j0o3", fail=False)],
+                            [opcodes.OpTestDummy(result="r2j1", fail=False)],
+                            [opcodes.OpTestDummy(result="r2j3o0", fail=False),
+                             opcodes.OpTestDummy(result="r2j3o1", fail=False)],
+                            ]),
+      ]
+
+    # Create job
+    job = self._CreateJob(queue, job_id, ops)
+
+    def _BeforeStart(timeout, priority):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+      self.assertFalse(job.cur_opctx)
+
+    def _AfterStart(op, cbs):
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+
+      self.assertFalse(queue.IsAcquired())
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+      self.assertFalse(job.cur_opctx)
+
+      # Job is running, cancelling shouldn't be possible
+      (success, _) = job.Cancel()
+      self.assertFalse(success)
+
+    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
+
+    for idx in range(len(ops)):
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
+      if idx == len(ops) - 1:
+        # Last opcode
+        self.assert_(result)
+      else:
+        self.assertFalse(result)
+
+        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+        self.assert_(job.start_timestamp)
+        self.assertFalse(job.end_timestamp)
+
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
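+    # The fake queue numbers submitted jobs from 1000 upwards in submission
+    # order, so the flattened list of per-opcode job definitions must match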
+    for idx, submitted_ops in enumerate(job_ops
+                                        for op in ops
+                                        for job_ops in op.submit_jobs):
+      self.assertEqual(queue.GetNextSubmittedJob(),
+                       (1000 + idx, submitted_ops))
+    self.assertRaises(IndexError, queue.GetNextSubmittedJob)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[[], [1000], [1001, 1002, 1003]]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+
+    self._GenericCheckJob(job)
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(queue, opexec, job))
+
 
 class _FakeTimeoutStrategy:
   def __init__(self, timeouts):