From 33bbdbece57f1d57357874b86c4fe4c74a78e1e3 Mon Sep 17 00:00:00 2001 From: Michael Hanselmann <hansmi@google.com> Date: Wed, 19 Nov 2008 13:18:08 +0000 Subject: [PATCH] Make HttpClientManager threadsafe This allows a single HttpClientManager to be used from more than one thread at the same time. We discussed having one HttpClientManager per job queue thread. Assuming there should be one HTTP thread per node, this would mean quadratic growth with the number of nodes. By being able to reuse the manager, this problem is a defused a bit. This patch also fixes a typo in HttpClientRequestExecutor. Reviewed-by: iustinp --- lib/http.py | 56 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/lib/http.py b/lib/http.py index 4defe5ed0..9acfa5416 100644 --- a/lib/http.py +++ b/lib/http.py @@ -32,6 +32,7 @@ import time import signal import logging import errno +import threading from cStringIO import StringIO @@ -836,7 +837,7 @@ class HttpClientRequestExecutor(_HttpSocketBase): send_headers = self.DEFAULT_HEADERS.copy() if self.request.headers: - send_headers.update(req.headers) + send_headers.update(self.request.headers) send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port) @@ -1302,12 +1303,26 @@ class HttpClientRequestExecutor(_HttpSocketBase): "Timeout while shutting down connection") +class _HttpClientPendingRequest(object): + """Data class for pending requests. + + """ + def __init__(self, request): + self.request = request + + # Thread synchronization + self.done = threading.Event() + + class HttpClientWorker(workerpool.BaseWorker): """HTTP client worker class. """ - def RunTask(self, req): - HttpClientRequestExecutor(req) + def RunTask(self, pend_req): + try: + HttpClientRequestExecutor(pend_req.request) + finally: + pend_req.done.set() class HttpClientWorkerPool(workerpool.WorkerPool): @@ -1318,6 +1333,9 @@ class HttpClientWorkerPool(workerpool.WorkerPool): class HttpClientManager(object): + """Manages HTTP requests. + + """ def __init__(self): self._wpool = HttpClientWorkerPool(self) @@ -1325,16 +1343,38 @@ class HttpClientManager(object): self.Shutdown() def ExecRequests(self, requests): - # Add requests to queue - for req in requests: - self._wpool.AddTask(req) + """Execute HTTP requests. - # And wait for them to finish - self._wpool.Quiesce() + This function can be called from multiple threads at the same time. + @type requests: List of HttpClientRequest instances + @param requests: The requests to execute + @rtype: List of HttpClientRequest instances + @returns: The list of requests passed in + + """ + # _HttpClientPendingRequest is used for internal thread synchronization + pending = [_HttpClientPendingRequest(req) for req in requests] + + try: + # Add requests to queue + for pend_req in pending: + self._wpool.AddTask(pend_req) + + finally: + # In case of an exception we should still wait for the rest, otherwise + # another thread from the worker pool could modify the request object + # after we returned. + + # And wait for them to finish + for pend_req in pending: + pend_req.done.wait() + + # Return original list return requests def Shutdown(self): + self._wpool.Quiesce() self._wpool.TerminateWorkers() -- GitLab