Commit 084f5cde authored by Vangelis Koukis's avatar Vangelis Koukis
Browse files

Merge branch 'network-api'

parents d4785d61 ad711a24
DEPLOYMENT notes
=================
Debian
------
- Service dependencies
* ganeti-0mqd
The Ganeti notification daemon must run on the Ganeti master node.
Edit all relevant settings in settings.py, GANETI_0MQD_* variables.
Then start the server on the Ganeti master.
# ./ganeti/ganeti-0mqd.py
FIXME: The server must be started from the project root directory.
TBD: how to handle master migration.
* db_controller
db_controller receives notifications from 0mq and updates the DB.
For now, db_controller must be run by hand:
$ ./db/db_controller.py
FIXME: Complete this part.
* vncauthproxy
To support OOB console access to the VMs over VNC, the vncauthproxy
daemon must be running on every Django node.
Download and install vncauthproxy from its own repository,
at https://code.grnet.gr/git/vncauthproxy (known good commit: 48b1400e).
Edit the default settings at the top of vncauthproxy.py.
Set CTRL_SOCKET in util/vapclient.py to point to its control socket.
FIXME: The CTRL_SOCKET setting will be moved to settings.py as
VNCAUTHPROXY_CTRL_SOCKET.
- OS Specific instructions
* Debian Squeeze
TBD.
\ No newline at end of file
......@@ -29,7 +29,7 @@ import traceback
from threading import Thread, Event, currentThread
from synnefo.db.models import VirtualMachine
from synnefo.settings import GANETI_ZMQ_PUBLISHER
from synnefo.settings import GANETI_MASTER_IP, GANETI_0MQD_PUB_PORT
from synnefo.logic import utils, backend
class StoppableThread(Thread):
......@@ -41,7 +41,7 @@ class StoppableThread(Thread):
"""
def __init__(self, *args, **kwargs):
super(StoppableThread, self).__init__(*args, **kwargs)
Thread.__init__(self, *args, **kwargs)
self._stop = Event()
def stop(self):
......@@ -52,6 +52,7 @@ class StoppableThread(Thread):
def zmq_sub_thread(subscriber):
logging.error("Entering 0mq to wait for message on SUB socket.")
while True:
logging.debug("Entering 0mq to wait for message on SUB socket.")
data = subscriber.recv()
......@@ -99,6 +100,7 @@ def main():
# a hopefully unique identity for this 0mq peer.
# Reusing zmq.IDENTITY for two distinct peers triggers this 0mq bug:
# https://github.com/zeromq/zeromq2/issues/30
GANETI_ZMQ_PUBLISHER = "tcp://%s:%d" % (GANETI_MASTER_IP, int(GANETI_0MQD_PUB_PORT))
subscriber.setsockopt(zmq.IDENTITY, platform.node() + getpass.getuser() + "snf-db-controller")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect(GANETI_ZMQ_PUBLISHER)
......
......@@ -2,33 +2,114 @@
#
# Copyright (c) 2010 Greek Research and Technology Network
#
""" A daemon to monitor the Ganeti job queue and emit job progress notifications over 0mq. """
"""Ganeti notification daemon for 0mqd
A daemon to monitor the Ganeti job queue and publish job progress
and Ganeti VM state notifications over a 0mq PUB endpoint.
"""
from django.core.management import setup_environ
import os
import sys
import os
path = os.path.normpath(os.path.join(os.getcwd(), '..'))
sys.path.append(path)
import synnefo.settings as settings
setup_environ(settings)
import zmq
import time
import json
import logging
import pyinotify
import daemon
import daemon.pidlockfile
from signal import signal, SIGINT, SIGTERM
from threading import Thread, Event, currentThread
from ganeti import utils
from ganeti import jqueue
from ganeti import constants
from ganeti import serializer
GANETI_ZMQ_PUBLISHER = "tcp://*:5801" # FIXME: move to settings.py
class StoppableThread(Thread):
    """A Thread that carries a cooperative stop flag.

    stop() only raises the flag. The code running inside the thread is
    expected to poll stopped() regularly and return once it is set, so
    that another thread may .join() it.
    """

    def __init__(self, *args, **kwargs):
        Thread.__init__(self, *args, **kwargs)
        # An Event is used purely as a thread-safe boolean.
        self._stop = Event()

    def stop(self):
        """Raise the stop flag."""
        self._stop.set()

    def stopped(self):
        """Return True once stop() has been called, False otherwise."""
        return self._stop.is_set()
class GanetiZMQThread(StoppableThread):
    """The 0mq processing thread: PULLs and then PUBlishes notifications.

    This thread runs until stopped, receiving notifications over a
    0mq PULL socket, and publishing them over a 0mq PUB socket.

    There are currently two sources of notifications:
    a. ganeti-0mqd itself, monitoring the Ganeti job queue
    b. hooks running in the context of Ganeti

    Arguments:
      logger    -- logger object used for all diagnostics
      puller    -- socket-like object; recv() returns one JSON-encoded message
      publisher -- socket-like object; send_json() publishes a decoded message
    """

    # Message types that are forwarded to subscribers. The trailing comma
    # matters: without it this is a plain string and the membership test
    # degrades to a substring match.
    KNOWN_MSG_TYPES = ('ganeti-op-status',)

    def __init__(self, logger, puller, publisher):
        StoppableThread.__init__(self)
        self.logger = logger
        self.puller = puller
        self.publisher = publisher

    def run(self):
        """Forward messages from the PULL socket to the PUB socket.

        Returns when the stop flag is found set after a recv(). Any
        unexpected exception is logged and SIGTERM is sent to the whole
        process, since the daemon cannot work without this thread.
        """
        self.logger.debug("0mq thread ready")
        try:
            while True:
                # Pull
                self.logger.debug("Waiting on the 0mq PULL socket")
                data = self.puller.recv()
                self.logger.debug("Received message on 0mq PULL socket")

                # The stop flag is only checked after recv() returns, so
                # whoever stops the thread must also send a message to
                # wake it up. That message is never decoded.
                if self.stopped():
                    self.logger.debug("Thread has been stopped, leaving request loop")
                    return

                try:
                    msg = json.loads(data)
                    if msg['type'] not in self.KNOWN_MSG_TYPES:
                        self.logger.error("Not forwarding message of unknown type: %s", json.dumps(msg))
                        continue
                except Exception:
                    # Malformed JSON, a non-dict payload or a missing
                    # 'type' key: drop the message, keep the thread alive.
                    self.logger.exception("Unexpected Exception decoding msg: %s", data)
                    continue

                # Publish
                self.logger.debug("PUBlishing msg: %s", json.dumps(msg))
                self.publisher.send_json(msg)
        except:
            # Deliberately catch everything: a dead forwarding thread must
            # take the whole daemon down rather than leave it half-working.
            self.logger.exception("Caught exception, terminating")
            os.kill(os.getpid(), SIGTERM)
class JobFileHandler(pyinotify.ProcessEvent):
def __init__(self, publisher):
def __init__(self, logger, pusher):
pyinotify.ProcessEvent.__init__(self)
self.publisher = publisher
self.logger = logger
self.pusher = pusher
def process_IN_CLOSE_WRITE(self, event):
jobfile = os.path.join(event.path, event.name)
if not event.name.startswith("job-"):
logging.debug("Not a job file: %s" % event.path)
self.logger.debug("Not a job file: %s" % event.path)
return
try:
......@@ -58,9 +139,8 @@ class JobFileHandler(pyinotify.ProcessEvent):
except IndexError:
logmsg = None
logging.debug("%d: %s(%s) %s %s" % (int(job.id), op.input.OP_ID, instances, op.status, logmsg))
if op.status in constants.JOBS_FINALIZED:
logging.info("%d: %s" % (int(job.id), op.status))
self.logger.debug("%d: %s(%s) %s %s",
int(job.id), op.input.OP_ID, instances, op.status, logmsg)
# Construct message
msg = {
......@@ -69,46 +149,147 @@ class JobFileHandler(pyinotify.ProcessEvent):
"operation": op.input.OP_ID,
"jobId": int(job.id),
"status": op.status,
"logmsg": logmsg
"logmsg": logmsg
}
if logmsg:
msg["message"] = logmsg
# Output as JSON
print json.dumps(msg)
self.publisher.send_json(msg)
# Push to the 0mq thread for PUBlication
self.logger.debug("PUSHing msg: %s", json.dumps(msg))
self.pusher.send_json(msg)
# Logger used by fatal_signal_handler(). It must be pointed at a real
# logger before the handler is installed; it is None until then.
handler_logger = None


def fatal_signal_handler(signum, frame):
    """Signal handler: log the signal number, then raise SystemExit.

    Arguments follow the signal-handler convention (signal number and
    current stack frame); the frame is not used.
    """
    handler_logger.info(
        "Caught fatal signal %d, will raise SystemExit", signum)
    raise SystemExit()
def parse_arguments(args):
    """Parse the daemon's command-line options.

    args is the argument list without the program name. Defaults for the
    log file, PID file and the two port numbers are read from settings.
    Returns the (options, remaining_args) pair produced by
    OptionParser.parse_args().
    """
    from optparse import OptionParser

    default_log = settings.GANETI_0MQD_LOG_FILE
    default_pid = settings.GANETI_0MQD_PID_FILE

    parser = OptionParser()
    parser.add_option(
        "-d", "--debug",
        dest="debug", action="store_true",
        help="Enable debugging information")
    parser.add_option(
        "-l", "--log",
        dest="log_file", metavar="FILE", default=default_log,
        help="Write log to FILE instead of %s" % default_log)
    parser.add_option(
        '--pid-file',
        dest="pid_file", metavar='PIDFILE', default=default_pid,
        help="Save PID to file (default: %s)" % default_pid)
    parser.add_option(
        "-p", "--pull-port",
        dest="pull_port", metavar="PULL_PORT", type="int",
        default=settings.GANETI_0MQD_PULL_PORT,
        help="The TCP port number to use for the 0mq PULL endpoint")
    parser.add_option(
        "-P", "--pub-port",
        dest="pub_port", metavar="PUB_PORT", type="int",
        default=settings.GANETI_0MQD_PUB_PORT,
        help="The TCP port number to use for the 0mq PUB endpoint")

    return parser.parse_args(args)
def main():
# Daemon entry point: parses the command line, sets up file logging,
# daemonizes, creates the 0mq sockets and the forwarding thread, then
# processes inotify events for the Ganeti job queue directory until an
# exception or a fatal signal ends the loop.
#
# NOTE(review): several statements below appear twice in slightly
# different forms (`publisher`, `handler`, `wm.add_watch`, the
# `while True` loop and its exception handling). This looks like the
# before and after sides of a change shown together -- confirm which
# lines belong to the current version before relying on this text.
global handler_logger
(opts, args) = parse_arguments(sys.argv[1:])
# The 0mq endpoints to use for receiving and publishing notifications.
GANETI_0MQD_PUB_ENDPOINT = "tcp://*:%d" % int(opts.pub_port)
GANETI_0MQD_PULL_ENDPOINT = "tcp://*:%d" % int(opts.pull_port)
GANETI_0MQD_INPROC_ENDPOINT = "inproc://ganeti-0mqd"
# Create pidfile
# presumably the second argument (10) is the lock acquisition timeout
# in seconds -- TODO confirm against the python-daemon documentation
pidf = daemon.pidlockfile.TimeoutPIDLockFile(
opts.pid_file, 10)
# Initialize logger
lvl = logging.DEBUG if opts.debug else logging.INFO
logger = logging.getLogger("ganeti-0mqd")
logger.setLevel(lvl)
formatter = logging.Formatter("%(asctime)s %(module)s[%(process)d] %(levelname)s: %(message)s",
"%Y-%m-%d %H:%M:%S")
handler = logging.FileHandler(opts.log_file)
handler.setFormatter(formatter)
logger.addHandler(handler)
# Expose the logger to fatal_signal_handler(), which reads this
# module-level global.
handler_logger = logger
# Become a daemon:
# Redirect stdout and stderr to handler.stream to catch
# early errors in the daemonization process [e.g., pidfile creation]
# which will otherwise go to /dev/null.
daemon_context = daemon.DaemonContext(
pidfile=pidf,
umask=022,
stdout=handler.stream,
stderr=handler.stream,
files_preserve=[handler.stream])
daemon_context.open()
logger.info("Became a daemon")
# Catch signals to ensure graceful shutdown
signal(SIGINT, fatal_signal_handler)
signal(SIGTERM, fatal_signal_handler)
# Create 0mq sockets: One for the PUBlisher, one for the PULLer,
# one inproc PUSHer for inter-thread communication.
zmqc = zmq.Context()
publisher = zmqc.socket(zmq.PUB)
publisher.bind(GANETI_ZMQ_PUBLISHER)
logging.info("Now publishing on %s" % GANETI_ZMQ_PUBLISHER)
puller = zmqc.socket(zmq.PULL)
puller.bind(GANETI_0MQD_PULL_ENDPOINT)
# The same PULL socket is also bound to the inproc endpoint that
# `pusher` connects to below, so messages PUSHed from within this
# process arrive on it as well.
puller.bind(GANETI_0MQD_INPROC_ENDPOINT)
publisher = zmqc.socket(zmq.PUB)
publisher.bind(GANETI_0MQD_PUB_ENDPOINT)
pusher = zmqc.socket(zmq.PUSH)
pusher.connect(GANETI_0MQD_INPROC_ENDPOINT)
logger.info("PUSHing to %s", GANETI_0MQD_INPROC_ENDPOINT)
logger.info("PULLing from (%s, %s)",
GANETI_0MQD_PULL_ENDPOINT, GANETI_0MQD_INPROC_ENDPOINT)
logger.info("PUBlishing on %s", GANETI_0MQD_PUB_ENDPOINT)
# Use a separate thread for 0mq processing,
# needed because the Python runtime interacts badly with 0mq's blocking semantics.
zmqt = GanetiZMQThread(logger, puller, publisher)
zmqt.start()
# Monitor the Ganeti job queue, create and push notifications
wm = pyinotify.WatchManager()
mask = pyinotify.EventsCodes.ALL_FLAGS["IN_CLOSE_WRITE"]
handler = JobFileHandler(publisher)
handler = JobFileHandler(logger, pusher)
notifier = pyinotify.Notifier(wm, handler)
wm.add_watch(constants.QUEUE_DIR, mask)
logging.info("Now watching %s" % constants.QUEUE_DIR)
try:
# Fail if adding the inotify() watch fails for any reason
res = wm.add_watch(constants.QUEUE_DIR, mask)
if res[constants.QUEUE_DIR] < 0:
raise Exception("pyinotify add_watch returned negative watch descriptor")
logger.info("Now watching %s" % constants.QUEUE_DIR)
while True: # loop forever
try:
# process the queue of events as explained above
while True: # loop forever
# process the queue of events as explained above
notifier.process_events()
if notifier.check_events():
# read notified events and enqueue them
notifier.read_events()
except KeyboardInterrupt:
# destroy the inotify's instance on this interrupt (stop monitoring)
notifier.stop()
break
except SystemExit:
logger.info("SystemExit")
except:
logger.exception("Caught exception, terminating")
finally:
# destroy the inotify's instance on this interrupt (stop monitoring)
notifier.stop()
# mark the 0mq thread as stopped, wake it up so that it notices
zmqt.stop()
# The payload is irrelevant: GanetiZMQThread checks its stop flag right
# after recv() returns and leaves its loop without decoding the message.
pusher.send_json({'type': 'null'})
# NOTE(review): bare raise inside finally -- presumably meant to re-raise
# the exception handled above so the process exits; confirm the intended
# behaviour when no exception is active.
raise
if __name__ == "__main__":
# Run as a script: configure the root logger at DEBUG level, then hand
# main()'s return value to sys.exit().
logging.basicConfig(level=logging.DEBUG)
sys.exit(main())
# vim: set ts=4 sts=4 sw=4 et ai :
......@@ -38,7 +38,7 @@ def get_rsapi_state(vm):
successful completion of an operation. If Ganeti says an OP_INSTANCE_STARTUP
operation succeeded, vm.operstate is set to "STARTED".
* To support any transitive states defined by the API (only REBOOT for the time
* To support any transitional states defined by the API (only REBOOT for the time
being) this mapping is amended with information reported by Ganeti regarding
any outstanding operation. If an OP_INSTANCE_STARTUP had succeeded previously
and an OP_INSTANCE_REBOOT has been reported as in progress, the API state is
......
......@@ -138,7 +138,8 @@ INSTALLED_APPS = (
# The RAPI endpoint and associated credentials to use
# for talking to the Ganeti backend.
GANETI_CLUSTER_INFO = ("62.217.120.78", 5080, "synnefo", "ocean!")
GANETI_MASTER_IP = "62.217.120.78"
GANETI_CLUSTER_INFO = (GANETI_MASTER_IP, 5080, "synnefo", "ocean!")
# This prefix gets used when determining the instance names
# of Synnefo VMs at the Ganeti backend.
......@@ -161,9 +162,20 @@ TIMEOUT = 10 * 1000
# parameter refers to a point in time more than POLL_LIMIT seconds ago.
POLL_LIMIT = 3600
# This should be set to the PUB endpoint maintained by ganeti-0mqd.
# Normally, this is a TCP port on the Ganeti master IP.
GANETI_ZMQ_PUBLISHER = "tcp://62.217.120.67:5801"
# Configuration for ganeti-0mqd, the Ganeti notification daemon
#
# ganeti-0mqd uses two 0mq endpoints:
# * A PULL endpoint for receiving job updates from code running inside
# Ganeti hooks, running on TCP port GANETI_0MQD_PULL_PORT on the
# Ganeti master IP,
# * A PUB endpoint for publishing notifications to the rest of the
# infrastructure, running on TCP port GANETI_0MQD_PUB_PORT on the
# Ganeti master IP.
#
GANETI_0MQD_PUB_PORT = "5801"
GANETI_0MQD_PULL_PORT = "5802"
GANETI_0MQD_LOG_FILE = "/var/log/synnefo/ganeti-0mqd.log"
GANETI_0MQD_PID_FILE = "/var/run/synnefo/ganeti-0mqd.pid"
# The API implementation needs to accept and return absolute references
# to its resources. Thus, it needs to know its public URL.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment