Commit eb79cea5 authored by Georgios Gousios's avatar Georgios Gousios
Browse files

Initial (untested) impl of the Django reconciliation command

parent 7de020ae
......@@ -92,7 +92,7 @@ def trigger_status_update(message):
try:
msg = json.loads(message.body)
if msg["type"] != "" :
if msg["type"] != "reconciliate" :
_logger.error("Message is of unknown type %s", msg["type"])
return
......
#
# Reconciliate VM state - Management Script
#
# Copyright 2010 Greek Research and Technology Network
#
from django.core.management.base import NoArgsCommand
from synnefo.db.models import VirtualMachine
from django.conf import settings
from datetime import datetime, timedelta
from amqplib import client_0_8 as amqp
import time
import socket
import json
class Command(NoArgsCommand):
help = 'Reconciliate VM status with the backend'
def open_channel(self):
conn = None
while conn == None:
try:
conn = amqp.Connection( host=settings.RABBIT_HOST,
userid=settings.RABBIT_USERNAME,
password=settings.RABBIT_PASSWORD,
virtual_host=settings.RABBIT_VHOST)
except socket.error:
time.sleep(1)
pass
return conn.channel()
def handle_noargs(self, **options):
now = datetime.now()
last_update = timedelta(minutes = 30)
not_updated = VirtualMachine.objects.filter(updated__lte = (now - last_update))
all = VirtualMachine.objects.all()
to_update = all.count() / settings.RECONCILIATION_MIN
vm_ids = map(lambda x: x.vm_id, all.filter()) #TODO: Fix filtering
sent = False
for vmid in vm_ids :
while sent is False:
try:
msg = dict(type = "reconciliate", vmid = vmid)
self.chan.basic_publish(json.dumps(msg),
exchange=settings.EXCHANGE_CRON,
routing_key="reconciliation.%s", vmid)
sent = True
except socket.error:
self.chan = self.open_channel()
except Exception:
raise
print "All:%d, Not Updated:%d, Triggered update for:%d" % (all.count(), not_updated.count(), vm_ids)
......@@ -183,8 +183,10 @@ EXCHANGES = (EXCHANGE_GANETI, EXCHANGE_CRON, EXCHANGE_API)
QUEUE_GANETI_EVENTS = "events"
QUEUE_CRON_CREDITS = "credits"
QUEUE_EMAIL = "email"
QUEUE_API = "api"
QUEUE_RECONC = "reconciliation"
QUEUE_DEBUG = "debug" # Debug queue, retrieves all messages
QUEUES = (QUEUE_GANETI_EVENTS, QUEUE_CRON_CREDITS, QUEUE_EMAIL)
QUEUES = (QUEUE_GANETI_EVENTS, QUEUE_CRON_CREDITS, QUEUE_EMAIL, QUEUE_API)
BINDINGS_DEBUG = [
# Queue # Exchange # RouteKey # Handler
......@@ -200,7 +202,7 @@ BINDINGS = [
(QUEUE_CRON_CREDITS, EXCHANGE_CRON, '*.credits.*', 'update_credits'),
(QUEUE_EMAIL, EXCHANGE_API, '*.email.*', 'send_email'),
(QUEUE_EMAIL, EXCHANGE_CRON, '*.email.*', 'send_email'),
(QUEUE_API, EXCHANGE_API, '*.email.*', 'send_email'),
(QUEUE_RECONC, EXCHANGE_CRON, 'reconciliation.*', 'trigger_status_update'),
]
def fix_amqp_settings(backend_prefix):
......@@ -240,5 +242,5 @@ LOGIN_PATH = "/login"
# work after this many hours after 2011/05/10
AUTH_TOKEN_DURATION = 30 * 24
# Number of minutes between reconciliations
# Minutes between reconciliations
RECONCILIATION_MIN = 30
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment