diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index 7ba175d730d6bab4417585622dc9c13ae4c8eca8..f49bc43c1ef5215d5b242644131c3ebcf9b78fc3 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -261,7 +261,7 @@ class ClientOps: """ proc = mcpu.Processor(self.server.context) # TODO: Where should log messages go? - return proc.ExecOpCode(op, self._DummyLog) + return proc.ExecOpCode(op, self._DummyLog, None) class GanetiContext(object): diff --git a/lib/constants.py b/lib/constants.py index 424fbb5b065ec9135b7f8b10fa8b781fcce633e1..afd40c772af0d7df1469bb86d9ab555b9c0332b1 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -304,12 +304,14 @@ JOB_NOTCHANGED = "nochange" # Job status JOB_STATUS_QUEUED = "queued" +JOB_STATUS_WAITLOCK = "waiting" JOB_STATUS_RUNNING = "running" JOB_STATUS_CANCELED = "canceled" JOB_STATUS_SUCCESS = "success" JOB_STATUS_ERROR = "error" OP_STATUS_QUEUED = "queued" +OP_STATUS_WAITLOCK = "waiting" OP_STATUS_RUNNING = "running" OP_STATUS_CANCELED = "canceled" OP_STATUS_SUCCESS = "success" diff --git a/lib/jqueue.py b/lib/jqueue.py index e596434eddb3a578f18efd6965723fe9b56861ba..a4c7b1d8462d61ef0f3155e188402a6ffe8eb9cc 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -159,6 +159,8 @@ class _QueuedJob(object): if op.status == constants.OP_STATUS_QUEUED: pass + elif op.status == constants.OP_STATUS_WAITLOCK: + status = constants.JOB_STATUS_WAITLOCK elif op.status == constants.OP_STATUS_RUNNING: status = constants.JOB_STATUS_RUNNING elif op.status == constants.OP_STATUS_ERROR: @@ -188,6 +190,24 @@ class _QueuedJob(object): class _JobQueueWorker(workerpool.BaseWorker): + def _NotifyStart(self): + """Mark the opcode as running, not lock-waiting. + + This is called from the mcpu code as a notifier function, when the + LU is finally about to start the Exec() method. Of course, to have + end-user visible results, the opcode must be initially (before + calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK. 
+ + """ + assert self.queue, "Queue attribute is missing" + assert self.opcode, "Opcode attribute is missing" + + self.queue.acquire() + try: + self.opcode.status = constants.OP_STATUS_RUNNING + finally: + self.queue.release() + def RunTask(self, job): """Job executor. @@ -198,7 +218,7 @@ class _JobQueueWorker(workerpool.BaseWorker): logging.debug("Worker %s processing job %s", self.worker_id, job.id) proc = mcpu.Processor(self.pool.queue.context) - queue = job.queue + self.queue = queue = job.queue try: try: count = len(job.ops) @@ -209,7 +229,7 @@ class _JobQueueWorker(workerpool.BaseWorker): queue.acquire() try: job.run_op_index = idx - op.status = constants.OP_STATUS_RUNNING + op.status = constants.OP_STATUS_WAITLOCK op.result = None op.start_timestamp = TimeStampNow() if idx == 0: # first opcode @@ -246,7 +266,8 @@ class _JobQueueWorker(workerpool.BaseWorker): queue.release() # Make sure not to hold lock while _Log is called - result = proc.ExecOpCode(input_opcode, _Log) + self.opcode = op + result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart) queue.acquire() try: @@ -365,7 +386,8 @@ class JobQueue(object): if status in (constants.JOB_STATUS_QUEUED, ): self._wpool.AddTask(job) - elif status in (constants.JOB_STATUS_RUNNING, ): + elif status in (constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK): logging.warning("Unfinished job %s found: %s", job.id, job) try: for op in job.ops: @@ -621,7 +643,8 @@ class JobQueue(object): log_entries = serializer.LoadJson(serializer.DumpJson(log_entries)) if status not in (constants.JOB_STATUS_QUEUED, - constants.JOB_STATUS_RUNNING): + constants.JOB_STATUS_RUNNING, + constants.JOB_STATUS_WAITLOCK): # Don't even try to wait if the job is no longer running, there will be # no changes. 
break diff --git a/lib/mcpu.py b/lib/mcpu.py index 2f60fd7c27e82e73f46c2f718844cd965a95ad60..af0202abe56867725772f2d74363545656917169 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -131,6 +131,8 @@ class Processor(object): adding_locks = level in lu.add_locks acquiring_locks = level in lu.needed_locks if level not in locking.LEVELS: + if callable(self._run_notifier): + self._run_notifier() result = self._ExecLU(lu) elif adding_locks and acquiring_locks: # We could both acquire and add locks at the same level, but for now we @@ -170,11 +172,18 @@ class Processor(object): return result - def ExecOpCode(self, op, feedback_fn): + def ExecOpCode(self, op, feedback_fn, run_notifier): """Execute an opcode. - Args: - op: the opcode to be executed + @type op: an OpCode instance + @param op: the opcode to be executed + @type feedback_fn: a function that takes a single argument + @param feedback_fn: this function will be used as feedback from the LU + code to the end-user + @type run_notifier: callable (no arguments) or None + @param run_notifier: this function (if callable) will be called when + we are about to call the lu's Exec() method, that + is, after we have acquired all locks """ if not isinstance(op, opcodes.OpCode): @@ -182,6 +191,7 @@ class Processor(object): " to ExecOpcode") self._feedback_fn = feedback_fn + self._run_notifier = run_notifier lu_class = self.DISPATCH_TABLE.get(op.__class__, None) if lu_class is None: raise errors.OpCodeUnknown("Unknown opcode") diff --git a/scripts/gnt-job b/scripts/gnt-job index ab00234e2e04f4fa2a1b8f7f0ec36a3005718527..d9a22539d0dc102e54765d8bcdc42a07fd932a4b 100755 --- a/scripts/gnt-job +++ b/scripts/gnt-job @@ -38,6 +38,7 @@ _LIST_DEF_FIELDS = ["id", "status", "summary"] _USER_JOB_STATUS = { constants.JOB_STATUS_QUEUED: "queued", + constants.JOB_STATUS_WAITLOCK: "waiting", constants.JOB_STATUS_RUNNING: "running", constants.JOB_STATUS_CANCELED: "canceled", constants.JOB_STATUS_SUCCESS: "success",