Commit 667d82de authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

cyclades: Callback for cluster modifications

Extend snf-dispatcher callbacks with one to run every time an
OP_CLUSTER_ opcode is executed in a Ganeti backend.

* make snf-ganeti-eventd send messages on cluster modifications. These
  messages do not contain any description about the job.
* create new queue %(prefix)-events-cluster with routing key
  'ganeti.event.cluster'.
* add update_cluster callback to update the disk templates and the
  resources of the backend.
parent 4447a69f
......@@ -34,8 +34,9 @@ import logging
import json
from functools import wraps
from django.db import transaction
from synnefo.db.models import Backend, VirtualMachine, Network, BackendNetwork
from synnefo.logic import utils, backend
from synnefo.logic import utils, backend_mod
from synnefo.lib.utils import merge_time
......@@ -172,9 +173,10 @@ def update_db(vm, msg, event_time):
nics = msg.get("nics", None)
beparams = msg.get("beparams", None)
backend.process_op_status(vm, event_time, msg['jobId'], msg['operation'],
msg['status'], msg['logmsg'], nics=nics,
beparams=beparams)
backend_mod.process_op_status(vm, event_time, msg['jobId'],
msg['operation'], msg['status'],
msg['logmsg'], nics=nics,
beparams=beparams)
log.debug("Done processing ganeti-op-status msg for vm %s.",
msg['instance'])
......@@ -195,12 +197,12 @@ def update_network(network, msg, event_time):
jobid = msg['jobId']
if opcode == "OP_NETWORK_SET_PARAMS":
backend.process_network_modify(network, event_time, jobid, opcode,
status, msg['add_reserved_ips'],
msg['remove_reserved_ips'])
backend_mod.process_network_modify(network, event_time, jobid, opcode,
status, msg['add_reserved_ips'],
msg['remove_reserved_ips'])
else:
backend.process_network_status(network, event_time, jobid, opcode,
status, msg['logmsg'])
backend_mod.process_network_status(network, event_time, jobid, opcode,
status, msg['logmsg'])
log.debug("Done processing ganeti-network-status msg for network %s.",
msg['network'])
......@@ -221,7 +223,7 @@ def update_build_progress(vm, msg, event_time):
return
if msg['type'] == 'image-copy-progress':
backend.process_create_progress(vm, event_time, msg['progress'])
backend_mod.process_create_progress(vm, event_time, msg['progress'])
# we do not add diagnostic messages for copy-progress messages
return
......@@ -261,13 +263,23 @@ def update_build_progress(vm, msg, event_time):
message = " ".join(source.split("-")).capitalize()
# create the diagnostic entry
backend.create_instance_diagnostic(vm, message, source, level, event_time,
details=details)
backend_mod.create_instance_diagnostic(vm, message, source, level,
event_time, details=details)
log.debug("Done processing ganeti-create-progress msg for vm %s.",
msg['instance'])
@transaction.commit_on_success()
def update_cluster(msg):
clustername = msg.get("cluster")
if clustername is None:
return
backend = Backend.objects.select_for_update().get(clustername=clustername)
backend_mod.update_backend_disk_templates(backend)
backend_mod.update_backend_resources(backend)
def dummy_proc(client, message, *args, **kwargs):
try:
log.debug("Msg: %s", message['body'])
......
......@@ -45,7 +45,8 @@ class Command(BaseCommand):
help = HELP_MSG
def handle(self, **options):
for backend in Backend.objects.filter(offline=False):
for backend in Backend.objects.select_for_update()\
.filter(offline=False):
backend_mod.update_backend_disk_templates(backend)
backend_mod.update_backend_resources(backend)
self.stdout.write("Successfully updated backend '%s'\n" % backend)
......@@ -43,6 +43,7 @@ EXCHANGES = (EXCHANGE_GANETI,)
QUEUE_OP = "%s-events-op" % prefix
QUEUE_NETWORK = "%s-events-network" % prefix
QUEUE_PROGRESS = "%s-events-progress" % prefix
QUEUE_CLUSTER = "%s-events-cluster" % prefix
QUEUES = (QUEUE_OP,
......@@ -56,6 +57,7 @@ KEY_OP = 'ganeti.%s.event.op' % prefix
KEY_NETWORK = 'ganeti.%s.event.network' % prefix
# notifications of type "ganeti-create-progress"
KEY_PROGRESS = 'ganeti.%s.event.progress' % prefix
KEY_CLUSTER = 'ganeti.event.cluster'
# BINDINGS:
BINDINGS = (
......@@ -63,6 +65,7 @@ BINDINGS = (
(QUEUE_OP, EXCHANGE_GANETI, KEY_OP, 'update_db'),
(QUEUE_NETWORK, EXCHANGE_GANETI, KEY_NETWORK, 'update_network'),
(QUEUE_PROGRESS, EXCHANGE_GANETI, KEY_PROGRESS, 'update_build_progress'),
(QUEUE_CLUSTER, EXCHANGE_GANETI, KEY_CLUSTER, 'update_cluster'),
)
......
......@@ -188,7 +188,8 @@ class JobFileHandler(pyinotify.ProcessEvent):
self.client.exchange_declare(settings.EXCHANGE_GANETI, type='topic')
self.op_handlers = {"INSTANCE": self.process_instance_op,
"NETWORK": self.process_network_op}
"NETWORK": self.process_network_op,
"CLUSTER": self.process_cluster_op}
# "GROUP": self.process_group_op}
def process_IN_CLOSE_WRITE(self, event):
......@@ -329,12 +330,22 @@ class JobFileHandler(pyinotify.ProcessEvent):
return msg, routekey
def process_cluster_op(self, op, job_id):
""" Process OP_CLUSTER_* opcodes.
# def process_group_op(self, op, job_id):
# """ Process OP_GROUP_* opcodes.
"""
input = op.input
op_id = input.OP_ID
self.logger.debug("Job: %d: %s %s", job_id, op_id, op.status)
# """
# return None, None
msg = {'operation': op_id,
'type': "ganeti-cluster-status"}
routekey = "ganeti.event.cluster"
return msg, routekey
def find_cluster_name():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment