From aea5caefb5908bfe19a981d8d7b19c116330a213 Mon Sep 17 00:00:00 2001
From: Michael Hanselmann <hansmi@google.com>
Date: Tue, 27 Sep 2011 16:43:45 +0200
Subject: [PATCH] rpc/http: Show pending RPC requests in lock monitor
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Requests which are not sent through an instance of RpcRunner won't show
up yet, as only RpcRunner instances have access to the global Ganeti
context. Currently only the IP address is available to identify a
request. Another patch will add a nicer name for requests.

Example output (gnt-debug locks -o name,pending):
Name                      Pending
rpc/192.0.2.18/test_delay thread:Jq12/Job683/TEST_DELAY

Signed-off-by: Michael Hanselmann <hansmi@google.com>
Reviewed-by: René Nussbaumer <rn@google.com>
---
 lib/http/client.py          | 88 +++++++++++++++++++++++++++++++++++--
 lib/rpc.py                  | 10 +++--
 test/ganeti.rpc_unittest.py |  2 +-
 3 files changed, 92 insertions(+), 8 deletions(-)

diff --git a/lib/http/client.py b/lib/http/client.py
index 55ccb94e6..4095bb765 100644
--- a/lib/http/client.py
+++ b/lib/http/client.py
@@ -24,11 +24,13 @@
 
 import logging
 import pycurl
+import threading
 from cStringIO import StringIO
 
 from ganeti import http
 from ganeti import compat
 from ganeti import netutils
+from ganeti import locking
 
 
 class HttpClientRequest(object):
@@ -378,11 +380,12 @@ class HttpClientPool:
     """
     return pycurl.CurlMulti()
 
-  def ProcessRequests(self, requests):
+  def ProcessRequests(self, requests, lock_monitor_cb=None):
     """Processes any number of HTTP client requests using pooled objects.
 
     @type requests: list of L{HttpClientRequest}
     @param requests: List of all requests
+    @param lock_monitor_cb: Callable for registering with lock monitor
 
     """
     # For client cleanup
@@ -403,12 +406,24 @@ class HttpClientPool:
 
     assert len(curl_to_pclient) == len(requests)
 
+    if lock_monitor_cb:
+      monitor = _PendingRequestMonitor(threading.currentThread(),
+                                       curl_to_pclient.values)
+      lock_monitor_cb(monitor)
+    else:
+      monitor = _NoOpRequestMonitor
+
     # Process all requests and act based on the returned values
     for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
                                             curl_to_pclient.keys()):
       pclient = curl_to_pclient[curl]
       req = pclient.client.GetCurrentRequest()
-      pclient.client.Done(msg)
+
+      monitor.acquire(shared=0)
+      try:
+        pclient.client.Done(msg)
+      finally:
+        monitor.release()
 
       assert ((msg is None and req.success and req.error is None) ^
               (msg is not None and not req.success and req.error == msg))
@@ -416,8 +431,15 @@ class HttpClientPool:
     assert compat.all(pclient.client.GetCurrentRequest() is None
                       for pclient in curl_to_pclient.values())
 
-    # Return clients to pool
-    self._Return(curl_to_pclient.values())
+    monitor.acquire(shared=0)
+    try:
+      # Don't try to read information from returned clients
+      monitor.Disable()
+
+      # Return clients to pool
+      self._Return(curl_to_pclient.values())
+    finally:
+      monitor.release()
 
     assert compat.all(req.error is not None or
                       (req.success and
@@ -426,6 +448,64 @@ class HttpClientPool:
                       for req in requests)
 
 
+class _NoOpRequestMonitor: # pylint: disable=W0232
+  """No-op request monitor.
+
+  """
+  @staticmethod
+  def acquire(*args, **kwargs):
+    pass
+
+  release = acquire
+  Disable = acquire
+
+
+class _PendingRequestMonitor:
+  _LOCK = "_lock"
+
+  def __init__(self, owner, pending_fn):
+    """Initializes this class.
+
+    """
+    self._owner = owner
+    self._pending_fn = pending_fn
+
+    # The lock monitor runs in another thread, hence locking is necessary
+    self._lock = locking.SharedLock("PendingHttpRequests")
+    self.acquire = self._lock.acquire
+    self.release = self._lock.release
+
+  def Disable(self):
+    """Disable monitor.
+
+    """
+    self._pending_fn = None
+
+  @locking.ssynchronized(_LOCK, shared=1)
+  def GetLockInfo(self, requested): # pylint: disable=W0613
+    """Retrieves information about pending requests.
+
+    @type requested: set
+    @param requested: Requested information, see C{query.LQ_*}
+
+    """
+    # No need to sort here, that's being done by the lock manager and query
+    # library. There are no priorities for requests, hence all show up as
+    # one item under "pending".
+    result = []
+
+    if self._pending_fn:
+      owner_name = self._owner.getName()
+
+      for pclient in self._pending_fn():
+        req = pclient.client.GetCurrentRequest()
+        if req:
+          result.append(("rpc/%s%s" % (req.host, req.path), None, None,
+                         [("thread", [owner_name])]))
+
+    return result
+
+
 def _ProcessCurlRequests(multi, requests):
   """cURL request processor.
 
diff --git a/lib/rpc.py b/lib/rpc.py
index bb0344800..3525019e6 100644
--- a/lib/rpc.py
+++ b/lib/rpc.py
@@ -368,7 +368,7 @@ def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
 
 
 class _RpcProcessor:
-  def __init__(self, resolver, port):
+  def __init__(self, resolver, port, lock_monitor_cb=None):
     """Initializes this class.
 
     @param resolver: callable accepting a list of hostnames, returning a list
@@ -376,10 +376,12 @@ class _RpcProcessor:
       the special value L{_OFFLINE} to mark offline machines)
     @type port: int
     @param port: TCP port
+    @param lock_monitor_cb: Callable for registering with lock monitor
 
     """
     self._resolver = resolver
     self._port = port
+    self._lock_monitor_cb = lock_monitor_cb
 
   @staticmethod
   def _PrepareRequests(hosts, port, procedure, body, read_timeout):
@@ -452,7 +454,8 @@ class _RpcProcessor:
       self._PrepareRequests(self._resolver(hosts), self._port, procedure,
                             str(body), read_timeout)
 
-    http_pool.ProcessRequests(requests.values())
+    http_pool.ProcessRequests(requests.values(),
+                              lock_monitor_cb=self._lock_monitor_cb)
 
     assert not frozenset(results).intersection(requests)
 
@@ -489,7 +492,8 @@ class RpcRunner(object):
     self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
                                               self._cfg.GetNodeInfo,
                                               self._cfg.GetAllNodesInfo),
-                               netutils.GetDaemonPort(constants.NODED))
+                               netutils.GetDaemonPort(constants.NODED),
+                               lock_monitor_cb=context.glm.AddToLockMonitor)
 
   def _InstDict(self, instance, hvp=None, bep=None, osp=None):
     """Convert the given instance to a dict.
diff --git a/test/ganeti.rpc_unittest.py b/test/ganeti.rpc_unittest.py
index 9ed0628dd..6dee12ed8 100755
--- a/test/ganeti.rpc_unittest.py
+++ b/test/ganeti.rpc_unittest.py
@@ -51,7 +51,7 @@ class FakeHttpPool:
     self._response_fn = response_fn
     self.reqcount = 0
 
-  def ProcessRequests(self, reqs):
+  def ProcessRequests(self, reqs, lock_monitor_cb=None):
     for req in reqs:
       self.reqcount += 1
       self._response_fn(req)
-- 
GitLab