Commit a82ae57c authored by Christos Stavrakakis's avatar Christos Stavrakakis

snf-common: Support 'no_ack' in AMQP clients

Enable the Synnefo AMQP clients to consume messages from the queues
without requiring acknowledgments.

Also, add the missing 'exclusive' argument in 'AMQPHaighaClient'.
parent 267ba53d
...@@ -227,7 +227,7 @@ class AMQPHaighaClient(): ...@@ -227,7 +227,7 @@ class AMQPHaighaClient():
(exchange, routing_key, body) = self.unacked[mid] (exchange, routing_key, body) = self.unacked[mid]
self.basic_publish(exchange, routing_key, body) self.basic_publish(exchange, routing_key, body)
def basic_consume(self, queue, callback): def basic_consume(self, queue, callback, no_ack=False, exclusive=False):
"""Consume from a queue. """Consume from a queue.
@type queue: string or list of strings @type queue: string or list of strings
...@@ -238,7 +238,8 @@ class AMQPHaighaClient(): ...@@ -238,7 +238,8 @@ class AMQPHaighaClient():
""" """
self.consumers[queue] = callback self.consumers[queue] = callback
self.channel.basic.consume(queue, consumer=callback, no_ack=False) self.channel.basic.consume(queue, consumer=callback, no_ack=no_ack,
exclusive=exclusive)
@reconnect_decorator @reconnect_decorator
def basic_wait(self): def basic_wait(self):
...@@ -254,8 +255,8 @@ class AMQPHaighaClient(): ...@@ -254,8 +255,8 @@ class AMQPHaighaClient():
gevent.sleep(0) gevent.sleep(0)
@reconnect_decorator @reconnect_decorator
def basic_get(self, queue): def basic_get(self, queue, no_ack=False):
self.channel.basic.get(queue, no_ack=False) self.channel.basic.get(queue, no_ack=no_ack)
@reconnect_decorator @reconnect_decorator
def basic_ack(self, message): def basic_ack(self, message):
......
...@@ -304,7 +304,7 @@ class AMQPPukaClient(object): ...@@ -304,7 +304,7 @@ class AMQPPukaClient(object):
self.unsend.pop(body) self.unsend.pop(body)
@reconnect_decorator @reconnect_decorator
def basic_consume(self, queue, callback, prefetch_count=0): def basic_consume(self, queue, callback, no_ack=False, prefetch_count=0):
"""Consume from a queue. """Consume from a queue.
@type queue: string or list of strings @type queue: string or list of strings
...@@ -327,7 +327,8 @@ class AMQPPukaClient(object): ...@@ -327,7 +327,8 @@ class AMQPPukaClient(object):
consume_promise = \ consume_promise = \
self.client.basic_consume(queue=queue, self.client.basic_consume(queue=queue,
prefetch_count=prefetch_count, prefetch_count=prefetch_count,
callback=handle_delivery) callback=handle_delivery,
no_ack=no_ack)
self.consume_promises.append(consume_promise) self.consume_promises.append(consume_promise)
return consume_promise return consume_promise
...@@ -347,14 +348,14 @@ class AMQPPukaClient(object): ...@@ -347,14 +348,14 @@ class AMQPPukaClient(object):
return self.client.wait(self.consume_promises, timeout) return self.client.wait(self.consume_promises, timeout)
@reconnect_decorator @reconnect_decorator
def basic_get(self, queue): def basic_get(self, queue, no_ack=False):
"""Get a single message from a queue. """Get a single message from a queue.
This is a non-blocking method for getting messages from a queue. This is a non-blocking method for getting messages from a queue.
It will return None if the queue is empty. It will return None if the queue is empty.
""" """
get_promise = self.client.basic_get(queue=queue) get_promise = self.client.basic_get(queue=queue, no_ack=no_ack)
result = self.client.wait(get_promise) result = self.client.wait(get_promise)
if 'empty' in result: if 'empty' in result:
# The queue is empty # The queue is empty
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment