Commit 0401e6a9 authored by Ilias Tsitsimpis's avatar Ilias Tsitsimpis

Merge pull request #18 from cstavr/feature-dispatcher-check

Tool for checking the status of the Cyclades update path.

This patch series implements a tool for checking the status of the Cyclades
update path, which includes Ganeti, AMQP, snf-ganeti-eventd and
snf-dispatcher. The tool is built into snf-dispatcher and is invoked by
passing the '--status-check' option. snf-ganeti-eventd is also extended to
send heartbeat messages.
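
To make the moving parts concrete: the dispatcher declares a short-lived, host/PID-specific heartbeat queue, asks Ganeti for a dry-run heartbeat tag job, and waits for eventd's reply on the 'eventd.heartbeat' routing key. A minimal sketch of that flow under stated assumptions: the queue_bind() helper and the "ganeti" exchange name are not part of this diff and are assumed here; only the queue-name format, the ttl/no_ack arguments and the routing key come from the patches below.

# Hedged sketch of a dispatcher-side heartbeat check (not the actual
# '--status-check' implementation). Assumes a Puka-backed client whose
# basic_get returns None on an empty queue, plus a queue_bind() helper
# and a "ganeti" exchange, none of which are introduced by this diff.
import os
import socket
import time

def check_eventd_heartbeat(client, timeout=10):
    queue = "snf:dispatcher:heartbeat:%s:%s" % (socket.gethostname(),
                                                os.getpid())
    client.queue_declare(queue=queue, ttl=timeout)      # auto-expiring queue
    client.queue_bind(exchange="ganeti", queue=queue,   # assumed helper/exchange
                      routing_key="eventd.heartbeat")
    # ... submit a dry-run OP_TAGS_SET cluster job whose first tag starts
    # with "snf:eventd:heartbeat" (see the eventd changes further down) ...
    deadline = time.time() + timeout
    while time.time() < deadline:
        if client.basic_get(queue=queue, no_ack=True) is not None:
            return True          # eventd answered with an eventd-heartbeat
        time.sleep(1)
    return False
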
parents 1dd1cffe d868eb84
# Copyright 2012 GRNET S.A. All rights reserved.
# Copyright 2012-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
......@@ -142,7 +142,7 @@ class AMQPHaighaClient():
auto_delete=False, durable=True)
def queue_declare(self, queue, exclusive=False, mirrored=True,
mirrored_nodes='all'):
mirrored_nodes='all', ttl=None):
"""Declare a queue
@type queue: string
......@@ -157,6 +157,8 @@ class AMQPHaighaClient():
the specified nodes, and the master will be the
first node in the list. Node names must be provided
and not host IP. example: [node1@rabbit,node2@rabbit]
@type ttl: int
@param ttl: Queue TTL in seconds
"""
......@@ -172,6 +174,9 @@ class AMQPHaighaClient():
else:
arguments = {}
if ttl is not None:
arguments['x-expires'] = ttl * 1000
self.channel.queue.declare(queue, durable=True, exclusive=exclusive,
auto_delete=False, arguments=arguments)
......@@ -222,7 +227,7 @@ class AMQPHaighaClient():
(exchange, routing_key, body) = self.unacked[mid]
self.basic_publish(exchange, routing_key, body)
def basic_consume(self, queue, callback):
def basic_consume(self, queue, callback, no_ack=False, exclusive=False):
"""Consume from a queue.
@type queue: string or list of strings
......@@ -233,7 +238,8 @@ class AMQPHaighaClient():
"""
self.consumers[queue] = callback
self.channel.basic.consume(queue, consumer=callback, no_ack=False)
self.channel.basic.consume(queue, consumer=callback, no_ack=no_ack,
exclusive=exclusive)
@reconnect_decorator
def basic_wait(self):
......@@ -249,8 +255,8 @@ class AMQPHaighaClient():
gevent.sleep(0)
@reconnect_decorator
def basic_get(self, queue):
self.channel.basic.get(queue, no_ack=False)
def basic_get(self, queue, no_ack=False):
self.channel.basic.get(queue, no_ack=no_ack)
@reconnect_decorator
def basic_ack(self, message):
......
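
A brief usage note on the Haigha changes above: the new ttl argument is translated into RabbitMQ's 'x-expires' queue argument (which takes milliseconds, hence the * 1000), so a short-lived per-check queue can be declared and drained without explicit acks. A minimal sketch, assuming 'client' is an already-connected AMQPHaighaClient; the queue name and handle_msg callback are illustrative:

def handle_msg(msg):                        # hypothetical callback
    print "heartbeat:", msg

queue = "snf:dispatcher:heartbeat:node1:4242"               # illustrative name
client.queue_declare(queue=queue, mirrored=False, ttl=60)   # becomes x-expires=60000
client.basic_consume(queue=queue, callback=handle_msg, no_ack=True)
client.basic_wait()   # deliveries need no basic_ack when no_ack=True
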
# Copyright 2012 GRNET S.A. All rights reserved.
# Copyright 2012-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
......@@ -188,7 +188,7 @@ class AMQPPukaClient(object):
@reconnect_decorator
def queue_declare(self, queue, exclusive=False,
mirrored=True, mirrored_nodes='all',
dead_letter_exchange=None):
dead_letter_exchange=None, ttl=None):
"""Declare a queue
@type queue: string
......@@ -203,6 +203,8 @@ class AMQPPukaClient(object):
the specified nodes, and the master will be the
first node in the list. Node names must be provided
and not host IP. example: [node1@rabbit,node2@rabbit]
@type ttl: int
@param ttl: Queue TTL in seconds
"""
self.log.info('Declaring queue: %s', queue)
......@@ -218,6 +220,9 @@ class AMQPPukaClient(object):
else:
arguments = {}
if ttl is not None:
arguments['x-expires'] = ttl * 1000
if dead_letter_exchange:
arguments['x-dead-letter-exchange'] = dead_letter_exchange
......@@ -299,7 +304,7 @@ class AMQPPukaClient(object):
self.unsend.pop(body)
@reconnect_decorator
def basic_consume(self, queue, callback, prefetch_count=0):
def basic_consume(self, queue, callback, no_ack=False, prefetch_count=0):
"""Consume from a queue.
@type queue: string or list of strings
......@@ -322,7 +327,8 @@ class AMQPPukaClient(object):
consume_promise = \
self.client.basic_consume(queue=queue,
prefetch_count=prefetch_count,
callback=handle_delivery)
callback=handle_delivery,
no_ack=no_ack)
self.consume_promises.append(consume_promise)
return consume_promise
......@@ -342,14 +348,14 @@ class AMQPPukaClient(object):
return self.client.wait(self.consume_promises, timeout)
@reconnect_decorator
def basic_get(self, queue):
def basic_get(self, queue, no_ack=False):
"""Get a single message from a queue.
This is a non-blocking method for getting messages from a queue.
It will return None if the queue is empty.
"""
get_promise = self.client.basic_get(queue=queue)
get_promise = self.client.basic_get(queue=queue, no_ack=no_ack)
result = self.client.wait(get_promise)
if 'empty' in result:
# The queue is empty
......
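
The Puka client gains the same knobs; in particular basic_get with no_ack=True fits a one-shot status probe, since the fetched message does not have to be acknowledged afterwards. A minimal sketch, assuming 'client' is an already-connected AMQPPukaClient and the queue name is illustrative:

msg = client.basic_get(queue="snf:dispatcher:heartbeat:node1:4242",
                       no_ack=True)
if msg is None:
    print "heartbeat queue is empty; eventd has not answered yet"
else:
    print "received heartbeat message: %s" % msg
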
# Copyright 2012 GRNET S.A. All rights reserved.
# Copyright 2012-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
......@@ -86,3 +86,14 @@ def convert_queue_to_dead(queue):
def convert_exchange_to_dead(exchange):
"""Convert the name of an exchange to the corresponding dead-letter one"""
return exchange + "-dl"
EVENTD_HEARTBEAT_ROUTING_KEY = "eventd.heartbeat"
def get_dispatcher_request_queue(hostname, pid):
return "snf:dispatcher:%s:%s" % (hostname, pid)
def get_dispatcher_heartbeat_queue(hostname, pid):
return "snf:dispatcher:heartbeat:%s:%s" % (hostname, pid)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2011 GRNET S.A. All rights reserved.
# Copyright 2011-2014 GRNET S.A. All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
......@@ -190,7 +190,8 @@ class JobFileHandler(pyinotify.ProcessEvent):
self.op_handlers = {"INSTANCE": self.process_instance_op,
"NETWORK": self.process_network_op,
"CLUSTER": self.process_cluster_op}
"CLUSTER": self.process_cluster_op,
"TAGS": self.process_tag_op}
# "GROUP": self.process_group_op}
def process_IN_CLOSE_WRITE(self, event):
......@@ -359,6 +360,28 @@ class JobFileHandler(pyinotify.ProcessEvent):
return msg, routekey
def process_tag_op(self, op, job_id):
""" Process OP_TAGS_* opcodes.
"""
input = op.input
op_id = input.OP_ID
if op_id == "OP_TAGS_SET":
if op.status == "waiting" and input.tags and input.dry_run and\
input.kind == "cluster":
# Special case where a prefixed cluster tag operation in dry-run
# mode is used in order to trigger eventd to send a
# heartbeat message.
tag = input.tags[0]
if tag.startswith("snf:eventd:heartbeat"):
self.logger.debug("Received heartbeat tag '%s'."
" Sending response.", tag)
msg = {"type": "eventd-heartbeat",
"cluster": self.cluster_name}
return msg, "eventd.heartbeat"
return None, None
def find_cluster_name():
global handler_logger
......
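
To make the trigger explicit: process_tag_op only answers dry-run OP_TAGS_SET jobs on cluster tags whose first tag starts with "snf:eventd:heartbeat", and the answer is an 'eventd-heartbeat' message on the 'eventd.heartbeat' routing key. A hedged sketch of how a checker might submit such a job through Ganeti's RAPI client; the use of RAPI (rather than luxi), the endpoint and the credentials are assumptions, and only the tag prefix, the dry-run/cluster requirements and the reply format come from the handler above:

# Hedged sketch; the diff only defines how eventd reacts to the resulting
# dry-run tag job, not how that job is submitted.
from ganeti.rapi.client import GanetiRapiClient

rapi = GanetiRapiClient("master.example.org",           # assumed endpoint
                        username="user", password="pass")
# A dry-run cluster tag never modifies the cluster; eventd sees the
# waiting job and publishes
#   {"type": "eventd-heartbeat", "cluster": <cluster name>}
# with routing key "eventd.heartbeat".
rapi.AddClusterTags(["snf:eventd:heartbeat:check-4242"], dry_run=True)
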