Commit b3a72b6c authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

Remove messages that can not be handled from queue

Temprory fix for removing accumulated messages from RabbitMQ. Currently
snf-ganeti-eventd produces messages that can not be handled from
snf-dispatcher (e.g messages not concering an instance). These messages
are now removed from queue, since they do not concern dispatcher.
parent 8c4f2f8d
......@@ -52,6 +52,7 @@ def update_db(message):
if msg["type"] != "ganeti-op-status":
log.error("Message is of unknown type %s.", msg["type"])
message.channel.basic_ack(message.delivery_tag)
return
if msg["operation"] == "OP_INSTANCE_QUERY_DATA":
......@@ -68,13 +69,17 @@ def update_db(message):
except KeyError:
log.error("Malformed incoming JSON, missing attributes: %s",
message.body)
message.channel.basic_ack(message.delivery_tag)
except VirtualMachine.InvalidBackendIdError:
log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
message.channel.basic_ack(message.delivery_tag)
except VirtualMachine.InvalidBackendMsgError, e:
log.debug("Ignoring msg of unknown type: %s.", e)
message.channel.basic_ack(message.delivery_tag)
except VirtualMachine.DoesNotExist:
log.error("VM for instance %s with id %d not found in DB.",
msg["instance"], vmid)
message.channel.basic_ack(message.delivery_tag)
except Exception as e:
log.exception("Unexpected error, msg: %s", msg)
......@@ -88,6 +93,7 @@ def update_net(message):
if msg["type"] != "ganeti-net-status":
log.error("Message is of unknown type %s", msg["type"])
message.channel.basic_ack(message.delivery_tag)
return
vmid = utils.id_from_instance_name(msg["instance"])
......@@ -100,11 +106,14 @@ def update_net(message):
except KeyError:
log.error("Malformed incoming JSON, missing attributes: %s",
message.body)
message.channel.basic_ack(message.delivery_tag)
except VirtualMachine.InvalidBackendIdError:
log.debug("Ignoring msg for unknown instance %s.", msg["instance"])
message.channel.basic_ack(message.delivery_tag)
except VirtualMachine.DoesNotExist:
log.error("VM for instance %s with id %d not found in DB.",
msg["instance"], vmid)
message.channel.basic_ack(message.delivery_tag)
except Exception as e:
log.exception("Unexpected error, msg: %s", msg)
......@@ -118,6 +127,7 @@ def update_build_progress(message):
if msg['type'] != "ganeti-create-progress":
log.error("Message is of unknown type %s", msg["type"])
message.channel.basic_ack(message.delivery_tag)
return
# XXX: The following assumes names like snf-12
......@@ -131,6 +141,7 @@ def update_build_progress(message):
except KeyError:
log.error("Malformed incoming JSON, missing attributes: %s",
message.body)
message.channel.basic_ack(message.delivery_tag)
except Exception as e:
log.exception("Unexpected error, msg: %s", msg)
raise
......@@ -144,10 +155,12 @@ def trigger_status_update(message):
msg = json.loads(message.body)
if msg["type"] != "reconcile":
log.error("Message is of unknown type %s", msg["type"])
return
message.channel.basic_ack(message.delivery_tag)
log.error("Message is of unknown type %s", msg["type"])
return
if msg["vmid"] == "":
message.channel.basic_ack(message.delivery_tag)
log.error("Reconciliation message does not specify a VM id")
return
......@@ -157,6 +170,7 @@ def trigger_status_update(message):
message.channel.basic_ack(message.delivery_tag)
except KeyError as k:
log.error("Malformed incoming JSON, missing attributes: %s", k)
message.channel.basic_ack(message.delivery_tag)
except Exception as e:
log.exception("Unexpected error, msg: %s", msg)
......@@ -169,6 +183,7 @@ def status_job_finished(message):
if msg["operation"] != 'OP_INSTANCE_QUERY_DATA':
log.error("Message is of unknown type %s", msg["operation"])
message.channel.basic_ack(message.delivery_tag)
return
if msg["status"] != "success":
......@@ -182,9 +197,10 @@ def status_job_finished(message):
log.debug("Node status job result: %s", status)
if status['summary'][0] != u'INSTANCE_QUERY_DATA':
log.error("Status update is of unknown type %s",
log.error("Status update is of unknown type %s",
status['summary'])
return
message.channel.basic_ack(message.delivery_tag)
return
conf_state = status['opresult'][0][msg['instance']]['config_state']
run_state = status['opresult'][0][msg['instance']]['run_state']
......@@ -206,6 +222,7 @@ def status_job_finished(message):
message.channel.basic_ack(message.delivery_tag)
except KeyError as k:
log.error("Malformed incoming JSON, missing attributes: %s", k)
message.channel.basic_ack(message.delivery_tag)
except Exception as e:
log.exception("Unexpected error, msg: %s", msg)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment