From ef2df7d3a6b6fbb880d5fded70ae7cd3f201735c Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Tue, 15 Sep 2009 13:00:41 +0200
Subject: [PATCH] Keep lock status with every job

Each job now keeps, in a new in-memory attribute (_QueuedJob.lock_status),
the lock status last reported by the LU processor through the new
OpExecCbBase.ReportLocks callback. This can be useful for debugging
locking problems.
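
The reported message is "/"-joined and has the form
<acquired|waiting>/<level name>/<lock names or ALL>/<shared|exclusive>,
for example "waiting/instance/inst1,inst2/shared" (the level and lock
names here are illustrative).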

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: Iustin Pop <iustin@google.com>
---
 daemons/ganeti-masterd |  1 -
 lib/jqueue.py          | 31 ++++++++++++++++++---
 lib/mcpu.py            | 65 ++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 90 insertions(+), 7 deletions(-)
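
Note (not part of the commit): below is a minimal, standalone sketch of
the message format produced by the new Processor._ReportLocks. The level
names and the ALL_SET sentinel are assumptions standing in for the real
values in ganeti.locking.

  # Sketch only: mirrors the "/"-joined message built in lib/mcpu.py.
  LEVEL_NAMES = {0: "cluster", 1: "instance"}  # assumed mapping
  ALL_SET = None  # assumed "all locks at this level" sentinel

  def FormatLockMessage(level, names, shared, acquired):
    if acquired:
      parts = ["acquired"]
    else:
      parts = ["waiting"]
    parts.append(LEVEL_NAMES[level])
    if names == ALL_SET:
      parts.append("ALL")
    elif isinstance(names, basestring):
      parts.append(names)
    else:
      parts.append(",".join(names))
    if shared:
      parts.append("shared")
    else:
      parts.append("exclusive")
    return "/".join(parts)

  print FormatLockMessage(1, ["inst1", "inst2"], True, False)
  # Prints: waiting/instance/inst1,inst2/shared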

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index b2e5bc15c..f61811f66 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -35,7 +35,6 @@ import collections
 import signal
 import logging
 
-from cStringIO import StringIO
 from optparse import OptionParser
 
 from ganeti import config
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 71a6ad705..4bb339604 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -153,12 +153,13 @@ class _QueuedJob(object):
   @ivar received_timestamp: the timestamp for when the job was received
   @ivar start_timestamp: the timestamp for start of execution
   @ivar end_timestamp: the timestamp for end of execution
+  @ivar lock_status: in-memory locking information for debugging
   @ivar change: a Condition variable we use for waiting for job changes
 
   """
   __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "change",
+               "lock_status", "change",
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
@@ -186,6 +187,9 @@ class _QueuedJob(object):
     self.start_timestamp = None
     self.end_timestamp = None
 
+    # In-memory attributes
+    self.lock_status = None
+
     # Condition to wait for changes
     self.change = threading.Condition(self.queue._lock)
 
@@ -209,6 +213,9 @@ class _QueuedJob(object):
     obj.start_timestamp = state.get("start_timestamp", None)
     obj.end_timestamp = state.get("end_timestamp", None)
 
+    # In-memory attributes
+    obj.lock_status = None
+
     obj.ops = []
     obj.log_serial = 0
     for op_state in state["ops"]:
@@ -334,7 +341,7 @@ class _QueuedJob(object):
       not_marked = False
 
 
-class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
+class _OpExecCallbacks(mcpu.OpExecCbBase):
   def __init__(self, queue, job, op):
     """Initializes this class.
 
@@ -368,6 +375,9 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
       assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                  constants.OP_STATUS_CANCELING)
 
+      # All locks are acquired by now
+      self._job.lock_status = None
+
       # Cancel here if we were asked to
       if self._op.status == constants.OP_STATUS_CANCELING:
         raise CancelJob()
@@ -401,6 +411,18 @@ class _OpCodeExecCallbacks(mcpu.OpExecCbBase):
     finally:
       self._queue.release()
 
+  def ReportLocks(self, msg):
+    """Write locking information to the job.
+
+    Called whenever the LU processor is waiting for a lock or has acquired one.
+
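+    @type msg: string
+    @param msg: Lock status message to store with the job
+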
+    """
+    # Not getting the queue lock because this is a single assignment
+    self._job.lock_status = msg
+
 
 class _JobQueueWorker(workerpool.BaseWorker):
   """The actual job workers.
@@ -457,7 +476,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
             # Make sure not to hold queue lock while calling ExecOpCode
             result = proc.ExecOpCode(input_opcode,
-                                     _OpCodeExecCallbacks(queue, job, op))
+                                     _OpExecCallbacks(queue, job, op))
 
             queue.acquire()
             try:
@@ -505,6 +524,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
       queue.acquire()
       try:
         try:
+          job.lock_status = None
           job.run_op_index = -1
           job.end_timestamp = TimeStampNow()
           queue.UpdateJobUnlocked(job)
@@ -513,6 +533,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
           status = job.CalcStatus()
       finally:
         queue.release()
+
       logging.info("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
 
@@ -1081,7 +1102,6 @@ class JobQueue(object):
 
     return results
 
-
   @_RequireOpenQueue
   def UpdateJobUnlocked(self, job):
     """Update a job's on disk storage.
diff --git a/lib/mcpu.py b/lib/mcpu.py
index a69f6dc5c..e4ea09d9a 100644
--- a/lib/mcpu.py
+++ b/lib/mcpu.py
@@ -36,6 +36,7 @@ from ganeti import errors
 from ganeti import rpc
 from ganeti import cmdlib
 from ganeti import locking
+from ganeti import utils
 
 
 class OpExecCbBase:
@@ -55,6 +56,14 @@ class OpExecCbBase:
 
     """
 
+  def ReportLocks(self, msg):
+    """Report lock operations.
+
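+    @type msg: string
+    @param msg: Human-readable message describing the lock status
+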
+    """
+
 
 class Processor(object):
   """Object which runs OpCodes"""
@@ -128,6 +134,48 @@ class Processor(object):
     self.rpc = rpc.RpcRunner(context.cfg)
     self.hmclass = HooksMaster
 
+  def _ReportLocks(self, level, names, shared, acquired):
+    """Reports lock operations.
+
+    @type level: int
+    @param level: Lock level
+    @type names: list or string
+    @param names: Lock names
+    @type shared: bool
+    @param shared: Whether the lock should be acquired in shared mode
+    @type acquired: bool
+    @param acquired: Whether the lock has already been acquired
+
+    """
+    parts = []
+
+    # Build message
+    if acquired:
+      parts.append("acquired")
+    else:
+      parts.append("waiting")
+
+    parts.append(locking.LEVEL_NAMES[level])
+
+    if names == locking.ALL_SET:
+      parts.append("ALL")
+    elif isinstance(names, basestring):
+      parts.append(names)
+    else:
+      parts.append(",".join(names))
+
+    if shared:
+      parts.append("shared")
+    else:
+      parts.append("exclusive")
+
+    msg = "/".join(parts)
+
+    logging.debug("LU locks %s", msg)
+
+    if self._cbs:
+      self._cbs.ReportLocks(msg)
+
   def _ExecLU(self, lu):
     """Logical Unit execution sequence.
 
@@ -184,9 +232,13 @@ class Processor(object):
       share = lu.share_locks[level]
       if acquiring_locks:
         needed_locks = lu.needed_locks[level]
+
+        self._ReportLocks(level, needed_locks, share, False)
         lu.acquired_locks[level] = self.context.glm.acquire(level,
                                                             needed_locks,
                                                             shared=share)
+        self._ReportLocks(level, needed_locks, share, True)
+
       else: # adding_locks
         add_locks = lu.add_locks[level]
         lu.remove_locks[level] = add_locks
@@ -234,8 +286,14 @@ class Processor(object):
       # Acquire the Big Ganeti Lock exclusively if this LU requires it, and
       # in a shared fashion otherwise (to prevent a concurrent run with an
       # exclusive LU).
-      self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
-                               shared=not lu_class.REQ_BGL)
+      self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
+                        not lu_class.REQ_BGL, False)
+      try:
+        self.context.glm.acquire(locking.LEVEL_CLUSTER, [locking.BGL],
+                                 shared=not lu_class.REQ_BGL)
+      finally:
+        self._ReportLocks(locking.LEVEL_CLUSTER, [locking.BGL],
+                          not lu_class.REQ_BGL, True)
       try:
         self.exclusive_BGL = lu_class.REQ_BGL
         lu = lu_class(self, op, self.context, self.rpc)
-- 
GitLab