Commit 392ca296 authored by Guido Trotter's avatar Guido Trotter
Browse files

Implement ConfdFilterCallback

This callback can be stacked with another one, and will filter duplicate
or old results, making handling of results easier.
Signed-off-by: default avatarGuido Trotter <>
Reviewed-by: default avatarMichael Hanselmann <>
parent cf7b0cc4
......@@ -38,6 +38,10 @@ Example usage:
# And your callback will be called by asyncore, when your query gets a
# response, or when it expires.
You can use the provided ConfdFilterCallback to act as a filter, only passing
"newer" answer to your callback, and filtering out outdated ones, or ones
confirming what you already got.
import socket
import time
......@@ -288,3 +292,94 @@ class ConfdClientRequest(objects.ConfdRequest):
if self.type not in constants.CONFD_REQS:
raise errors.ConfdClientError("Invalid request type")
class ConfdFilterCallback:
"""Callback that calls another callback, but filters duplicate results.
def __init__(self, callback, logger=None):
"""Constructor for ConfdFilterCallback
@type callback: f(L{ConfdUpcallPayload})
@param callback: function to call when getting answers
@type logger: L{logging.Logger}
@keyword logger: optional logger for internal conditions
if not callable(callback):
raise errors.ProgrammerError("callback must be callable")
self._callback = callback
self._logger = logger
# answers contains a dict of salt -> answer
self._answers = {}
def _LogFilter(self, salt, new_reply, old_reply):
if not self._logger:
if new_reply.serial > old_reply.serial:
self._logger.debug("Filtering confirming answer, with newer"
" serial for query %s" % salt)
elif new_reply.serial == old_reply.serial:
if new_reply.answer != old_reply.answer:
self._logger.warning("Got incoherent answers for query %s"
" (serial: %s)" % (salt, new_reply.serial))
self._logger.debug("Filtering confirming answer, with same"
" serial for query %s" % salt)
self._logger.debug("Filtering outdated answer for query %s"
" serial: (%d < %d)" % (salt, old_reply.serial,
def _HandleExpire(self, up):
# if we have no answer we have received none, before the expiration.
if salt in self._answers:
del self._answers[salt]
def _HandleReply(self, up):
"""Handle a single confd reply, and decide whether to filter it.
@rtype: boolean
@return: True if the reply should be filtered, False if it should be passed
on to the up-callback
filter_upcall = False
salt = up.salt
if salt not in self._answers:
# first answer for a query (don't filter, and record)
self._answers[salt] = up.server_reply
elif up.server_reply.serial > self._answers[salt].serial:
# newer answer (record, and compare contents)
old_answer = self._answers[salt]
self._answers[salt] = up.server_reply
if up.server_reply.answer == old_answer.answer:
# same content (filter) (version upgrade was unrelated)
filter_upcall = True
self._LogFilter(salt, up.server_reply, old_answer)
# else: different content, pass up a second answer
# older or same-version answer (duplicate or outdated, filter)
filter_upcall = True
self._LogFilter(salt, up.server_reply, self._answers[salt])
return filter_upcall
def __call__(self, up):
"""Filtering callback
@type up: L{ConfdUpcallPayload}
@param up: upper callback
filter_upcall = False
if up.type == UPCALL_REPLY:
filter_upcall = self._HandleReply(up)
elif up.type == UPCALL_EXPIRE:
if not filter_upcall:
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment