Skip to content
Snippets Groups Projects
Commit 23e50d39 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Use workerpool in master daemon

Reusing threads instead of starting one for each request is more efficient.

Reviewed-by: iustinp
parent d4104181
No related branches found
No related tags found
No related merge requests found
......@@ -29,7 +29,6 @@ inheritance from parent classes requires it.
import sys
import SocketServer
import threading
import time
import collections
import Queue
......@@ -52,12 +51,30 @@ from ganeti import utils
from ganeti import errors
from ganeti import ssconf
from ganeti import logger
from ganeti import workerpool
CLIENT_REQUEST_WORKERS = 16
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class ClientRequestWorker(workerpool.BaseWorker):
def RunTask(self, server, request, client_address):
"""Process the request.
This is copied from the code in ThreadingMixIn.
"""
try:
server.finish_request(request, client_address)
server.close_request(request)
except:
server.handle_error(request, client_address)
server.close_request(request)
class IOServer(SocketServer.UnixStreamServer):
"""IO thread class.
......@@ -81,35 +98,21 @@ class IOServer(SocketServer.UnixStreamServer):
# We'll only start threads once we've forked.
self.jobqueue = None
self.request_workers = None
signal.signal(signal.SIGINT, self.handle_quit_signals)
signal.signal(signal.SIGTERM, self.handle_quit_signals)
def setup_queue(self):
self.jobqueue = jqueue.JobQueue(self.context)
def process_request_thread(self, request, client_address):
"""Process the request.
This is copied from the code in ThreadingMixIn.
"""
try:
self.finish_request(request, client_address)
self.close_request(request)
except:
self.handle_error(request, client_address)
self.close_request(request)
self.request_workers = workerpool.WorkerPool(CLIENT_REQUEST_WORKERS,
ClientRequestWorker)
def process_request(self, request, client_address):
"""Start a new thread to process the request.
This is copied from the coode in ThreadingMixIn.
"""Add task to workerpool to process request.
"""
t = threading.Thread(target=self.process_request_thread,
args=(request, client_address))
t.start()
self.request_workers.AddTask(self, request, client_address)
def handle_quit_signals(self, signum, frame):
print "received %s in %s" % (signum, frame)
......@@ -132,6 +135,8 @@ class IOServer(SocketServer.UnixStreamServer):
self.server_close()
utils.RemoveFile(constants.MASTER_SOCKET)
finally:
if self.request_workers:
self.request_workers.Shutdown()
if self.jobqueue:
self.jobqueue.Shutdown()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment