diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index b2e5bc15ca71be0c7f84d6cd355fda268b0e6276..f61811f6604937ad84d88f396b2bb99a4b2c4565 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -35,7 +35,6 @@ import collections
import signal
import logging
-from cStringIO import StringIO
from optparse import OptionParser
from ganeti import config
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 71a6ad7056e26f24a7dacc5a7d682f256853a0fb..4bb339604765792655ac9291bec7198c766c8d4b 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -153,12 +153,13 @@ class _QueuedJob(object):
@ivar received_timestamp: the timestamp for when the job was received
@ivar start_timestmap: the timestamp for start of execution
@ivar end_timestamp: the timestamp for end of execution
+ @ivar lock_status: In-memory locking information for debugging
@ivar change: a Condition variable we use for waiting for job changes
"""
__slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
"received_timestamp", "start_timestamp", "end_timestamp",
- "change",
+ "lock_status", "change",
"__weakref__"]
def __init__(self, queue, job_id, ops):
@@ -186,6 +187,9 @@ class _QueuedJob(object):
self.start_timestamp = None
self.end_timestamp = None
+ # In-memory attributes
+ self.lock_status = None
+
# Condition to wait for changes
self.change = threading.Condition(self.queue._lock)
@@ -209,6 +213,9 @@ class _QueuedJob(object):
obj.start_timestamp = state.get("start_timestamp", None)
obj.end_timestamp = state.get("end_timestamp", None)
+ # In-memory attributes
+ obj.lock_status = None
+
obj.ops = []
obj.log_serial = 0
for op_state in state["ops"]:
@@ -334,7 +341,7 @@ class _QueuedJob(object):
not_marked = False
-class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
+class _OpExecCallbacks(mcpu.OpExecCbBase):
def __init__(self, queue, job, op):
"""Initializes this class.
@@ -368,6 +375,9 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
assert self._op.status in (constants.OP_STATUS_WAITLOCK,
constants.OP_STATUS_CANCELING)
+ # All locks are acquired by now
+ self._job.lock_status = None
+
# Cancel here if we were asked to
if self._op.status == constants.OP_STATUS_CANCELING:
raise CancelJob()
@@ -401,6 +411,15 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
finally:
self._queue.release()
+ def ReportLocks(self, msg):
+ """Store the lock status message in the job's lock_status attribute.
+
+ Called whenever the LU processor is waiting for a lock or has acquired one.
+
+ """
+ # Queue lock deliberately not taken; this is a single assignment
+ self._job.lock_status = msg
+
class _JobQueueWorker(workerpool.BaseWorker):
"""The actual job workers.
@@ -457,7 +476,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
# Make sure not to hold queue lock while calling ExecOpCode
result = proc.ExecOpCode(input_opcode,
- _OpCodeExecCallbacks(queue, job, op))
+ _OpExecCallbacks(queue, job, op))
queue.acquire()
try:
@@ -505,6 +524,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
queue.acquire()
try:
try:
+ job.lock_status = None
job.run_op_index = -1
job.end_timestamp = TimeStampNow()
queue.UpdateJobUnlocked(job)
@@ -513,6 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
status = job.CalcStatus()
finally:
queue.release()
+
logging.info("Worker %s finished job %s, status = %s",
self.worker_id, job_id, status)
@@ -1081,7 +1102,6 @@ class JobQueue(object):
return results
-
@_RequireOpenQueue
def UpdateJobUnlocked(self, job):
"""Update a job's on disk storage.
diff --git a/lib/mcpu.py b/lib/mcpu.py
index a69f6dc5c0674a44fafbd9d73b62bf8eeb399959..e4ea09d9ad31cbe1e8333a9a196a14cc2bb60651 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -36,6 +36,7 @@ from ganeti import errors
from ganeti import rpc
from ganeti import cmdlib
from ganeti import locking
+from ganeti import utils
class OpExecCbBase:
@@ -55,6 +56,11 @@ class OpExecCbBase:
"""
+ def ReportLocks(self, msg):
+ """Report lock operations; this base implementation does nothing.
+
+ """
+
class Processor(object):
"""Object which runs OpCodes"""
@@ -128,6 +134,48 @@ class Processor(object):
self.rpc = rpc.RpcRunner(context.cfg)
self.hmclass = HooksMaster
+  def _ReportLocks(self, level, names, shared, acquired):
+    """Logs a lock operation and passes it to the callbacks, if any.
+
+    The message is built as "state/level/names/mode".
+
+    @type level: int
+    @param level: Lock level, looked up in locking.LEVEL_NAMES
+    @type names: list or string
+    @param names: Lock names; locking.ALL_SET is reported as "ALL"
+    @type shared: bool
+    @param shared: Whether the lock should be acquired in shared mode
+    @type acquired: bool
+    @param acquired: Whether the lock has already been acquired
+
+    """
+    # Acquisition state
+    if acquired:
+      state = "acquired"
+    else:
+      state = "waiting"
+
+    level_name = locking.LEVEL_NAMES[level]
+
+    # Lock names: the whole set, a single name or a sequence of names
+    if names == locking.ALL_SET:
+      lock_names = "ALL"
+    elif isinstance(names, basestring):
+      lock_names = names
+    else:
+      lock_names = ",".join(names)
+
+    if shared:
+      mode = "shared"
+    else:
+      mode = "exclusive"
+
+    msg = "/".join([state, level_name, lock_names, mode])
+    logging.debug("LU locks %s", msg)
+
+    if self._cbs:
+      self._cbs.ReportLocks(msg)
+
def _ExecLU(self, lu):
"""Logical Unit execution sequence.
@@ -184,9 +232,13 @@ class Processor(object):
share = lu.share_locks[level]
if acquiring_locks:
needed_locks = lu.needed_locks[level]
+
+ self._ReportLocks(level, needed_locks, share, False)
lu.acquired_locks[level] = self.context.glm.acquire(level,
needed_locks,
shared=share)
+ self._ReportLocks(level, needed_locks, share, True)
+
else: # adding_locks
add_locks = lu.add_locks[level]
lu.remove_locks[level] = add_locks
@@ -234,8 +286,14 @@ class Processor(object):
# Acquire the Big Ganeti Lock exclusively if this LU requires it, and in a
# shared fashion otherwise (to prevent concurrent run with an exclusive
# LU.
- self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
- shared=not lu_class.REQ_BGL)
+ self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
+ not lu_class.REQ_BGL, False)
+ try:
+ self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
+ shared=not lu_class.REQ_BGL)
+ finally:
+ self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
+ not lu_class.REQ_BGL, True)
try:
self.exclusive_BGL = lu_class.REQ_BGL
lu = lu_class(self, op, self.context, self.rpc)