Commit 50a3fbb2 authored by Michael Hanselmann's avatar Michael Hanselmann
Browse files

Instantiate new job queue in master daemon

Reviewed-by: iustinp
parent e2715f69
......@@ -82,9 +82,16 @@ class IOServer(SocketServer.UnixStreamServer):
self.queue = jqueue.QueueManager()
self.context = context
self.processors = []
# We'll only start threads once we've forked.
self.jobqueue = None
signal.signal(signal.SIGINT, self.handle_quit_signals)
signal.signal(signal.SIGTERM, self.handle_quit_signals)
def setup_queue(self):
self.jobqueue = jqueue.JobQueue(self.context)
def setup_processors(self):
"""Spawn the processors threads.
......@@ -140,14 +147,18 @@ class IOServer(SocketServer.UnixStreamServer):
socket.
"""
self.server_close()
utils.RemoveFile(constants.MASTER_SOCKET)
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.queue.new_queue.put(None)
for idx, t in enumerate(self.processors):
logging.debug("waiting for processor thread %s...", idx)
t.join()
logging.debug("threads done")
try:
self.server_close()
utils.RemoveFile(constants.MASTER_SOCKET)
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.queue.new_queue.put(None)
for idx, t in enumerate(self.processors):
logging.debug("waiting for processor thread %s...", idx)
t.join()
logging.debug("threads done")
finally:
if self.jobqueue:
self.jobqueue.Shutdown()
class ClientRqHandler(SocketServer.BaseRequestHandler):
......@@ -419,6 +430,7 @@ def main():
try:
master.setup_processors()
master.setup_queue()
try:
master.serve_forever()
finally:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment