Commit e92376d7 authored by Iustin Pop's avatar Iustin Pop

Implement job 'waiting' status

Background: when we have multiple jobs in the queue (more than just a
few), many of the jobs (up to the number of threads) will be in state
'running', although many of them could be actually blocked, waiting for
some locks. This is not good, as one cannot easily see what is

The patch extends the opcode/job possible statuses with another one,
waiting, which shows that the LU is in the acquire locks phase. The
mechanism for doing so is simple, we initialize (in the job queue) the
opcode with OP_STATUS_WAITLOCK, and when the processor is ready to give
control to the LU's Exec, it will call a notifier back into the
_JobQueueWorker that sets the opcode status to OP_STATUS_RUNNING (with
the proper queue locking). Because this mechanism does not save the job,
all opcodes on disk will be in status WAITLOCK and not RUNNING anymore,
so we also change the load sequence to consider WAITLOCK as RUNNING.

With the patch applied, creating in parallel (via burnin) five instances
on a five node cluster shows that only two are executing, while three
are waiting for locks.

Reviewed-by: imsnah
parent 12222048
...@@ -261,7 +261,7 @@ class ClientOps: ...@@ -261,7 +261,7 @@ class ClientOps:
""" """
proc = mcpu.Processor(self.server.context) proc = mcpu.Processor(self.server.context)
# TODO: Where should log messages go? # TODO: Where should log messages go?
return proc.ExecOpCode(op, self._DummyLog) return proc.ExecOpCode(op, self._DummyLog, None)
class GanetiContext(object): class GanetiContext(object):
...@@ -304,12 +304,14 @@ JOB_NOTCHANGED = "nochange" ...@@ -304,12 +304,14 @@ JOB_NOTCHANGED = "nochange"
# Job status # Job status
...@@ -159,6 +159,8 @@ class _QueuedJob(object): ...@@ -159,6 +159,8 @@ class _QueuedJob(object):
if op.status == constants.OP_STATUS_QUEUED: if op.status == constants.OP_STATUS_QUEUED:
pass pass
elif op.status == constants.OP_STATUS_WAITLOCK:
status = constants.JOB_STATUS_WAITLOCK
elif op.status == constants.OP_STATUS_RUNNING: elif op.status == constants.OP_STATUS_RUNNING:
status = constants.JOB_STATUS_RUNNING status = constants.JOB_STATUS_RUNNING
elif op.status == constants.OP_STATUS_ERROR: elif op.status == constants.OP_STATUS_ERROR:
...@@ -188,6 +190,24 @@ class _QueuedJob(object): ...@@ -188,6 +190,24 @@ class _QueuedJob(object):
class _JobQueueWorker(workerpool.BaseWorker): class _JobQueueWorker(workerpool.BaseWorker):
def _NotifyStart(self):
"""Mark the opcode as running, not lock-waiting.
This is called from the mcpu code as a notifier function, when the
LU is finally about to start the Exec() method. Of course, to have
end-user visible results, the opcode must be initially (before
calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
assert self.queue, "Queue attribute is missing"
assert self.opcode, "Opcode attribute is missing"
self.opcode.status = constants.OP_STATUS_RUNNING
def RunTask(self, job): def RunTask(self, job):
"""Job executor. """Job executor.
...@@ -198,7 +218,7 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -198,7 +218,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.debug("Worker %s processing job %s", logging.debug("Worker %s processing job %s",
self.worker_id, self.worker_id,
proc = mcpu.Processor(self.pool.queue.context) proc = mcpu.Processor(self.pool.queue.context)
queue = job.queue self.queue = queue = job.queue
try: try:
try: try:
count = len(job.ops) count = len(job.ops)
...@@ -209,7 +229,7 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -209,7 +229,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
queue.acquire() queue.acquire()
try: try:
job.run_op_index = idx job.run_op_index = idx
op.status = constants.OP_STATUS_RUNNING op.status = constants.OP_STATUS_WAITLOCK
op.result = None op.result = None
op.start_timestamp = TimeStampNow() op.start_timestamp = TimeStampNow()
if idx == 0: # first opcode if idx == 0: # first opcode
...@@ -246,7 +266,8 @@ class _JobQueueWorker(workerpool.BaseWorker): ...@@ -246,7 +266,8 @@ class _JobQueueWorker(workerpool.BaseWorker):
queue.release() queue.release()
# Make sure not to hold lock while _Log is called # Make sure not to hold lock while _Log is called
result = proc.ExecOpCode(input_opcode, _Log) self.opcode = op
result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
queue.acquire() queue.acquire()
try: try:
...@@ -365,7 +386,8 @@ class JobQueue(object): ...@@ -365,7 +386,8 @@ class JobQueue(object):
if status in (constants.JOB_STATUS_QUEUED, ): if status in (constants.JOB_STATUS_QUEUED, ):
self._wpool.AddTask(job) self._wpool.AddTask(job)
elif status in (constants.JOB_STATUS_RUNNING, ): elif status in (constants.JOB_STATUS_RUNNING,
logging.warning("Unfinished job %s found: %s",, job) logging.warning("Unfinished job %s found: %s",, job)
try: try:
for op in job.ops: for op in job.ops:
...@@ -621,7 +643,8 @@ class JobQueue(object): ...@@ -621,7 +643,8 @@ class JobQueue(object):
log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
if status not in (constants.JOB_STATUS_QUEUED, if status not in (constants.JOB_STATUS_QUEUED,
# Don't even try to wait if the job is no longer running, there will be # Don't even try to wait if the job is no longer running, there will be
# no changes. # no changes.
break break
...@@ -131,6 +131,8 @@ class Processor(object): ...@@ -131,6 +131,8 @@ class Processor(object):
adding_locks = level in lu.add_locks adding_locks = level in lu.add_locks
acquiring_locks = level in lu.needed_locks acquiring_locks = level in lu.needed_locks
if level not in locking.LEVELS: if level not in locking.LEVELS:
if callable(self._run_notifier):
result = self._ExecLU(lu) result = self._ExecLU(lu)
elif adding_locks and acquiring_locks: elif adding_locks and acquiring_locks:
# We could both acquire and add locks at the same level, but for now we # We could both acquire and add locks at the same level, but for now we
...@@ -170,11 +172,18 @@ class Processor(object): ...@@ -170,11 +172,18 @@ class Processor(object):
return result return result
def ExecOpCode(self, op, feedback_fn): def ExecOpCode(self, op, feedback_fn, run_notifier):
"""Execute an opcode. """Execute an opcode.
Args: @type op: an OpCode instance
op: the opcode to be executed @param op: the opcode to be executed
@type feedback_fn: a function that takes a single argument
@param feedback_fn: this function will be used as feedback from the LU
code to the end-user
@type run_notifier: callable (no arguments) or None
@param run_notifier: this function (if callable) will be called when
we are about to call the lu's Exec() method, that
is, after we have aquired all locks
""" """
if not isinstance(op, opcodes.OpCode): if not isinstance(op, opcodes.OpCode):
...@@ -182,6 +191,7 @@ class Processor(object): ...@@ -182,6 +191,7 @@ class Processor(object):
" to ExecOpcode") " to ExecOpcode")
self._feedback_fn = feedback_fn self._feedback_fn = feedback_fn
self._run_notifier = run_notifier
lu_class = self.DISPATCH_TABLE.get(op.__class__, None) lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None: if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode") raise errors.OpCodeUnknown("Unknown opcode")
...@@ -38,6 +38,7 @@ _LIST_DEF_FIELDS = ["id", "status", "summary"] ...@@ -38,6 +38,7 @@ _LIST_DEF_FIELDS = ["id", "status", "summary"]
constants.JOB_STATUS_QUEUED: "queued", constants.JOB_STATUS_QUEUED: "queued",
constants.JOB_STATUS_WAITLOCK: "waiting",
constants.JOB_STATUS_RUNNING: "running", constants.JOB_STATUS_RUNNING: "running",
constants.JOB_STATUS_CANCELED: "canceled", constants.JOB_STATUS_CANCELED: "canceled",
constants.JOB_STATUS_SUCCESS: "success", constants.JOB_STATUS_SUCCESS: "success",
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment