Commit cdd7f900 authored by Guido Trotter's avatar Guido Trotter
Browse files

Convert ganeti-masterd's main thread to mainloop



Not much changes with this patch. The main loop for the IOServer is
repaced by mainloop.Run() and the main thread now uses asyncore to
handle connections to the master socket. Once it accepts them, though,
it just pushes them to the current infrastructure, and everything
proceeds as before.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 18215385
......@@ -30,10 +30,10 @@ inheritance from parent classes requires it.
# C0103: Invalid name ganeti-masterd
import sys
import socket
import SocketServer
import time
import collections
import signal
import logging
from optparse import OptionParser
......@@ -65,69 +65,46 @@ class ClientRequestWorker(workerpool.BaseWorker):
def RunTask(self, server, request, client_address):
"""Process the request.
This is copied from the code in ThreadingMixIn.
"""
try:
server.finish_request(request, client_address)
server.close_request(request)
except: # pylint: disable-msg=W0702
server.handle_error(request, client_address)
server.close_request(request)
server.request_handler_class(request, client_address, server)
finally:
request.close()
class IOServer(SocketServer.UnixStreamServer):
"""IO thread class.
class MasterServer(daemon.AsyncStreamServer):
"""Master Server.
This class takes care of initializing the other threads, setting
signal handlers (which are processed only in this thread), and doing
cleanup at shutdown.
This is the main asynchronous master server. It handles connections to the
master socket.
"""
def __init__(self, address, rqhandler):
"""IOServer constructor
def __init__(self, mainloop, address, handler_class):
"""MasterServer constructor
@param address: the address to bind this IOServer to
@param rqhandler: RequestHandler type object
@type mainloop: ganeti.daemon.Mainloop
@param mainloop: Mainloop used to poll for I/O events
@param address: the unix socket address to bind the MasterServer to
@param handler_class: handler class for the connections
"""
SocketServer.UnixStreamServer.__init__(self, address, rqhandler)
daemon.AsyncStreamServer.__init__(self, socket.AF_UNIX, address)
self.request_handler_class = handler_class
self.mainloop = mainloop
# We'll only start threads once we've forked.
self.context = None
self.request_workers = None
def handle_connection(self, connected_socket, client_address):
self.request_workers.AddTask(self, connected_socket, client_address)
def setup_queue(self):
self.context = GanetiContext()
self.request_workers = workerpool.WorkerPool("ClientReq",
CLIENT_REQUEST_WORKERS,
ClientRequestWorker)
def process_request(self, request, client_address):
"""Add task to workerpool to process request.
"""
(pid, uid, gid) = utils.GetSocketCredentials(request)
logging.info("Accepted connection from pid=%s, uid=%s, gid=%s",
pid, uid, gid)
self.request_workers.AddTask(self, request, client_address)
def handle_error(self, request, client_address):
logging.exception("Error while handling request")
@utils.SignalHandled([signal.SIGINT, signal.SIGTERM])
def serve_forever(self, signal_handlers=None): # pylint: disable-msg=W0221
"""Handle one request at a time until told to quit."""
assert isinstance(signal_handlers, dict) and \
len(signal_handlers) > 0, \
"Broken SignalHandled decorator"
# Since we use SignalHandled only once, the resulting dict will map all
# signals to the same handler. We'll just use the first one.
sighandler = signal_handlers.values()[0]
while not sighandler.called:
self.handle_request()
def server_cleanup(self):
"""Cleanup the server.
......@@ -136,7 +113,7 @@ class IOServer(SocketServer.UnixStreamServer):
"""
try:
self.server_close()
self.close()
finally:
if self.request_workers:
self.request_workers.TerminateWorkers()
......@@ -528,7 +505,8 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
# concurrent execution.
utils.RemoveFile(constants.MASTER_SOCKET)
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
mainloop = daemon.Mainloop()
master = MasterServer(mainloop, constants.MASTER_SOCKET, ClientRqHandler)
try:
rpc.Init()
try:
......@@ -541,7 +519,7 @@ def ExecMasterd (options, args): # pylint: disable-msg=W0613
master.setup_queue()
try:
master.serve_forever()
mainloop.Run()
finally:
master.server_cleanup()
finally:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment