Commit af434257 authored by Christos Stavrakakis's avatar Christos Stavrakakis
Browse files

Make amqp_puka get logger as argument

parent 4f5c00ee
......@@ -52,8 +52,6 @@ from functools import wraps
from ordereddict import OrderedDict
from synnefo import settings
logger = logging.getLogger("amqp")
def reconnect_decorator(func):
"""
......@@ -65,7 +63,7 @@ def reconnect_decorator(func):
try:
return func(self, *args, **kwargs)
except (socket_error, spec_exceptions.ConnectionForced) as e:
logger.error('Connection Closed while in %s: %s', func.__name__, e)
self.log.error('Connection Closed while in %s: %s', func.__name__, e)
self.connect()
return wrapper
......@@ -77,7 +75,7 @@ class AMQPPukaClient(object):
"""
def __init__(self, hosts=settings.AMQP_HOSTS, max_retries=30,
confirms=True, confirm_buffer=100):
confirms=True, confirm_buffer=100, logger=None):
"""
Format hosts as "amqp://username:pass@host:port"
max_retries=0 defaults to unlimited retries
......@@ -98,10 +96,16 @@ class AMQPPukaClient(object):
self.unsend = OrderedDict()
self.consume_promises = []
self.exchanges = []
if logger:
self.log = logger
else:
logger = logging.getLogger("amqp")
logging.basicConfig()
self.log = logger
def connect(self, retries=0):
if self.max_retries and retries >= self.max_retries:
logger.error("Aborting after %d retries", retries)
self.log.error("Aborting after %d retries", retries)
raise AMQPConnectionError('Aborting after %d connection failures.'\
% retries)
return
......@@ -113,20 +117,20 @@ class AMQPPukaClient(object):
self.client = Client(host, pubacks=self.confirms)
host = host.split('@')[-1]
logger.debug('Connecting to node %s' % host)
self.log.debug('Connecting to node %s' % host)
try:
promise = self.client.connect()
self.client.wait(promise)
except socket_error as e:
if retries < len(self.hosts):
logger.warning('Cannot connect to host %s: %s', host, e)
self.log.warning('Cannot connect to host %s: %s', host, e)
else:
logger.error('Cannot connect to host %s: %s', host, e)
self.log.error('Cannot connect to host %s: %s', host, e)
sleep(1)
return self.connect(retries + 1)
logger.info('Successfully connected to host: %s', host)
self.log.info('Successfully connected to host: %s', host)
# Setup TCP keepalive option
self.client.sd.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
......@@ -137,7 +141,7 @@ class AMQPPukaClient(object):
# Keepalive retry
self.client.sd.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 10)
logger.info('Creating channel')
self.log.info('Creating channel')
# Clear consume_promises each time connecting, since they are related
# to the connection object
......@@ -172,7 +176,7 @@ class AMQPPukaClient(object):
@param exhange_type: one of 'direct', 'topic', 'fanout'
"""
logger.info('Declaring %s exchange: %s', type, exchange)
self.log.info('Declaring %s exchange: %s', type, exchange)
promise = self.client.exchange_declare(exchange=exchange,
type=type,
durable=True,
......@@ -200,7 +204,7 @@ class AMQPPukaClient(object):
and not host IP. example: [node1@rabbit,node2@rabbit]
"""
logger.info('Declaring queue: %s', queue)
self.log.info('Declaring queue: %s', queue)
if mirrored:
if mirrored_nodes == 'all':
......@@ -223,8 +227,8 @@ class AMQPPukaClient(object):
self.client.wait(promise)
def queue_bind(self, queue, exchange, routing_key):
logger.debug('Binding queue %s to exchange %s with key %s'
% (queue, exchange, routing_key))
self.log.debug('Binding queue %s to exchange %s with key %s'
% (queue, exchange, routing_key))
promise = self.client.queue_bind(exchange=exchange, queue=queue,
routing_key=routing_key)
self.client.wait(promise)
......@@ -282,7 +286,7 @@ class AMQPPukaClient(object):
msgs = self.unacked.values()
self.unacked.clear()
for exchange, routing_key, body in msgs:
logger.debug('Resending message %s' % body)
self.log.debug('Resending message %s' % body)
self.basic_publish(exchange, routing_key, body)
@reconnect_decorator
......@@ -311,7 +315,7 @@ class AMQPPukaClient(object):
if 'body' in msg:
callback(self, msg)
else:
logger.debug("Message without body %s" % msg)
self.log.debug("Message without body %s" % msg)
raise socket_error
consume_promise = \
......@@ -372,15 +376,14 @@ class AMQPPukaClient(object):
def close(self):
"""Check that messages have been send and close the connection."""
logger.debug("Closing connection to %s", self.client.host)
self.log.debug("Closing connection to %s", self.client.host)
try:
if self.confirms:
self.get_confirms()
close_promise = self.client.close()
self.client.wait(close_promise)
except (socket_error, spec_exceptions.ConnectionForced) as e:
logger.error('Connection closed while closing connection:%s',
e)
self.log.error('Connection closed while closing connection:%s', e)
def queue_delete(self, queue, if_unused=True, if_empty=True):
"""Delete a queue.
......@@ -394,7 +397,7 @@ class AMQPPukaClient(object):
self.client.wait(promise)
return True
except spec_exceptions.NotFound:
logger.info("Queue %s does not exist", queue)
self.log.info("Queue %s does not exist", queue)
return False
def exchange_delete(self, exchange, if_unused=True):
......@@ -406,7 +409,7 @@ class AMQPPukaClient(object):
self.client.wait(promise)
return True
except spec_exceptions.NotFound:
logger.info("Exchange %s does not exist", exchange)
self.log.info("Exchange %s does not exist", exchange)
return False
@reconnect_decorator
......
......@@ -108,7 +108,7 @@ class Dispatcher:
def _init(self):
log.info("Initializing")
self.client = AMQPClient()
self.client = AMQPClient(logger=log_amqp)
# Connect to AMQP host
self.client.connect()
......
......@@ -67,6 +67,7 @@ from synnefo import settings
from synnefo.lib.amqp import AMQPClient
def get_time_from_status(op, job):
"""Generate a unique message identifier for a ganeti job.
......@@ -125,7 +126,7 @@ class JobFileHandler(pyinotify.ProcessEvent):
# Set max_retries to 0 for unlimited retries.
self.client = AMQPClient(hosts=settings.AMQP_HOSTS, confirm_buffer=25,
max_retries=0)
max_retries=0, logger=logger)
handler_logger.info("Attempting to connect to RabbitMQ hosts")
......
......@@ -165,7 +165,8 @@ class GanetiHook():
# FIXME: We need a reconciliation mechanism between the DB and
# Ganeti, for cases exactly like this.
self.client = AMQPClient(hosts=settings.AMQP_HOSTS,
max_retries = 2 * len(settings.AMQP_HOSTS))
max_retries=2 * len(settings.AMQP_HOSTS),
logger=logger)
self.client.connect()
def on_master(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment