diff --git a/lib/http/client.py b/lib/http/client.py index 55ccb94e6e426eea56d9e4652da4e8ce5b7ef886..4095bb7651bb53a2bfd0cb27bd6aa25f9f67e57e 100644 --- a/lib/http/client.py +++ b/lib/http/client.py @@ -24,11 +24,13 @@ import logging import pycurl +import threading from cStringIO import StringIO from ganeti import http from ganeti import compat from ganeti import netutils +from ganeti import locking class HttpClientRequest(object): @@ -378,11 +380,12 @@ class HttpClientPool: """ return pycurl.CurlMulti() - def ProcessRequests(self, requests): + def ProcessRequests(self, requests, lock_monitor_cb=None): """Processes any number of HTTP client requests using pooled objects. @type requests: list of L{HttpClientRequest} @param requests: List of all requests + @param lock_monitor_cb: Callable for registering with lock monitor """ # For client cleanup @@ -403,12 +406,24 @@ class HttpClientPool: assert len(curl_to_pclient) == len(requests) + if lock_monitor_cb: + monitor = _PendingRequestMonitor(threading.currentThread(), + curl_to_pclient.values) + lock_monitor_cb(monitor) + else: + monitor = _NoOpRequestMonitor + # Process all requests and act based on the returned values for (curl, msg) in _ProcessCurlRequests(self._CreateCurlMultiHandle(), curl_to_pclient.keys()): pclient = curl_to_pclient[curl] req = pclient.client.GetCurrentRequest() - pclient.client.Done(msg) + + monitor.acquire(shared=0) + try: + pclient.client.Done(msg) + finally: + monitor.release() assert ((msg is None and req.success and req.error is None) ^ (msg is not None and not req.success and req.error == msg)) @@ -416,8 +431,15 @@ class HttpClientPool: assert compat.all(pclient.client.GetCurrentRequest() is None for pclient in curl_to_pclient.values()) - # Return clients to pool - self._Return(curl_to_pclient.values()) + monitor.acquire(shared=0) + try: + # Don't try to read information from returned clients + monitor.Disable() + + # Return clients to pool + self._Return(curl_to_pclient.values()) + finally: + monitor.release() assert compat.all(req.error is not None or (req.success and @@ -426,6 +448,64 @@ class HttpClientPool: for req in requests) +class _NoOpRequestMonitor: # pylint: disable=W0232 + """No-op request monitor. + + """ + @staticmethod + def acquire(*args, **kwargs): + pass + + release = acquire + Disable = acquire + + +class _PendingRequestMonitor: + _LOCK = "_lock" + + def __init__(self, owner, pending_fn): + """Initializes this class. + + """ + self._owner = owner + self._pending_fn = pending_fn + + # The lock monitor runs in another thread, hence locking is necessary + self._lock = locking.SharedLock("PendingHttpRequests") + self.acquire = self._lock.acquire + self.release = self._lock.release + + def Disable(self): + """Disable monitor. + + """ + self._pending_fn = None + + @locking.ssynchronized(_LOCK, shared=1) + def GetLockInfo(self, requested): # pylint: disable=W0613 + """Retrieves information about pending requests. + + @type requested: set + @param requested: Requested information, see C{query.LQ_*} + + """ + # No need to sort here, that's being done by the lock manager and query + # library. There are no priorities for requests, hence all show up as + # one item under "pending". + result = [] + + if self._pending_fn: + owner_name = self._owner.getName() + + for pclient in self._pending_fn(): + req = pclient.client.GetCurrentRequest() + if req: + result.append(("rpc/%s%s" % (req.host, req.path), None, None, + [("thread", [owner_name])])) + + return result + + def _ProcessCurlRequests(multi, requests): """cURL request processor. diff --git a/lib/rpc.py b/lib/rpc.py index bb03448002eacac34acfe60482a280aa46994ff2..3525019e6a932c4c9f3209edc4d03b9654724368 100644 --- a/lib/rpc.py +++ b/lib/rpc.py @@ -368,7 +368,7 @@ def _NodeConfigResolver(single_node_fn, all_nodes_fn, hosts): class _RpcProcessor: - def __init__(self, resolver, port): + def __init__(self, resolver, port, lock_monitor_cb=None): """Initializes this class. @param resolver: callable accepting a list of hostnames, returning a list @@ -376,10 +376,12 @@ class _RpcProcessor: the special value L{_OFFLINE} to mark offline machines) @type port: int @param port: TCP port + @param lock_monitor_cb: Callable for registering with lock monitor """ self._resolver = resolver self._port = port + self._lock_monitor_cb = lock_monitor_cb @staticmethod def _PrepareRequests(hosts, port, procedure, body, read_timeout): @@ -452,7 +454,8 @@ class _RpcProcessor: self._PrepareRequests(self._resolver(hosts), self._port, procedure, str(body), read_timeout) - http_pool.ProcessRequests(requests.values()) + http_pool.ProcessRequests(requests.values(), + lock_monitor_cb=self._lock_monitor_cb) assert not frozenset(results).intersection(requests) @@ -489,7 +492,8 @@ class RpcRunner(object): self._proc = _RpcProcessor(compat.partial(_NodeConfigResolver, self._cfg.GetNodeInfo, self._cfg.GetAllNodesInfo), - netutils.GetDaemonPort(constants.NODED)) + netutils.GetDaemonPort(constants.NODED), + lock_monitor_cb=context.glm.AddToLockMonitor) def _InstDict(self, instance, hvp=None, bep=None, osp=None): """Convert the given instance to a dict. diff --git a/test/ganeti.rpc_unittest.py b/test/ganeti.rpc_unittest.py index 9ed0628ddc4d07e8003b9498574caf64f8bc86d5..6dee12ed8467b45790e4ef5f06db01b24b4e0ac8 100755 --- a/test/ganeti.rpc_unittest.py +++ b/test/ganeti.rpc_unittest.py @@ -51,7 +51,7 @@ class FakeHttpPool: self._response_fn = response_fn self.reqcount = 0 - def ProcessRequests(self, reqs): + def ProcessRequests(self, reqs, lock_monitor_cb=None): for req in reqs: self.reqcount += 1 self._response_fn(req)