Commit c1ca947a authored by Georgios Gousios's avatar Georgios Gousios
Browse files

Lots of backfixes in status reconciliation code

parent 5fbe73dd
......@@ -173,11 +173,11 @@ def cleanup_queues() :
if ans not in ['Y', 'y']:
return
for exchange in settings.EXCHANGES:
try:
chan.exchange_delete(exchange=exchange)
except amqp.exceptions.AMQPChannelException as e:
print e.amqp_reply_code, " ", e.amqp_reply_text
#for exchange in settings.EXCHANGES:
# try:
# chan.exchange_delete(exchange=exchange)
# except amqp.exceptions.AMQPChannelException as e:
# print e.amqp_reply_code, " ", e.amqp_reply_text
for queue in settings.QUEUES:
try:
......
......@@ -50,6 +50,9 @@ def update_db(message):
_logger.error("Message is of unknown type %s.", msg["type"])
return
if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
return status_job_finished(message)
vmid = utils.id_from_instance_name(msg["instance"])
vm = VirtualMachine.objects.get(id=vmid)
......@@ -113,63 +116,62 @@ def update_credits(message):
message.channel.basic_ack(message.delivery_tag)
def trigger_status_update(message):
_logger.debug("Request to trigger status update:", message.body)
_logger.debug("Request to trigger status update: %s", message.body)
try:
msg = json.loads(message.body)
if msg["type"] != "reconciliate" :
if msg["type"] != "reconcile" :
_logger.error("Message is of unknown type %s", msg["type"])
return
if msg["vm-id"] == "" :
_logger.error("Message does not specify a VM id")
if msg["vmid"] == "" :
_logger.error("Reconciliate message does not specify a VM id")
return
vm = VirtualMachine.objects.get(id=msg["vm-id"])
vm = VirtualMachine.objects.get(id=msg["vmid"])
backend.request_status_update(vm)
message.channel.basic_ack(message.delivery_tag)
except KeyError:
_logger.error("Malformed incoming JSON, missing attributes: %s",
message.body)
except KeyError as k:
_logger.error("Malformed incoming JSON, missing attributes: %s", k)
except Exception as e:
_logger.error("Unexpected error:\n%s" %
"".join(traceback.format_exception(*sys.exc_info())))
_logger.error("Unexpected error:%s", e)
def status_job_finished (message) :
_logger.debug("Job status message received:", message.body)
try:
msg = json.loads(message.body)
msg = message.body;
if msg['operation'] != u'OP_INSTANCE_QUERY_DATA':
if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
_logger.error("Message is of unknown type %s", msg["operation"])
return
if msg["status"] != "success" :
_logger.error("Status job %d for %s did not finish properly",
_logger.warn("Ignoring non-success status update from job %d on VM %s",
msg['jobId'], msg['instance'])
return
status = backend.get_job_status(msg['jobid'])
status = backend.get_job_status(msg['jobId'])
if status["summary"] != "INSTANCE_QUERY_DATA" or type(status["opresult"]) is not list:
_logger.error("Status is of unknown type %s", msg["summary"])
_logger.debug("Node status job result: %s" % status)
stat = json.loads(status)
if stat["summary"] != "INSTANCE_QUERY_DATA" or \
type(stat["opresult"]) is not list:
_logger.error("Status is of unknown type %s", stat["summary"])
return
req_state = status['opresult'][msg['instance']]['config_state']
run_state = status['opresult'][msg['instance']]['run_state']
req_state = stat['opresult'][msg['instance']]['config_state']
run_state = stat['opresult'][msg['instance']]['run_state']
vm = VirtualMachine.objects.get(name=msg['instance'])
backend.update_status(vm, run_state)
message.channel.basic_ack(message.delivery_tag)
except KeyError:
_logger.error("Malformed incoming JSON, missing attributes: %s",
message.body)
except KeyError as k:
_logger.error("Malformed incoming JSON, missing attributes: %s", k)
except Exception as e:
_logger.error("Unexpected error:\n%s" %
"".join(traceback.format_exception(*sys.exc_info())))
_logger.error("Unexpected error:%s"%e)
def dummy_proc(message):
try:
......
......@@ -67,14 +67,15 @@ class Command(NoArgsCommand):
to_update = all.count() / settings.RECONCILIATION_MIN
vm_ids = map(lambda x: x.name, VirtualMachine.objects.all()[:to_update])
vm_ids = map(lambda x: x.id, VirtualMachine.objects.all()[:to_update])
sent = False
self.open_channel()
for vmid in vm_ids :
while sent is False:
try:
msg = dict(type = "reconciliate", vmid = vmid)
self.chan.basic_publish(json.dumps(msg),
msg = dict(type = "reconcile", vmid = vmid)
amqp_msg = amqp.Message(json.dumps(msg))
self.chan.basic_publish(amqp_msg,
exchange=settings.EXCHANGE_CRON,
routing_key="reconciliation.%s"%vmid)
sent = True
......@@ -83,5 +84,4 @@ class Command(NoArgsCommand):
except Exception:
raise
print "All: %d, To update: %d, Triggered update for: %s" % (all.count(), not_updated.count(), vm_ids)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment