From c1f2901b6ed51af22aa3d52f272155e3e290a786 Mon Sep 17 00:00:00 2001 From: Iustin Pop <iustin@google.com> Date: Sat, 5 Apr 2008 15:29:36 +0000 Subject: [PATCH] Implement forking/master role checking in masterd This patch adds checks for the master role and daemonize support to ganeti-masterd. The patch modifies the startup/shutdown of the server because: - we want bind()/listen() to the master socket to occur before forking so that we can return a correct exit code and write messages to stderr - but we want thread startup to occur after fork(), otherwise python threading gets confused The patch also has some small cleanups: - remove the unix socket after closing it, so we don't need to remove it manually - instead of just telling the threads to terminate via the new_queue, we also join() them so that the logs show what thread clinging to life - the daemon logs to its own logfile now - there is command line parameter support :) Reviewed-by: imsnah --- daemons/ganeti-masterd | 106 ++++++++++++++++++++++++++++++++++++++--- lib/constants.py | 1 + 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index b18a8bb2b..24fec0ead 100644 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -27,6 +27,7 @@ inheritance from parent classes requires it. """ +import sys import SocketServer import threading import time @@ -38,6 +39,7 @@ import simplejson from cStringIO import StringIO +from optparse import OptionParser from ganeti import constants from ganeti import mcpu @@ -45,6 +47,12 @@ from ganeti import opcodes from ganeti import jqueue from ganeti import luxi from ganeti import utils +from ganeti import errors +from ganeti import ssconf + + +EXIT_NOTMASTER = constants.EXIT_NOTMASTER +EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR class IOServer(SocketServer.UnixStreamServer): @@ -62,12 +70,22 @@ class IOServer(SocketServer.UnixStreamServer): self.do_quit = False self.queue = jqueue.QueueManager() self.processors = [] + signal.signal(signal.SIGINT, self.handle_quit_signals) + signal.signal(signal.SIGTERM, self.handle_quit_signals) + + def setup_processors(self): + """Spawn the processors threads. + + This initializes the queue and the thread processors. It is done + separately from the constructor because we want the clone() + syscalls to happen after the daemonize part. + + """ for i in range(self.QUEUE_PROCESSOR_SIZE): self.processors.append(threading.Thread(target=PoolWorker, args=(i, self.queue.new_queue))) for t in self.processors: t.start() - signal.signal(signal.SIGINT, self.handle_sigint) def process_request_thread(self, request, client_address): """Process the request. @@ -92,17 +110,31 @@ class IOServer(SocketServer.UnixStreamServer): args=(request, client_address)) t.start() - def handle_sigint(self, signum, frame): + def handle_quit_signals(self, signum, frame): print "received %s in %s" % (signum, frame) self.do_quit = True - self.server_close() - for i in range(self.QUEUE_PROCESSOR_SIZE): - self.queue.new_queue.put(None) def serve_forever(self): """Handle one request at a time until told to quit.""" while not self.do_quit: self.handle_request() + print "served request, quit=%s" % (self.do_quit) + + def server_cleanup(self): + """Cleanup the server. + + This involves shutting down the processor threads and the master + socket. + + """ + self.server_close() + utils.RemoveFile(constants.MASTER_SOCKET) + for i in range(self.QUEUE_PROCESSOR_SIZE): + self.queue.new_queue.put(None) + for idx, t in enumerate(self.processors): + print "waiting for processor thread %s..." % idx + t.join() + print "done threads" class ClientRqHandler(SocketServer.BaseRequestHandler): @@ -235,12 +267,74 @@ def PoolWorker(worker_id, incoming_queue): print "worker %s exiting" % worker_id +def CheckMaster(debug): + """Checks the node setup. + + If this is the master, the function will return. Otherwise it will + exit with an exit code based on the node status. + + """ + try: + ss = ssconf.SimpleStore() + master_name = ss.GetMasterNode() + except errors.ConfigurationError, err: + print "Cluster configuration incomplete: '%s'" % str(err) + sys.exit(EXIT_NODESETUP_ERROR) + + try: + myself = utils.HostInfo() + except errors.ResolverError, err: + sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0]) + sys.exit(EXIT_NODESETUP_ERROR) + + if myself.name != master_name: + if debug: + sys.stderr.write("Not master, exiting.\n") + sys.exit(EXIT_NOTMASTER) + + +def ParseOptions(): + """Parse the command line options. + + Returns: + (options, args) as from OptionParser.parse_args() + + """ + parser = OptionParser(description="Ganeti master daemon", + usage="%prog [-f] [-d]", + version="%%prog (ganeti) %s" % + constants.RELEASE_VERSION) + + parser.add_option("-f", "--foreground", dest="fork", + help="Don't detach from the current terminal", + default=True, action="store_false") + parser.add_option("-d", "--debug", dest="debug", + help="Enable some debug messages", + default=False, action="store_true") + options, args = parser.parse_args() + return options, args + + def main(): """Main function""" + options, args = ParseOptions() + utils.debug = options.debug + + CheckMaster(options.debug) + master = IOServer(constants.MASTER_SOCKET, ClientRqHandler) - master.serve_forever() + # become a daemon + if options.fork: + utils.Daemonize(logfile=constants.LOG_MASTERDAEMON, + noclose_fds=[master.fileno()]) + + master.setup_processors() + try: + master.serve_forever() + finally: + master.server_cleanup() if __name__ == "__main__": main() diff --git a/lib/constants.py b/lib/constants.py index 1b9cb32de..d27f18d7c 100644 --- a/lib/constants.py +++ b/lib/constants.py @@ -54,6 +54,7 @@ LOG_DIR = _autoconf.LOCALSTATEDIR + "/log/ganeti" LOG_OS_DIR = LOG_DIR + "/os" LOG_NODESERVER = LOG_DIR + "/node-daemon.log" LOG_WATCHER = LOG_DIR + "/watcher.log" +LOG_MASTERDAEMON = LOG_DIR + "/master-daemon.log" OS_SEARCH_PATH = _autoconf.OS_SEARCH_PATH EXPORT_DIR = _autoconf.EXPORT_DIR -- GitLab