Commit 1f50ac0d authored by Vangelis Koukis's avatar Vangelis Koukis

Refactor ganeti-0mqd as daemon, message broker

Refactor ganeti-0mqd to run as a UNIX daemon:
  * Log under /var/log/synnefo by default
  * Maintain PID file under /var/run/synnefo
  * Handle signals gracefully

Refactor ganeti-0mqd to be a 0mq message broker:
    * A separate thread handles receiving notifications from a PULL
      socket and publishing them to a PUB endpoint.
    * Notifications come from an inotify() watch on the Ganeti job queue...
    * ...or from synnefo hooks running inside Ganeti (e.g., notifications
      for VM network updates); see the sketch after this message.

Make all settings user-configurable, through settings.py and
command-line arguments.
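For illustration only, here is a minimal sketch of how a hook running inside Ganeti might hand a notification to ganeti-0mqd over its PULL endpoint. The endpoint address and the message values are assumptions based on the defaults and the 'ganeti-op-status' layout introduced in this commit; none of this code is part of the commit itself.

# Hypothetical hook-side pusher (sketch only, not part of this commit).
# Assumes pyzmq is installed and ganeti-0mqd runs on the local Ganeti
# master, PULLing on the default port 5802.
import zmq

GANETI_0MQD_PULL_ENDPOINT = "tcp://127.0.0.1:5802"  # assumed default

zmqc = zmq.Context()
pusher = zmqc.socket(zmq.PUSH)
pusher.connect(GANETI_0MQD_PULL_ENDPOINT)

# Field names mirror the 'ganeti-op-status' messages built by
# JobFileHandler in this commit; the values here are made up.
msg = {
    "type": "ganeti-op-status",
    "operation": "OP_INSTANCE_STARTUP",
    "jobId": 1234,
    "status": "success",
    "logmsg": None,
}
pusher.send_json(msg)
pusher.close()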
parent 83dcbbe1
......@@ -29,7 +29,7 @@ import traceback
from threading import Thread, Event, currentThread
from synnefo.db.models import VirtualMachine
from synnefo.settings import GANETI_ZMQ_PUBLISHER
from synnefo.settings import GANETI_MASTER_IP, GANETI_0MQD_PUB_PORT
from synnefo.logic import utils, backend
class StoppableThread(Thread):
......@@ -41,7 +41,7 @@ class StoppableThread(Thread):
"""
def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
Thread.__init__(self, *args, **kwargs)
self._stop = Event()
def stop(self):
......@@ -52,6 +52,7 @@ class StoppableThread(Thread):
def zmq_sub_thread(subscriber):
logging.error("Entering 0mq to wait for message on SUB socket.")
while True:
logging.debug("Entering 0mq to wait for message on SUB socket.")
data = subscriber.recv()
......@@ -99,6 +100,7 @@ def main():
# a hopefully unique identity for this 0mq peer.
# Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
# https://github.com/zeromq/zeromq2/issues/30
GANETI_ZMQ_PUBLISHER = "tcp://%s:%d" % (GANETI_MASTER_IP, int(GANETI_0MQD_PUB_PORT))
subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect(GANETI_ZMQ_PUBLISHER)
......
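To complement the dispatcher hunk above, this is a sketch of what a consumer of the PUB endpoint could look like. The endpoint string and identity suffix are placeholders; only the requirement for a unique zmq.IDENTITY per peer comes from the comment in the diff.

# Hypothetical subscriber (sketch only). Assumes Python 2-era pyzmq,
# as used elsewhere in this commit.
import json
import getpass
import platform
import zmq

GANETI_ZMQ_PUBLISHER = "tcp://ganeti-master:5801"  # placeholder endpoint

zmqc = zmq.Context()
subscriber = zmqc.socket(zmq.SUB)
# Give every peer a unique identity, to avoid the 0mq bug linked above.
subscriber.setsockopt(zmq.IDENTITY,
                      platform.node() + getpass.getuser() + "example-consumer")
subscriber.setsockopt(zmq.SUBSCRIBE, "")  # subscribe to all messages
subscriber.connect(GANETI_ZMQ_PUBLISHER)

while True:
    data = subscriber.recv()
    msg = json.loads(data)
    if msg.get("type") == "ganeti-op-status":
        pass  # handle a job progress notification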
......@@ -2,33 +2,114 @@
#
# Copyright (c) 2010 Greek Research and Technology Network
#
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
"""Ganeti notification daemon for 0mqd
A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications over a 0mq PUB endpoint.
"""
from django.core.management import setup_environ
import os
import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
import synnefo.settings as settings
setup_environ(settings)
import zmq
import time
import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
from signal import signal, SIGINT, SIGTERM
from threading import Thread, Event, currentThread
from ganeti import utils
from ganeti import jqueue
from ganeti import constants
from ganeti import serializer
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
class StoppableThread(Thread):
"""Thread class with a stop() method.
The thread needs to check regularly for the stopped() condition.
When it does, it exits, so that another thread may .join() it.
"""
def __init__(self, *args, **kwargs):
Thread.__init__(self, *args, **kwargs)
self._stop = Event()
def stop(self):
self._stop.set()
def stopped(self):
return self._stop.isSet()
class GanetiZMQThread(StoppableThread):
"""The 0mq processing thread: PULLs and then PUBlishes notifications.
This thread runs until stopped, receiving notifications over a
0mq PULL socket, and publishing them over a 0mq PUB socket.
There are currently two sources of notifications:
a. ganeti-0mqd itself, monitoring the Ganeti job queue
b. hooks running in the context of Ganeti
"""
def __init__(self, logger, puller, publisher):
StoppableThread.__init__(self)
self.logger = logger
self.puller = puller
self.publisher = publisher
def run(self):
self.logger.debug("0mq thread ready")
try:
while True:
# Pull
self.logger.debug("Waiting on the 0mq PULL socket")
data = self.puller.recv()
self.logger.debug("Received message on 0mq PULL socket")
if currentThread().stopped():
self.logger.debug("Thread has been stopped, leaving request loop")
return
try:
msg = json.loads(data)
if msg['type'] not in ('ganeti-op-status',):
self.logger.error("Not forwarding message of unknown type: %s", data)
continue
except Exception, e:
self.logger.exception("Unexpected Exception decoding msg: %s", data)
continue
# Publish
self.logger.debug("PUBlishing msg: %s", json.dumps(msg))
self.publisher.send_json(msg)
except:
self.logger.exception("Caught exception, terminating")
os.kill(os.getpid(), SIGTERM)
class JobFileHandler(pyinotify.ProcessEvent):
def __init__(self, publisher):
def __init__(self, logger, pusher):
pyinotify.ProcessEvent.__init__(self)
self.publisher = publisher
self.logger = logger
self.pusher = pusher
def process_IN_CLOSE_WRITE(self, event):
jobfile = os.path.join(event.path, event.name)
if not event.name.startswith("job-"):
logging.debug("Not a job file: %s" % event.path)
self.logger.debug("Not a job file: %s" % event.path)
return
try:
......@@ -58,9 +139,8 @@ class JobFileHandler(pyinotify.ProcessEvent):
except IndexError:
logmsg = None
logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
if op.status in constants.JOBS_FINALIZED:
logging.info("%d: %s" % (int(job.id), op.status))
self.logger.debug("%d: %s(%s) %s %s",
int(job.id), op.input.OP_ID, instances, op.status, logmsg)
# Construct message
msg = {
......@@ -69,46 +149,147 @@ class JobFileHandler(pyinotify.ProcessEvent):
"operation": op.input.OP_ID,
"jobId": int(job.id),
"status": op.status,
"logmsg": logmsg
"logmsg": logmsg
}
if logmsg:
msg["message"] = logmsg
# Output as JSON
print json.dumps(msg)
self.publisher.send_json(msg)
# Push to the 0mq thread for PUBlication
self.logger.debug("PUSHing msg: %s", json.dumps(msg))
self.pusher.send_json(msg)
handler_logger = None
def fatal_signal_handler(signum, frame):
global handler_logger
handler_logger.info("Caught fatal signal %d, will raise SystemExit",
signum)
raise SystemExit
def parse_arguments(args):
from optparse import OptionParser
parser = OptionParser()
parser.add_option("-d", "--debug", action="store_true", dest="debug",
help="Enable debugging information")
parser.add_option("-l", "--log", dest="log_file",
default=settings.GANETI_0MQD_LOG_FILE,
metavar="FILE",
help="Write log to FILE instead of %s" %
settings.GANETI_0MQD_LOG_FILE)
parser.add_option('--pid-file', dest="pid_file",
default=settings.GANETI_0MQD_PID_FILE,
metavar='PIDFILE',
help="Save PID to file (default: %s)" %
settings.GANETI_0MQD_PID_FILE)
parser.add_option("-p", "--pull-port", dest="pull_port",
default=settings.GANETI_0MQD_PULL_PORT, type="int", metavar="PULL_PORT",
help="The TCP port number to use for the 0mq PULL endpoint")
parser.add_option("-P", "--pub-port", dest="pub_port",
default=settings.GANETI_0MQD_PUB_PORT, type="int", metavar="PUB_PORT",
help="The TCP port number to use for the 0mq PUB endpoint")
return parser.parse_args(args)
def main():
global handler_logger
(opts, args) = parse_arguments(sys.argv[1:])
# The 0mq endpoints to use for receiving and publishing notifications.
GANETI_0MQD_PUB_ENDPOINT = "tcp://*:%d" % int(opts.pub_port)
GANETI_0MQD_PULL_ENDPOINT = "tcp://*:%d" % int(opts.pull_port)
GANETI_0MQD_INPROC_ENDPOINT = "inproc://ganeti-0mqd"
# Create pidfile
pidf = daemon.pidlockfile.TimeoutPIDLockFile(
opts.pid_file, 10)
# Initialize logger
lvl = logging.DEBUG if opts.debug else logging.INFO
logger = logging.getLogger("ganeti-0mqd")
logger.setLevel(lvl)
formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
"%Y-%m-%d %H:%M:%S")
handler = logging.FileHandler(opts.log_file)
handler.setFormatter(formatter)
logger.addHandler(handler)
handler_logger = logger
# Become a daemon:
# Redirect stdout and stderr to handler.stream to catch
# early errors in the daemonization process [e.g., pidfile creation]
# which will otherwise go to /dev/null.
daemon_context = daemon.DaemonContext(
pidfile=pidf,
umask=022,
stdout=handler.stream,
stderr=handler.stream,
files_preserve=[handler.stream])
daemon_context.open()
logger.info("Became a daemon")
# Catch signals to ensure graceful shutdown
signal(SIGINT, fatal_signal_handler)
signal(SIGTERM, fatal_signal_handler)
# Create 0mq sockets: One for the PUBlisher, one for the PULLer,
# one inproc PUSHer for inter-thread communication.
zmqc = zmq.Context()
publisher = zmqc.socket(zmq.PUB)
publisher.bind(GANETI_ZMQ_PUBLISHER)
logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
puller = zmqc.socket(zmq.PULL)
puller.bind(GANETI_0MQD_PULL_ENDPOINT)
puller.bind(GANETI_0MQD_INPROC_ENDPOINT)
publisher = zmqc.socket(zmq.PUB)
publisher.bind(GANETI_0MQD_PUB_ENDPOINT)
pusher = zmqc.socket(zmq.PUSH)
pusher.connect(GANETI_0MQD_INPROC_ENDPOINT)
logger.info("PUSHing to %s", GANETI_0MQD_INPROC_ENDPOINT)
logger.info("PULLing from (%s, %s)",
GANETI_0MQD_PULL_ENDPOINT, GANETI_0MQD_INPROC_ENDPOINT)
logger.info("PUBlishing on %s", GANETI_0MQD_PUB_ENDPOINT)
# Use a separate thread for 0mq processing,
# needed because the Python runtime interacts badly with 0mq's blocking semantics.
zmqt = GanetiZMQThread(logger, puller, publisher)
zmqt.start()
# Monitor the Ganeti job queue, create and push notifications
wm = pyinotify.WatchManager()
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
handler = JobFileHandler(publisher)
handler = JobFileHandler(logger, pusher)
notifier = pyinotify.Notifier(wm, handler)
wm.add_watch(constants.QUEUE_DIR, mask)
logging.info("Now watching %s" % constants.QUEUE_DIR)
try:
# Fail if adding the inotify() watch fails for any reason
res = wm.add_watch(constants.QUEUE_DIR, mask)
if res[constants.QUEUE_DIR] < 0:
raise Exception("pyinotify add_watch returned negative watch descriptor")
logger.info("Now watching %s" % constants.QUEUE_DIR)
while True: # loop forever
try:
# process the queue of events as explained above
while True: # loop forever
# process the queue of events as explained above
notifier.process_events()
if notifier.check_events():
# read notified events and enqueue them
notifier.read_events()
except KeyboardInterrupt:
# destroy the inotify's instance on this interrupt (stop monitoring)
notifier.stop()
break
except SystemExit:
logger.info("SystemExit")
except:
logger.exception("Caught exception, terminating")
finally:
# destroy the inotify's instance on this interrupt (stop monitoring)
notifier.stop()
# mark the 0mq thread as stopped, wake it up so that it notices
zmqt.stop()
pusher.send_json({'type': 'null'})
raise
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
sys.exit(main())
# vim: set ts=4 sts=4 sw=4 et ai :
......@@ -133,12 +133,12 @@ INSTALLED_APPS = (
'synnefo.db',
'synnefo.ganeti',
'synnefo.logic',
'south'
)
# The RAPI endpoint and associated credentials to use
# for talking to the Ganeti backend.
GANETI_CLUSTER_INFO = ("62.217.120.78", 5080, "synnefo", "ocean!")
GANETI_MASTER_IP = "62.217.120.78"
GANETI_CLUSTER_INFO = (GANETI_MASTER_IP, 5080, "synnefo", "ocean!")
# This prefix gets used when determining the instance names
# of Synnefo VMs at the Ganeti backend.
......@@ -161,9 +161,20 @@ TIMEOUT = 10 * 1000
# parameter refers to a point in time more than POLL_LIMIT seconds ago.
POLL_LIMIT = 3600
# This should be set to the PUB endpoint maintained by ganeti-0mqd.
# Normally, this is a TCP port on the Ganeti master IP.
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801"
# Configuration for ganeti-0mqd, the Ganeti notification daemon
#
# ganeti-0mqd uses two 0mq endpoints:
# * A PULL endpoint for receiving job updates from code running inside
# Ganeti hooks, running on TCP port GANETI_0MQD_PULL_PORT on the
# Ganeti master IP,
# * A PUB endpoint for publishing notifications to the rest of the
# infrastructure, running on TCP port GANETI_0MQD_PUB_PORT on the
# Ganeti master IP.
#
GANETI_0MQD_PUB_PORT = "5801"
GANETI_0MQD_PULL_PORT = "5802"
GANETI_0MQD_LOG_FILE = "/var/log/synnefo/ganeti-0mqd.log"
GANETI_0MQD_PID_FILE = "/var/run/synnefo/ganeti-0mqd.pid"
# The API implementation needs to accept and return absolute references
# to its resources. Thus, it needs to know its public URL.
......
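A small sketch of how another synnefo component could derive the two ganeti-0mqd endpoints from the settings above, in the same way the dispatcher builds GANETI_ZMQ_PUBLISHER earlier in this commit; the variable names below are illustrative.

# Sketch only: compose the 0mq endpoints from the new settings.
from synnefo.settings import (GANETI_MASTER_IP,
                              GANETI_0MQD_PUB_PORT, GANETI_0MQD_PULL_PORT)

# Consumers connect here to receive notifications:
pub_endpoint = "tcp://%s:%d" % (GANETI_MASTER_IP, int(GANETI_0MQD_PUB_PORT))
# Hooks running inside Ganeti push job/network updates here:
pull_endpoint = "tcp://%s:%d" % (GANETI_MASTER_IP, int(GANETI_0MQD_PULL_PORT))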