Commit 7de020ae authored by Georgios Gousios's avatar Georgios Gousios
Browse files

Dispatcher configuration for reconciliation

parent 53f41485
......@@ -131,3 +131,15 @@ def shutdown_instance(vm):
def get_instance_console(vm):
return rapi.GetInstanceConsole(vm.backend_id)
def request_status_update(vm):
return rapi.GetInstanceInfo(vm.backend_id)
def get_job_status(jobid):
return rapi.GetJobStatus(jobid)
def update_status(vm, status):
utils.update_state(vm, status)
\ No newline at end of file
from datetime import datetime, timedelta
import time
# Some utility classes / functions first
class AllMatch(set):
"""Universal set - match everything"""
def __contains__(self, item):
return True
allMatch = AllMatch()
# The actual Event class
class Event(object):
def __init__(self, action, min=allMatch, hour=allMatch,
day=allMatch, month=allMatch, dow=allMatch,
args=(), kwargs={}):
self.mins = self._conv_to_set(min)
self.hours= self._conv_to_set(hour)
self.days = self._conv_to_set(day)
self.months = self._conv_to_set(month)
self.dow = self._conv_to_set(dow)
self.action = action
self.args = args
self.kwargs = kwargs
def matchtime(self, t):
"""Return True if this event should trigger at the specified datetime"""
return ((t.minute in self.mins) and
(t.hour in self.hours) and
(t.day in self.days) and
(t.month in self.months) and
(t.weekday() in self.dow))
def check(self, t):
if self.matchtime(t):
self.action(*self.args, **self.kwargs)
def _conv_to_set(self,obj): # Allow single integer to be provided
if isinstance(obj, (int,long)):
return set([obj]) # Single item
if not isinstance(obj, set):
obj = set(obj)
return obj
class CronTab(object):
def __init__(self, *events):
self.events = events
def run(self):
t=datetime(*datetime.now().timetuple()[:5])
while 1:
for e in self.events:
e.check(t)
t += timedelta(minutes=1)
n = datetime.now()
while n < t:
s = (t - n).seconds + 1
time.sleep(s)
n = datetime.now()
#
# Callback functions used by the cron dispatcher.
#
# Copyright 2010 Greek Research and Technology Network
#
def send_recons_req():
"""
Publish a reconsiliation request to the queue
"""
reconcile = dict()
reconcile['msg'] = 'reconcile'
pass
def send_credit_calc_req():
"""
Publish a credit calculation request to the queue
"""
pass
\ No newline at end of file
......@@ -86,6 +86,64 @@ def update_credits(message):
_logger.debug("Request to update credits")
message.channel.basic_ack(message.delivery_tag)
def trigger_status_update(message):
_logger.debug("Request to trigger status update:", message.body)
try:
msg = json.loads(message.body)
if msg["type"] != "" :
_logger.error("Message is of unknown type %s", msg["type"])
return
if msg["vm-id"] == "" :
_logger.error("Message does not specify a VM id")
return
vm = VirtualMachine.objects.get(id=msg["vm-id"])
backend.request_status_update(vm)
message.channel.basic_ack(message.delivery_tag)
except KeyError:
_logger.error("Malformed incoming JSON, missing attributes: %s",
message.body)
except Exception as e:
_logger.error("Unexpected error:\n%s" %
"".join(traceback.format_exception(*sys.exc_info())))
def status_job_finished (message) :
_logger.debug("Job status message received:", message.body)
try:
msg = message.body;
if msg['operation'] != u'OP_INSTANCE_QUERY_DATA':
_logger.error("Message is of unknown type %s", msg["operation"])
return
if msg["status"] != "success" :
_logger.error("Status job %d for %s did not finish properly",
msg['jobId'], msg['instance'])
return
status = backend.get_job_status(msg['jobid'])
if status["summary"] != "INSTANCE_QUERY_DATA" or type(status["opresult"]) is not list:
_logger.error("Status is of unknown type %s", msg["summary"])
return
req_state = status['opresult'][msg['instance']]['config_state']
run_state = status['opresult'][msg['instance']]['run_state']
vm = VirtualMachine.objects.get(name=msg['instance'])
backend.update_status(vm, run_state)
message.channel.basic_ack(message.delivery_tag)
except KeyError:
_logger.error("Malformed incoming JSON, missing attributes: %s",
message.body)
except Exception as e:
_logger.error("Unexpected error:\n%s" %
"".join(traceback.format_exception(*sys.exc_info())))
def dummy_proc(message):
try:
......
......@@ -200,6 +200,7 @@ BINDINGS = [
(QUEUE_CRON_CREDITS, EXCHANGE_CRON, '*.credits.*', 'update_credits'),
(QUEUE_EMAIL, EXCHANGE_API, '*.email.*', 'send_email'),
(QUEUE_EMAIL, EXCHANGE_CRON, '*.email.*', 'send_email'),
(QUEUE_API, EXCHANGE_API, '*.email.*', 'send_email'),
]
def fix_amqp_settings(backend_prefix):
......@@ -239,3 +240,5 @@ LOGIN_PATH = "/login"
# work after this many hours after 2011/05/10
AUTH_TOKEN_DURATION = 30 * 24
# Number of minutes between reconciliations
RECONCILIATION_MIN = 30
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment