Commit c1f2901b authored by Iustin Pop's avatar Iustin Pop
Browse files

Implement forking/master role checking in masterd

This patch adds checks for the master role and daemonize support to
ganeti-masterd.

The patch modifies the startup/shutdown of the server because:
  - we want bind()/listen() to the master socket to occur before forking
    so that we can return a correct exit code and write messages to
    stderr
  - but we want thread startup to occur after fork(), otherwise python
    threading gets confused

The patch also has some small cleanups:
  - remove the unix socket after closing it, so we don't need to remove
    it manually
  - instead of just telling the threads to terminate via the new_queue,
    we also join() them so that the logs show what thread clinging to
    life
  - the daemon logs to its own logfile now
  - there is command line parameter support :)

Reviewed-by: imsnah
parent 6f695a2e
......@@ -27,6 +27,7 @@ inheritance from parent classes requires it.
"""
import sys
import SocketServer
import threading
import time
......@@ -38,6 +39,7 @@ import simplejson
from cStringIO import StringIO
from optparse import OptionParser
from ganeti import constants
from ganeti import mcpu
......@@ -45,6 +47,12 @@ from ganeti import opcodes
from ganeti import jqueue
from ganeti import luxi
from ganeti import utils
from ganeti import errors
from ganeti import ssconf
EXIT_NOTMASTER = constants.EXIT_NOTMASTER
EXIT_NODESETUP_ERROR = constants.EXIT_NODESETUP_ERROR
class IOServer(SocketServer.UnixStreamServer):
......@@ -62,12 +70,22 @@ class IOServer(SocketServer.UnixStreamServer):
self.do_quit = False
self.queue = jqueue.QueueManager()
self.processors = []
signal.signal(signal.SIGINT, self.handle_quit_signals)
signal.signal(signal.SIGTERM, self.handle_quit_signals)
def setup_processors(self):
"""Spawn the processors threads.
This initializes the queue and the thread processors. It is done
separately from the constructor because we want the clone()
syscalls to happen after the daemonize part.
"""
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.processors.append(threading.Thread(target=PoolWorker,
args=(i, self.queue.new_queue)))
for t in self.processors:
t.start()
signal.signal(signal.SIGINT, self.handle_sigint)
def process_request_thread(self, request, client_address):
"""Process the request.
......@@ -92,17 +110,31 @@ class IOServer(SocketServer.UnixStreamServer):
args=(request, client_address))
t.start()
def handle_sigint(self, signum, frame):
def handle_quit_signals(self, signum, frame):
print "received %s in %s" % (signum, frame)
self.do_quit = True
self.server_close()
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.queue.new_queue.put(None)
def serve_forever(self):
"""Handle one request at a time until told to quit."""
while not self.do_quit:
self.handle_request()
print "served request, quit=%s" % (self.do_quit)
def server_cleanup(self):
"""Cleanup the server.
This involves shutting down the processor threads and the master
socket.
"""
self.server_close()
utils.RemoveFile(constants.MASTER_SOCKET)
for i in range(self.QUEUE_PROCESSOR_SIZE):
self.queue.new_queue.put(None)
for idx, t in enumerate(self.processors):
print "waiting for processor thread %s..." % idx
t.join()
print "done threads"
class ClientRqHandler(SocketServer.BaseRequestHandler):
......@@ -235,12 +267,74 @@ def PoolWorker(worker_id, incoming_queue):
print "worker %s exiting" % worker_id
def CheckMaster(debug):
"""Checks the node setup.
If this is the master, the function will return. Otherwise it will
exit with an exit code based on the node status.
"""
try:
ss = ssconf.SimpleStore()
master_name = ss.GetMasterNode()
except errors.ConfigurationError, err:
print "Cluster configuration incomplete: '%s'" % str(err)
sys.exit(EXIT_NODESETUP_ERROR)
try:
myself = utils.HostInfo()
except errors.ResolverError, err:
sys.stderr.write("Cannot resolve my own name (%s)\n" % err.args[0])
sys.exit(EXIT_NODESETUP_ERROR)
if myself.name != master_name:
if debug:
sys.stderr.write("Not master, exiting.\n")
sys.exit(EXIT_NOTMASTER)
def ParseOptions():
"""Parse the command line options.
Returns:
(options, args) as from OptionParser.parse_args()
"""
parser = OptionParser(description="Ganeti master daemon",
usage="%prog [-f] [-d]",
version="%%prog (ganeti) %s" %
constants.RELEASE_VERSION)
parser.add_option("-f", "--foreground", dest="fork",
help="Don't detach from the current terminal",
default=True, action="store_false")
parser.add_option("-d", "--debug", dest="debug",
help="Enable some debug messages",
default=False, action="store_true")
options, args = parser.parse_args()
return options, args
def main():
"""Main function"""
options, args = ParseOptions()
utils.debug = options.debug
CheckMaster(options.debug)
master = IOServer(constants.MASTER_SOCKET, ClientRqHandler)
master.serve_forever()
# become a daemon
if options.fork:
utils.Daemonize(logfile=constants.LOG_MASTERDAEMON,
noclose_fds=[master.fileno()])
master.setup_processors()
try:
master.serve_forever()
finally:
master.server_cleanup()
if __name__ == "__main__":
main()
......@@ -54,6 +54,7 @@ LOG_DIR = _autoconf.LOCALSTATEDIR + "/log/ganeti"
LOG_OS_DIR = LOG_DIR + "/os"
LOG_NODESERVER = LOG_DIR + "/node-daemon.log"
LOG_WATCHER = LOG_DIR + "/watcher.log"
LOG_MASTERDAEMON = LOG_DIR + "/master-daemon.log"
OS_SEARCH_PATH = _autoconf.OS_SEARCH_PATH
EXPORT_DIR = _autoconf.EXPORT_DIR
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment