Commit aea5caef authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

rpc/http: Show pending RPC requests in lock monitor



Not all requests use an instance of RpcRunner yet and therefore won't
show up (only instances have access to the global Ganeti context).
Currently only the IP address is accessible. Another patch will add a
nicer name for requests.

Example output (gnt-debug locks -o name,pending):
Name                      Pending
rpc/192.0.2.18/test_delay thread:Jq12/Job683/TEST_DELAY
Signed-off-by: default avatarMichael Hanselmann <hansmi@google.com>
Reviewed-by: default avatarRené Nussbaumer <rn@google.com>
parent ecd61b4e
......@@ -24,11 +24,13 @@
import logging
import pycurl
import threading
from cStringIO import StringIO
from ganeti import http
from ganeti import compat
from ganeti import netutils
from ganeti import locking
class HttpClientRequest(object):
......@@ -378,11 +380,12 @@ class HttpClientPool:
"""
return pycurl.CurlMulti()
def ProcessRequests(self, requests):
def ProcessRequests(self, requests, lock_monitor_cb=None):
"""Processes any number of HTTP client requests using pooled objects.
@type requests: list of L{HttpClientRequest}
@param requests: List of all requests
@param lock_monitor_cb: Callable for registering with lock monitor
"""
# For client cleanup
......@@ -403,12 +406,24 @@ class HttpClientPool:
assert len(curl_to_pclient) == len(requests)
if lock_monitor_cb:
monitor = _PendingRequestMonitor(threading.currentThread(),
curl_to_pclient.values)
lock_monitor_cb(monitor)
else:
monitor = _NoOpRequestMonitor
# Process all requests and act based on the returned values
for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(),
curl_to_pclient.keys()):
pclient = curl_to_pclient[curl]
req = pclient.client.GetCurrentRequest()
pclient.client.Done(msg)
monitor.acquire(shared=0)
try:
pclient.client.Done(msg)
finally:
monitor.release()
assert ((msg is None and req.success and req.error is None) ^
(msg is not None and not req.success and req.error == msg))
......@@ -416,8 +431,15 @@ class HttpClientPool:
assert compat.all(pclient.client.GetCurrentRequest() is None
for pclient in curl_to_pclient.values())
# Return clients to pool
self._Return(curl_to_pclient.values())
monitor.acquire(shared=0)
try:
# Don't try to read information from returned clients
monitor.Disable()
# Return clients to pool
self._Return(curl_to_pclient.values())
finally:
monitor.release()
assert compat.all(req.error is not None or
(req.success and
......@@ -426,6 +448,64 @@ class HttpClientPool:
for req in requests)
class _NoOpRequestMonitor: # pylint: disable=W0232
"""No-op request monitor.
"""
@staticmethod
def acquire(*args, **kwargs):
pass
release = acquire
Disable = acquire
class _PendingRequestMonitor:
_LOCK = "_lock"
def __init__(self, owner, pending_fn):
"""Initializes this class.
"""
self._owner = owner
self._pending_fn = pending_fn
# The lock monitor runs in another thread, hence locking is necessary
self._lock = locking.SharedLock("PendingHttpRequests")
self.acquire = self._lock.acquire
self.release = self._lock.release
def Disable(self):
"""Disable monitor.
"""
self._pending_fn = None
@locking.ssynchronized(_LOCK, shared=1)
def GetLockInfo(self, requested): # pylint: disable=W0613
"""Retrieves information about pending requests.
@type requested: set
@param requested: Requested information, see C{query.LQ_*}
"""
# No need to sort here, that's being done by the lock manager and query
# library. There are no priorities for requests, hence all show up as
# one item under "pending".
result = []
if self._pending_fn:
owner_name = self._owner.getName()
for pclient in self._pending_fn():
req = pclient.client.GetCurrentRequest()
if req:
result.append(("rpc/%s%s" % (req.host, req.path), None, None,
[("thread", [owner_name])]))
return result
def _ProcessCurlRequests(multi, requests):
"""cURL request processor.
......
......@@ -368,7 +368,7 @@ def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts):
class _RpcProcessor:
def __init__(self, resolver, port):
def __init__(self, resolver, port, lock_monitor_cb=None):
"""Initializes this class.
@param resolver: callable accepting a list of hostnames, returning a list
......@@ -376,10 +376,12 @@ class _RpcProcessor:
the special value L{_OFFLINE} to mark offline machines)
@type port: int
@param port: TCP port
@param lock_monitor_cb: Callable for registering with lock monitor
"""
self._resolver = resolver
self._port = port
self._lock_monitor_cb = lock_monitor_cb
@staticmethod
def _PrepareRequests(hosts, port, procedure, body, read_timeout):
......@@ -452,7 +454,8 @@ class _RpcProcessor:
self._PrepareRequests(self._resolver(hosts), self._port, procedure,
str(body), read_timeout)
http_pool.ProcessRequests(requests.values())
http_pool.ProcessRequests(requests.values(),
lock_monitor_cb=self._lock_monitor_cb)
assert not frozenset(results).intersection(requests)
......@@ -489,7 +492,8 @@ class RpcRunner(object):
self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver,
self._cfg.GetNodeInfo,
self._cfg.GetAllNodesInfo),
netutils.GetDaemonPort(constants.NODED))
netutils.GetDaemonPort(constants.NODED),
lock_monitor_cb=context.glm.AddToLockMonitor)
def _InstDict(self, instance, hvp=None, bep=None, osp=None):
"""Convert the given instance to a dict.
......
......@@ -51,7 +51,7 @@ class FakeHttpPool:
self._response_fn = response_fn
self.reqcount = 0
def ProcessRequests(self, reqs):
def ProcessRequests(self, reqs, lock_monitor_cb=None):
for req in reqs:
self.reqcount += 1
self._response_fn(req)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment