diff --git a/lib/http.py b/lib/http.py index 4defe5ed0bedf5ac572c1a9e97e1d08d54957e77..9acfa5416e51d846083fef731f6097e2d55512b1 100644 --- a/lib/http.py +++ b/lib/http.py @@ -32,6 +32,7 @@ import time import signal import logging import errno +import threading from cStringIO import StringIO @@ -836,7 +837,7 @@ class HttpClientRequestExecutor(_HttpSocketBase): send_headers = self.DEFAULT_HEADERS.copy() if self.request.headers: - send_headers.update(req.headers) + send_headers.update(self.request.headers) send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port) @@ -1302,12 +1303,26 @@ class HttpClientRequestExecutor(_HttpSocketBase): "Timeout while shutting down connection") +class _HttpClientPendingRequest(object): + """Data class for pending requests. + + """ + def __init__(self, request): + self.request = request + + # Thread synchronization + self.done = threading.Event() + + class HttpClientWorker(workerpool.BaseWorker): """HTTP client worker class. """ - def RunTask(self, req): - HttpClientRequestExecutor(req) + def RunTask(self, pend_req): + try: + HttpClientRequestExecutor(pend_req.request) + finally: + pend_req.done.set() class HttpClientWorkerPool(workerpool.WorkerPool): @@ -1318,6 +1333,9 @@ class HttpClientWorkerPool(workerpool.WorkerPool): class HttpClientManager(object): + """Manages HTTP requests. + + """ def __init__(self): self._wpool = HttpClientWorkerPool(self) @@ -1325,16 +1343,38 @@ class HttpClientManager(object): self.Shutdown() def ExecRequests(self, requests): - # Add requests to queue - for req in requests: - self._wpool.AddTask(req) + """Execute HTTP requests. - # And wait for them to finish - self._wpool.Quiesce() + This function can be called from multiple threads at the same time. + @type requests: List of HttpClientRequest instances + @param requests: The requests to execute + @rtype: List of HttpClientRequest instances + @returns: The list of requests passed in + + """ + # _HttpClientPendingRequest is used for internal thread synchronization + pending = [_HttpClientPendingRequest(req) for req in requests] + + try: + # Add requests to queue + for pend_req in pending: + self._wpool.AddTask(pend_req) + + finally: + # In case of an exception we should still wait for the rest, otherwise + # another thread from the worker pool could modify the request object + # after we returned. + + # And wait for them to finish + for pend_req in pending: + pend_req.done.wait() + + # Return original list return requests def Shutdown(self): + self._wpool.Quiesce() self._wpool.TerminateWorkers()