Commit ef2df7d3 authored by Michael Hanselmann
Browse files

Keep lock status with every job



This can be useful for debugging locking problems.
Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
parent 031a3e57
......@@ -35,7 +35,6 @@ import collections
import signal
import logging
from cStringIO import StringIO
from optparse import OptionParser
from ganeti import config
......
......@@ -153,12 +153,13 @@ class _QueuedJob(object):
@ivar received_timestamp: the timestamp for when the job was received
@ivar start_timestamp: the timestamp for start of execution
@ivar end_timestamp: the timestamp for end of execution
@ivar lock_status: In-memory locking information for debugging
@ivar change: a Condition variable we use for waiting for job changes
"""
__slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
"received_timestamp", "start_timestamp", "end_timestamp",
"change",
"lock_status", "change",
"__weakref__"]
def __init__(self, queue, job_id, ops):
......@@ -186,6 +187,9 @@ class _QueuedJob(object):
self.start_timestamp = None
self.end_timestamp = None
# In-memory attributes
self.lock_status = None
# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
......@@ -209,6 +213,9 @@ class _QueuedJob(object):
obj.start_timestamp = state.get("start_timestamp", None)
obj.end_timestamp = state.get("end_timestamp", None)
# In-memory attributes
obj.lock_status = None
obj.ops = []
obj.log_serial = 0
for op_state in state["ops"]:
......@@ -334,7 +341,7 @@ class _QueuedJob(object):
not_marked = False
class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
class _OpExecCallbacks(mcpu.OpExecCbBase):
def __init__(self, queue, job, op):
"""Initializes this class.
......@@ -368,6 +375,9 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
# All locks are acquired by now
self._job.lock_status = None
# Cancel here if we were asked to
if self._op.status == constants.OP_STATUS_CANCELING:
raise CancelJob()
......@@ -401,6 +411,15 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
finally:
self._queue.release()
def ReportLocks(self, msg):
  """Store the latest lock status message on the job.

  The LU processor calls this whenever it is waiting for a lock or has
  acquired one.

  @param msg: Lock status description to keep with the job

  """
  # A single attribute assignment, hence the queue lock is not taken here
  job = self._job
  job.lock_status = msg
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
......@@ -457,7 +476,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
# Make sure not to hold queue lock while calling ExecOpCode
result = proc.ExecOpCode(input_opcode,
_OpCodeExecCallbacks(queue, job, op))
_OpExecCallbacks(queue, job, op))
queue.acquire()
try:
......@@ -505,6 +524,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
queue.acquire()
try:
try:
job.lock_status = None
job.run_op_index = -1
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
......@@ -513,6 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
status = job.CalcStatus()
finally:
queue.release()
logging.info("Worker %s finished job %s, status = %s",
self.worker_id, job_id, status)
......@@ -1081,7 +1102,6 @@ class JobQueue(object):
return results
@_RequireOpenQueue
def UpdateJobUnlocked(self, job):
"""Update a job's on disk storage.
......
......@@ -36,6 +36,7 @@ from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
from ganeti import utils
class OpExecCbBase:
......@@ -55,6 +56,11 @@ class OpExecCbBase:
"""
def ReportLocks(self, msg):
  """Hook for reporting lock operations.

  This base implementation does nothing with the message.

  @param msg: Message describing the lock operation

  """
class Processor(object):
"""Object which runs OpCodes"""
......@@ -128,6 +134,48 @@ class Processor(object):
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
def _ReportLocks(self, level, names, shared, acquired):
  """Describes a lock operation and passes it on to the callbacks.

  The message has the form C{state/level/names/mode}; it is written to the
  debug log and, if callbacks are set, handed to their C{ReportLocks} method.

  @type level: int
  @param level: Lock level
  @type names: list or string
  @param names: Lock names
  @type shared: bool
  @param shared: Whether the lock should be acquired in shared mode
  @type acquired: bool
  @param acquired: Whether the lock has already been acquired

  """
  # State of the operation
  if acquired:
    state = "acquired"
  else:
    state = "waiting"

  # Looked up before the names are examined, as the message lists it first
  level_name = locking.LEVEL_NAMES[level]

  # Lock names: the whole set, a single name or a sequence of names
  if names == locking.ALL_SET:
    names_text = "ALL"
  elif isinstance(names, basestring):
    names_text = names
  else:
    names_text = ",".join(names)

  # Acquisition mode
  if shared:
    mode = "shared"
  else:
    mode = "exclusive"

  msg = "/".join([state, level_name, names_text, mode])

  logging.debug("LU locks %s", msg)

  if self._cbs:
    self._cbs.ReportLocks(msg)
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
......@@ -184,9 +232,13 @@ class Processor(object):
share = lu.share_locks[level]
if acquiring_locks:
needed_locks = lu.needed_locks[level]
self._ReportLocks(level, needed_locks, share, False)
lu.acquired_locks[level] = self.context.glm.acquire(level,
needed_locks,
shared=share)
self._ReportLocks(level, needed_locks, share, True)
else: # adding_locks
add_locks = lu.add_locks[level]
lu.remove_locks[level] = add_locks
......@@ -234,8 +286,14 @@ class Processor(object):
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
# shared fashion otherwise (to prevent concurrent run with an exclusive
# LU.
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
shared=not lu_class.REQ_BGL)
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
not lu_class.REQ_BGL, False)
try:
self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
shared=not lu_class.REQ_BGL)
finally:
self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
not lu_class.REQ_BGL, True)
try:
self.exclusive_BGL = lu_class.REQ_BGL
lu = lu_class(self, op, self.context, self.rpc)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment