Commit e4e59de8 authored by Michael Hanselmann

jqueue/mcpu: Determine priority using callback

Instead of receiving the priority for acquiring locks as a parameter,
mcpu now obtains it through a callback. This is in preparation for
implementing a command to change a job's priority on the fly, and it
allows the priority to be changed while locks are being acquired (the
new value takes effect on the next lock acquire).
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>
parent 8af734f8
......@@ -564,8 +564,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
timestamp = utils.SplitTime(time.time())
self._AppendFeedback(timestamp, log_type, log_msg)
def CheckCancel(self):
"""Check whether job has been cancelled.
def CurrentPriority(self):
"""Returns current priority for opcode.
"""
assert self._op.status in (constants.OP_STATUS_WAITING,
......@@ -574,6 +574,8 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
# Cancel here if we were asked to
self._CheckCancel()
return self._op.priority
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
......@@ -1043,7 +1045,7 @@ class _JobProcessor(object):
# Make sure not to hold queue lock while calling ExecOpCode
result = self.opexec_fn(op.input,
_OpExecCallbacks(self.queue, self.job, op),
timeout=timeout, priority=op.priority)
timeout=timeout)
except mcpu.LockAcquireTimeout:
assert timeout is not None, "Received timeout for blocking acquire"
logging.debug("Couldn't acquire locks in %0.6fs", timeout)
......
......@@ -142,10 +142,11 @@ class OpExecCbBase: # pylint: disable=W0232
"""
def CheckCancel(self):
"""Check whether job has been cancelled.
def CurrentPriority(self): # pylint: disable=R0201
"""Returns current priority or C{None}.
"""
return None
def SubmitManyJobs(self, jobs):
"""Submits jobs for processing.
......@@ -274,7 +275,7 @@ class Processor(object):
if not self._enable_locks:
raise errors.ProgrammerError("Attempted to use disabled locks")
def _AcquireLocks(self, level, names, shared, timeout, priority):
def _AcquireLocks(self, level, names, shared, timeout):
"""Acquires locks via the Ganeti lock manager.
@type level: int
......@@ -292,7 +293,9 @@ class Processor(object):
self._CheckLocksEnabled()
if self._cbs:
self._cbs.CheckCancel()
priority = self._cbs.CurrentPriority()
else:
priority = None
acquired = self.context.glm.acquire(level, names, shared=shared,
timeout=timeout, priority=priority)
......@@ -342,7 +345,7 @@ class Processor(object):
def BuildHooksManager(self, lu):
return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)
def _LockAndExecLU(self, lu, level, calc_timeout, priority):
def _LockAndExecLU(self, lu, level, calc_timeout):
"""Execute a Logical Unit, with the needed locks.
This is a recursive function that starts locking the given level, and
......@@ -391,7 +394,7 @@ class Processor(object):
needed_locks = lu.needed_locks[level]
self._AcquireLocks(level, needed_locks, share,
calc_timeout(), priority)
calc_timeout())
else:
# Adding locks
add_locks = lu.add_locks[level]
......@@ -408,7 +411,7 @@ class Processor(object):
errors.ECODE_NOTUNIQUE)
try:
result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
result = self._LockAndExecLU(lu, level + 1, calc_timeout)
finally:
if level in lu.remove_locks:
self.context.glm.remove(level, lu.remove_locks[level])
......@@ -417,11 +420,11 @@ class Processor(object):
self.context.glm.release(level)
else:
result = self._LockAndExecLU(lu, level + 1, calc_timeout, priority)
result = self._LockAndExecLU(lu, level + 1, calc_timeout)
return result
def ExecOpCode(self, op, cbs, timeout=None, priority=None):
def ExecOpCode(self, op, cbs, timeout=None):
"""Execute an opcode.
@type op: an OpCode instance
......@@ -430,8 +433,6 @@ class Processor(object):
@param cbs: Runtime callbacks
@type timeout: float or None
@param timeout: Maximum time to acquire all locks, None for no timeout
@type priority: number or None
@param priority: Priority for acquiring lock(s)
@raise LockAcquireTimeout: In case locks couldn't be acquired in specified
amount of time
......@@ -456,8 +457,7 @@ class Processor(object):
# and in a shared fashion otherwise (to prevent concurrent run with
# an exclusive LU.
self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
not lu_class.REQ_BGL, calc_timeout(),
priority)
not lu_class.REQ_BGL, calc_timeout())
elif lu_class.REQ_BGL:
raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
" disabled" % op.OP_ID)
......@@ -468,8 +468,7 @@ class Processor(object):
assert lu.needed_locks is not None, "needed_locks not set by LU"
try:
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout,
priority)
result = self._LockAndExecLU(lu, locking.LEVEL_INSTANCE, calc_timeout)
finally:
if self._ec_id:
self.context.cfg.DropECReservations(self._ec_id)
......
......@@ -554,13 +554,13 @@ class _FakeExecOpCodeForProc:
self._before_start = before_start
self._after_start = after_start
def __call__(self, op, cbs, timeout=None, priority=None):
def __call__(self, op, cbs, timeout=None):
assert isinstance(op, opcodes.OpTestDummy)
assert not self._queue.IsAcquired(), \
"Queue lock not released when executing opcode"
if self._before_start:
self._before_start(timeout, priority)
self._before_start(timeout, cbs.CurrentPriority())
cbs.NotifyStart()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment