diff --git a/lib/jqueue.py b/lib/jqueue.py index 6929d6a049ffcee6df063d899da7a8d3fe2cf075..a562e6e7306f4dce4c435582552b2594c43b4cba 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -564,8 +564,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): timestamp = utils.SplitTime(time.time()) self._AppendFeedback(timestamp, log_type, log_msg) - def CheckCancel(self): - """Check whether job has been cancelled. + def CurrentPriority(self): + """Returns current priority for opcode. """ assert self._op.status in (constants.OP_STATUS_WAITING, @@ -574,6 +574,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): # Cancel here if we were asked to self._CheckCancel() + return self._op.priority + def SubmitManyJobs(self, jobs): """Submits jobs for processing. @@ -1043,7 +1045,7 @@ class _JobProcessor(object): # Make sure not to hold queue lock while calling ExecOpCode result = self.opexec_fn(op.input, _OpExecCallbacks(self.queue, self.job, op), - timeout=timeout, priority=op.priority) + timeout=timeout) except mcpu.LockAcquireTimeout: assert timeout is not None, "Received timeout for blocking acquire" logging.debug("Couldn't acquire locks in %0.6fs", timeout) diff --git a/lib/mcpu.py b/lib/mcpu.py index 27f29fca13d9be82b08dd046c9596f1e0095a350..b227334df504e26be541381ee803d260ed6e9033 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -142,10 +142,11 @@ class OpExecCbBase: # pylint: disable=W0232 """ - def CheckCancel(self): - """Check whether job has been cancelled. + def CurrentPriority(self): # pylint: disable=R0201 + """Returns current priority or C{None}. """ + return None def SubmitManyJobs(self, jobs): """Submits jobs for processing. @@ -274,7 +275,7 @@ class Processor(object): if not self._enable_locks: raise errors.ProgrammerError("Attempted to use disabled locks") - def _AcquireLocks(self, level, names, shared, timeout, priority): + def _AcquireLocks(self, level, names, shared, timeout): """Acquires locks via the Ganeti lock manager. @type level: int @@ -292,7 +293,9 @@ class Processor(object): self._CheckLocksEnabled() if self._cbs: - self._cbs.CheckCancel() + priority = self._cbs.CurrentPriority() + else: + priority = None acquired = self.context.glm.acquire(level, names, shared=shared, timeout=timeout, priority=priority) @@ -342,7 +345,7 @@ class Processor(object): def BuildHooksManager(self, lu): return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu) - def _LockAndExecLU(self, lu, level, calc_timeout, priority): + def _LockAndExecLU(self, lu, level, calc_timeout): """Execute a Logical Unit, with the needed locks. This is a recursive function that starts locking the given level, and @@ -391,7 +394,7 @@ class Processor(object): needed_locks = lu.needed_locks[level] self._AcquireLocks(level, needed_locks, share, - calc_timeout(), priority) + calc_timeout()) else: # Adding locks add_locks = lu.add_locks[level] @@ -408,7 +411,7 @@ class Processor(object): errors.ECODE_NOTUNIQUE) try: - result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) + result = self._LockAndExecLU(lu, level + 1, calc_timeout) finally: if level in lu.remove_locks: self.context.glm.remove(level, lu.remove_locks[level]) @@ -417,11 +420,11 @@ class Processor(object): self.context.glm.release(level) else: - result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority) + result = self._LockAndExecLU(lu, level + 1, calc_timeout) return result - def ExecOpCode(self, op, cbs, timeout=None, priority=None): + def ExecOpCode(self, op, cbs, timeout=None): """Execute an opcode. @type op: an OpCode instance @@ -430,8 +433,6 @@ class Processor(object): @param cbs: Runtime callbacks @type timeout: float or None @param timeout: Maximum time to acquire all locks, None for no timeout - @type priority: number or None - @param priority: Priority for acquiring lock(s) @raise LockAcquireTimeout: In case locks couldn't be acquired in specified amount of time @@ -456,8 +457,7 @@ class Processor(object): # and in a shared fashion otherwise (to prevent concurrent run with # an exclusive LU. self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL, - not lu_class.REQ_BGL, calc_timeout(), - priority) + not lu_class.REQ_BGL, calc_timeout()) elif lu_class.REQ_BGL: raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are" " disabled" % op.OP_ID) @@ -468,8 +468,7 @@ class Processor(object): assert lu.needed_locks is not None, "needed_locks not set by LU" try: - result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout, - priority) + result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout) finally: if self._ec_id: self.context.cfg.DropECReservations(self._ec_id) diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 8b1cb250d45d05da75d5ade01e2146124cda2c32..a5b19ad3e82cc9432c987e75b93ee8fe9dc91b6a 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -554,13 +554,13 @@ class _FakeExecOpCodeForProc: self._before_start = before_start self._after_start = after_start - def __call__(self, op, cbs, timeout=None, priority=None): + def __call__(self, op, cbs, timeout=None): assert isinstance(op, opcodes.OpTestDummy) assert not self._queue.IsAcquired(), \ "Queue lock not released when executing opcode" if self._before_start: - self._before_start(timeout, priority) + self._before_start(timeout, cbs.CurrentPriority()) cbs.NotifyStart()