Commit 33bbdbec authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Make HttpClientManager threadsafe

This allows a single HttpClientManager to be used from more than one
thread at the same time. We discussed having one HttpClientManager
per job queue thread. Assuming there should be one HTTP thread per
node, this would mean quadratic growth with the number of nodes. By
being able to reuse the manager, this problem is a defused a bit.

This patch also fixes a typo in HttpClientRequestExecutor.

Reviewed-by: iustinp
parent 7c46aafd
......@@ -32,6 +32,7 @@ import time
import signal
import logging
import errno
import threading
from cStringIO import StringIO
......@@ -836,7 +837,7 @@ class HttpClientRequestExecutor(_HttpSocketBase):
send_headers = self.DEFAULT_HEADERS.copy()
if self.request.headers:
send_headers.update(req.headers)
send_headers.update(self.request.headers)
send_headers[HTTP_HOST] = "%s:%s" % (self.request.host, self.request.port)
......@@ -1302,12 +1303,26 @@ class HttpClientRequestExecutor(_HttpSocketBase):
"Timeout while shutting down connection")
class _HttpClientPendingRequest(object):
"""Data class for pending requests.
"""
def __init__(self, request):
self.request = request
# Thread synchronization
self.done = threading.Event()
class HttpClientWorker(workerpool.BaseWorker):
"""HTTP client worker class.
"""
def RunTask(self, req):
HttpClientRequestExecutor(req)
def RunTask(self, pend_req):
try:
HttpClientRequestExecutor(pend_req.request)
finally:
pend_req.done.set()
class HttpClientWorkerPool(workerpool.WorkerPool):
......@@ -1318,6 +1333,9 @@ class HttpClientWorkerPool(workerpool.WorkerPool):
class HttpClientManager(object):
"""Manages HTTP requests.
"""
def __init__(self):
self._wpool = HttpClientWorkerPool(self)
......@@ -1325,16 +1343,38 @@ class HttpClientManager(object):
self.Shutdown()
def ExecRequests(self, requests):
# Add requests to queue
for req in requests:
self._wpool.AddTask(req)
"""Execute HTTP requests.
# And wait for them to finish
self._wpool.Quiesce()
This function can be called from multiple threads at the same time.
@type requests: List of HttpClientRequest instances
@param requests: The requests to execute
@rtype: List of HttpClientRequest instances
@returns: The list of requests passed in
"""
# _HttpClientPendingRequest is used for internal thread synchronization
pending = [_HttpClientPendingRequest(req) for req in requests]
try:
# Add requests to queue
for pend_req in pending:
self._wpool.AddTask(pend_req)
finally:
# In case of an exception we should still wait for the rest, otherwise
# another thread from the worker pool could modify the request object
# after we returned.
# And wait for them to finish
for pend_req in pending:
pend_req.done.wait()
# Return original list
return requests
def Shutdown(self):
self._wpool.Quiesce()
self._wpool.TerminateWorkers()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment