Implement chained jobs

An overview is available in the design document for this change,

When a job enters the job processor, the current opcode's dependencies
are evaluated. If a referenced job has not yet reached the desired
status, the current job is registered as a dependant. The job processor
will continue to work on other pending tasks. When a job finishes it
notifies any pending dependants by re-adding them to the workerpool.

A per-job processor lock is necessary for rare cases where the same job
can be re-added twice.

There is no way to view waiting jobs at the moment, but I plan to
export this information to “gnt-debug locks”.

A so-called dependency manager takes care of managing waiting jobs and
keeping track of their status.

Unittests are included.
Signed-off-by: default avatarMichael Hanselmann <>
Reviewed-by: default avatarIustin Pop <>
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -34,6 +34,7 @@ import errno
import re
import time
import weakref
import threading
# pylint: disable-msg=E0611
......@@ -55,6 +56,7 @@ from ganeti import rpc
from ganeti import runtime
from ganeti import netutils
from ganeti import compat
from ganeti import ht
......@@ -177,7 +179,7 @@ class _QueuedJob(object):
# pylint: disable-msg=W0212
__slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
"received_timestamp", "start_timestamp", "end_timestamp",
"__weakref__", "processor_lock"]
def __init__(self, queue, job_id, ops):
"""Constructor for the _QueuedJob.
......@@ -211,6 +213,7 @@ class _QueuedJob(object):
obj.ops_iter = None
obj.cur_opctx = None
obj.processor_lock = threading.Lock()
def __repr__(self):
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
......@@ -804,6 +807,12 @@ class _OpExecContext:
self.log_prefix = log_prefix
self.summary = op.input.Summary()
# Create local copy to modify
if getattr(op.input, opcodes.DEPEND_ATTR, None):
self.jobdeps = op.input.depends[:]
self.jobdeps = None
self._timeout_strategy_factory = timeout_strategy_factory
......@@ -927,6 +936,62 @@ class _JobProcessor(object):
return update
def _CheckDependencies(queue, job, opctx):
"""Checks if an opcode has dependencies and if so, processes them.
@type queue: L{JobQueue}
@param queue: Queue object
@type job: L{_QueuedJob}
@param job: Job object
@type opctx: L{_OpExecContext}
@param opctx: Opcode execution context
@rtype: bool
@return: Whether opcode will be re-scheduled by dependency tracker
op = opctx.op
result = False
while opctx.jobdeps:
(dep_job_id, dep_status) = opctx.jobdeps[0]
(depresult, depmsg) = queue.depmgr.CheckAndRegister(job, dep_job_id,
assert ht.TNonEmptyString(depmsg), "No dependency message""%s: %s", opctx.log_prefix, depmsg)
if depresult == _JobDependencyManager.CONTINUE:
# Remove dependency and continue
elif depresult == _JobDependencyManager.WAIT:
# Need to wait for notification, dependency tracker will re-add job
# to workerpool
result = True
elif depresult == _JobDependencyManager.CANCEL:
# Job was cancelled, cancel this job as well
assert op.status == constants.OP_STATUS_CANCELING
elif depresult in (_JobDependencyManager.WRONGSTATUS,
# Job failed or there was an error, this job must fail
op.status = constants.OP_STATUS_ERROR
op.result = _EncodeOpError(errors.OpExecError(depmsg))
raise errors.ProgrammerError("Unknown dependency result '%s'" %
return result
def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
......@@ -1013,6 +1078,8 @@ class _JobProcessor(object):
assert (op.priority <= constants.OP_PRIO_LOWEST and
op.priority >= constants.OP_PRIO_HIGHEST)
waitjob = None
if op.status != constants.OP_STATUS_CANCELING:
assert op.status in (constants.OP_STATUS_QUEUED,
......@@ -1025,18 +1092,32 @@ class _JobProcessor(object):
assert op.status == constants.OP_STATUS_WAITLOCK
assert job.CalcStatus() == constants.JOB_STATUS_WAITLOCK
assert job.start_timestamp and op.start_timestamp
assert waitjob is None
# Check if waiting for a job is necessary
waitjob = self._CheckDependencies(queue, job, opctx)"%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
assert op.status in (constants.OP_STATUS_WAITLOCK,
(op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
if not (waitjob or op.status in (constants.OP_STATUS_CANCELING,
constants.OP_STATUS_ERROR)):"%s: opcode %s waiting for locks",
opctx.log_prefix, opctx.summary)
op.status = op_status
op.result = op_result
assert not opctx.jobdeps, "Not all dependencies were removed"
(op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
op.status = op_status
op.result = op_result
assert not waitjob
if op.status == constants.OP_STATUS_WAITLOCK:
# Couldn't get locks in time
......@@ -1051,10 +1132,10 @@ class _JobProcessor(object):
assert op.status in constants.OPS_FINALIZED
if op.status == constants.OP_STATUS_WAITLOCK:
if op.status == constants.OP_STATUS_WAITLOCK or waitjob:
finalize = False
if opctx.CheckPriorityIncrease():
if not waitjob and opctx.CheckPriorityIncrease():
# Priority was changed, need to update on-disk file
......@@ -1113,11 +1194,17 @@ class _JobProcessor(object):
# allowed. Once the file has been written, it can be archived anytime.
assert not waitjob
if finalize:"Finished job %s, status = %s",, job.CalcStatus())
# TODO: Check locking
return True
return False
assert not waitjob or queue.depmgr.JobWaiting(job)
return bool(waitjob)
......@@ -1129,12 +1216,24 @@ class _JobQueueWorker(workerpool.BaseWorker):
def RunTask(self, job): # pylint: disable-msg=W0221
"""Job executor.
This functions processes a job. It is closely tied to the L{_QueuedJob} and
L{_QueuedOpCode} classes.
@type job: L{_QueuedJob}
@param job: the job to be processed
# Ensure only one worker is active on a single job. If a job registers for
# a dependency job, and the other job notifies before the first worker is
# done, the job can end up in the tasklist more than once.
return self._RunTaskInner(job)
def _RunTaskInner(self, job):
"""Executes a job.
Must be called with per-job lock acquired.
queue = job.queue
assert queue == self.pool.queue
......@@ -1194,6 +1293,117 @@ class _JobQueueWorkerPool(workerpool.WorkerPool):
self.queue = queue
class _JobDependencyManager:
"""Keeps track of job dependencies.
WRONGSTATUS) = range(1, 6)
# TODO: Export waiter information to lock monitor
def __init__(self, getstatus_fn, enqueue_fn):
"""Initializes this class.
self._getstatus_fn = getstatus_fn
self._enqueue_fn = enqueue_fn
self._waiters = {}
self._lock = locking.SharedLock("JobDepMgr")
@locking.ssynchronized(_LOCK, shared=1)
def JobWaiting(self, job):
"""Checks if a job is waiting.
return compat.any(job in jobs
for jobs in self._waiters.values())
def CheckAndRegister(self, job, dep_job_id, dep_status):
"""Checks if a dependency job has the requested status.
If the other job is not yet in a finalized status, the calling job will be
notified (re-added to the workerpool) at a later point.
@type job: L{_QueuedJob}
@param job: Job object
@type dep_job_id: string
@param dep_job_id: ID of dependency job
@type dep_status: list
@param dep_status: Required status
assert ht.TString(
assert ht.TString(dep_job_id)
assert ht.TListOf(ht.TElemOf(constants.JOBS_FINALIZED))(dep_status)
if == dep_job_id:
return (self.ERROR, "Job can't depend on itself")
# Get status of dependency job
status = self._getstatus_fn(dep_job_id)
except errors.JobLost, err:
return (self.ERROR, "Dependency error: %s" % err)
assert status in constants.JOB_STATUS_ALL
job_id_waiters = self._waiters.setdefault(dep_job_id, set())
if status not in constants.JOBS_FINALIZED:
# Register for notification and wait for job to finish
return (self.WAIT,
"Need to wait for job %s, wanted status '%s'" %
(dep_job_id, dep_status))
# Remove from waiters list
if job in job_id_waiters:
if (status == constants.JOB_STATUS_CANCELED and
constants.JOB_STATUS_CANCELED not in dep_status):
return (self.CANCEL, "Dependency job %s was cancelled" % dep_job_id)
elif not dep_status or status in dep_status:
return (self.CONTINUE,
"Dependency job %s finished with status '%s'" %
(dep_job_id, status))
return (self.WRONGSTATUS,
"Dependency job %s finished with status '%s',"
" not one of '%s' as required" %
(dep_job_id, status, utils.CommaJoin(dep_status)))
def NotifyWaiters(self, job_id):
"""Notifies all jobs waiting for a certain job ID.
@type job_id: string
@param job_id: Job ID
assert ht.TString(job_id)
jobs = self._waiters.pop(job_id, None)
if jobs:
# Re-add jobs to workerpool
logging.debug("Re-adding %s jobs which were waiting for job %s",
len(jobs), job_id)
# Remove all jobs without actual waiters
for job_id in [job_id for (job_id, waiters) in self._waiters.items()
if not waiters]:
del self._waiters[job_id]
def _RequireOpenQueue(fn):
"""Decorator for "public" functions.
......@@ -1277,6 +1487,10 @@ class JobQueue(object):
self._drained = jstore.CheckDrainFlag()
# Job dependencies
self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
# Setup worker pool
self._wpool = _JobQueueWorkerPool(self)
......@@ -1808,6 +2022,27 @@ class JobQueue(object):
self._wpool.AddManyTasks([(job, ) for job in jobs],
priority=[job.CalcPriority() for job in jobs])
def _GetJobStatusForDependencies(self, job_id):
"""Gets the status of a job for dependencies.
@type job_id: string
@param job_id: Job ID
@raise errors.JobLost: If job can't be found
if not isinstance(job_id, basestring):
job_id = self._FormatJobID(job_id)
# Not using in-memory cache as doing so would require an exclusive lock
# Try to load from disk
job = self.SafeLoadJobFromDisk(job_id, True)
if job:
return job.CalcStatus()
raise errors.JobLost("Job %s not found" % job_id)
def UpdateJobUnlocked(self, job, replicate=True):
"""Update a job's on disk storage.
......@@ -151,6 +151,9 @@ _SUMMARY_PREFIX = {
"INSTANCE_": "I_",
#: Attribute name for dependencies
DEPEND_ATTR = "depends"
def _NameToId(name):
"""Convert an opcode class name to an OP_ID.
......@@ -422,7 +425,7 @@ class OpCode(BaseOpCode):
("debug_level", None, ht.TOr(ht.TNone, ht.TPositiveInt), "Debug level"),
("priority", constants.OP_PRIO_DEFAULT,
ht.TElemOf(constants.OP_PRIO_SUBMIT_VALID), "Opcode priority"),
("depends", None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
(DEPEND_ATTR, None, ht.TOr(ht.TNone, ht.TListOf(_T_JOB_DEP)),
"Job dependencies"),
