Commit 031a3e57 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Move OpCode processor callbacks into separate class



There are two major arguments for this:
- There will be more callbacks (e.g. for lock debugging) and extending the
  parameter list is a lot of work.
- In the jqueue module this allows us to keep per-job or per-opcode variables in
  a separate class. Instead of having to clean up the worker class after
  processing one job, these references will automatically go out of scope.
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarIustin Pop <iustin@google.com>
parent f60759f7
......@@ -316,16 +316,12 @@ class ClientOps:
logging.info("Received invalid request '%s'", method)
raise ValueError("Invalid operation '%s'" % method)
def _DummyLog(self, *args):
pass
def _Query(self, op):
"""Runs the specified opcode and returns the result.
"""
proc = mcpu.Processor(self.server.context)
# TODO: Where should log messages go?
return proc.ExecOpCode(op, self._DummyLog, None)
return proc.ExecOpCode(op, None)
class GanetiContext(object):
......
......@@ -334,35 +334,78 @@ class _QueuedJob(object):
not_marked = False
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
def __init__(self, queue, job, op):
"""Initializes this class.
"""
def _NotifyStart(self):
@type queue: L{JobQueue}
@param queue: Job queue
@type job: L{_QueuedJob}
@param job: Job object
@type op: L{_QueuedOpCode}
@param op: OpCode
"""
assert queue, "Queue is missing"
assert job, "Job is missing"
assert op, "Opcode is missing"
self._queue = queue
self._job = job
self._op = op
def NotifyStart(self):
"""Mark the opcode as running, not lock-waiting.
This is called from the mcpu code as a notifier function, when the
LU is finally about to start the Exec() method. Of course, to have
end-user visible results, the opcode must be initially (before
calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
This is called from the mcpu code as a notifier function, when the LU is
finally about to start the Exec() method. Of course, to have end-user
visible results, the opcode must be initially (before calling into
Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
"""
assert self.queue, "Queue attribute is missing"
assert self.opcode, "Opcode attribute is missing"
self.queue.acquire()
self._queue.acquire()
try:
assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
# Cancel here if we were asked to
if self.opcode.status == constants.OP_STATUS_CANCELING:
if self._op.status == constants.OP_STATUS_CANCELING:
raise CancelJob()
self.opcode.status = constants.OP_STATUS_RUNNING
self._op.status = constants.OP_STATUS_RUNNING
finally:
self.queue.release()
self._queue.release()
def Feedback(self, *args):
"""Append a log entry.
"""
assert len(args) < 3
if len(args) == 1:
log_type = constants.ELOG_MESSAGE
log_msg = args[0]
else:
(log_type, log_msg) = args
# The time is split to make serialization easier and not lose
# precision.
timestamp = utils.SplitTime(time.time())
self._queue.acquire()
try:
self._job.log_serial += 1
self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
self._job.change.notifyAll()
finally:
self._queue.release()
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
"""
def RunTask(self, job):
"""Job executor.
......@@ -376,7 +419,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.info("Worker %s processing job %s",
self.worker_id, job.id)
proc = mcpu.Processor(self.pool.queue.context)
self.queue = queue = job.queue
queue = job.queue
try:
try:
count = len(job.ops)
......@@ -412,34 +455,9 @@ class _JobQueueWorker(workerpool.BaseWorker):
finally:
queue.release()
def _Log(*args):
"""Append a log entry.
"""
assert len(args) < 3
if len(args) == 1:
log_type = constants.ELOG_MESSAGE
log_msg = args[0]
else:
log_type, log_msg = args
# The time is split to make serialization easier and not lose
# precision.
timestamp = utils.SplitTime(time.time())
queue.acquire()
try:
job.log_serial += 1
op.log.append((job.log_serial, timestamp, log_type, log_msg))
job.change.notifyAll()
finally:
queue.release()
# Make sure not to hold lock while _Log is called
self.opcode = op
result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)
# Make sure not to hold queue lock while calling ExecOpCode
result = proc.ExecOpCode(input_opcode,
_OpCodeExecCallbacks(queue, job, op))
queue.acquire()
try:
......
......@@ -38,6 +38,24 @@ from ganeti import cmdlib
from ganeti import locking
class OpExecCbBase:
"""Base class for OpCode execution callbacks.
"""
def NotifyStart(self):
"""Called when we are about to execute the LU.
This function is called when we're about to start the lu's Exec() method,
that is, after we have acquired all locks.
"""
def Feedback(self, *args):
"""Sends feedback from the LU code to the end-user.
"""
class Processor(object):
"""Object which runs OpCodes"""
DISPATCH_TABLE = {
......@@ -103,12 +121,9 @@ class Processor(object):
def __init__(self, context):
"""Constructor for Processor
Args:
- feedback_fn: the feedback function (taking one string) to be run when
interesting events are happening
"""
self.context = context
self._feedback_fn = None
self._cbs = None
self.exclusive_BGL = False
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
......@@ -122,7 +137,7 @@ class Processor(object):
hm = HooksMaster(self.rpc.call_hooks_runner, lu)
h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
self._feedback_fn, None)
self._Feedback, None)
if getattr(lu.op, "dry_run", False):
# in this mode, no post-hooks are run, and the config is not
......@@ -133,10 +148,10 @@ class Processor(object):
return lu.dry_run_result
try:
result = lu.Exec(self._feedback_fn)
result = lu.Exec(self._Feedback)
h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
self._feedback_fn, result)
self._Feedback, result)
finally:
# FIXME: This needs locks if not lu_class.REQ_BGL
if write_count != self.context.cfg.write_count:
......@@ -155,8 +170,9 @@ class Processor(object):
adding_locks = level in lu.add_locks
acquiring_locks = level in lu.needed_locks
if level not in locking.LEVELS:
if callable(self._run_notifier):
self._run_notifier()
if self._cbs:
self._cbs.NotifyStart()
result = self._ExecLU(lu)
elif adding_locks and acquiring_locks:
# We could both acquire and add locks at the same level, but for now we
......@@ -196,52 +212,57 @@ class Processor(object):
return result
def ExecOpCode(self, op, feedback_fn, run_notifier):
def ExecOpCode(self, op, cbs):
"""Execute an opcode.
@type op: an OpCode instance
@param op: the opcode to be executed
@type feedback_fn: a function that takes a single argument
@param feedback_fn: this function will be used as feedback from the LU
code to the end-user
@type run_notifier: callable (no arguments) or None
@param run_notifier: this function (if callable) will be called when
we are about to call the lu's Exec() method, that
is, after we have acquired all locks
@type cbs: L{OpExecCbBase}
@param cbs: Runtime callbacks
"""
if not isinstance(op, opcodes.OpCode):
raise errors.ProgrammerError("Non-opcode instance passed"
" to ExecOpcode")
self._feedback_fn = feedback_fn
self._run_notifier = run_notifier
lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
# shared fashion otherwise (to prevent concurrent run with an exclusive LU.
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
shared=not lu_class.REQ_BGL)
self._cbs = cbs
try:
self.exclusive_BGL = lu_class.REQ_BGL
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
if lu_class is None:
raise errors.OpCodeUnknown("Unknown opcode")
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
# shared fashion otherwise (to prevent concurrent run with an exclusive
# LU.
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
shared=not lu_class.REQ_BGL)
try:
self.exclusive_BGL = lu_class.REQ_BGL
lu = lu_class(self, op, self.context, self.rpc)
lu.ExpandNames()
assert lu.needed_locks is not None, "needed_locks not set by LU"
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE)
finally:
self.context.glm.release(locking.LEVEL_CLUSTER)
self.exclusive_BGL = False
finally:
self.context.glm.release(locking.LEVEL_CLUSTER)
self.exclusive_BGL = False
self._cbs = None
return result
def _Feedback(self, *args):
"""Forward call to feedback callback function.
"""
if self._cbs:
self._cbs.Feedback(*args)
def LogStep(self, current, total, message):
"""Log a change in LU execution progress.
"""
logging.debug("Step %d/%d %s", current, total, message)
self._feedback_fn("STEP %d/%d %s" % (current, total, message))
self._Feedback("STEP %d/%d %s" % (current, total, message))
def LogWarning(self, message, *args, **kwargs):
"""Log a warning to the logs and the user.
......@@ -258,9 +279,9 @@ class Processor(object):
message = message % tuple(args)
if message:
logging.warning(message)
self._feedback_fn(" - WARNING: %s" % message)
self._Feedback(" - WARNING: %s" % message)
if "hint" in kwargs:
self._feedback_fn(" Hint: %s" % kwargs["hint"])
self._Feedback(" Hint: %s" % kwargs["hint"])
def LogInfo(self, message, *args):
"""Log an informational message to the logs and the user.
......@@ -269,7 +290,7 @@ class Processor(object):
if args:
message = message % tuple(args)
logging.info(message)
self._feedback_fn(" - INFO: %s" % message)
self._Feedback(" - INFO: %s" % message)
class HooksMaster(object):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment