diff --git a/lib/confd/client.py b/lib/confd/client.py index 4cffcd30d6943c35e614d187cf485ef34717e3f1..803fbb5ab14ae7bd6746ff3e9c9822c7a7ecaad7 100644 --- a/lib/confd/client.py +++ b/lib/confd/client.py @@ -38,6 +38,10 @@ Example usage: # And your callback will be called by asyncore, when your query gets a # response, or when it expires. +You can use the provided ConfdFilterCallback to act as a filter, only passing +"newer" answer to your callback, and filtering out outdated ones, or ones +confirming what you already got. + """ import socket import time @@ -288,3 +292,94 @@ class ConfdClientRequest(objects.ConfdRequest): if self.type not in constants.CONFD_REQS: raise errors.ConfdClientError("Invalid request type") + +class ConfdFilterCallback: + """Callback that calls another callback, but filters duplicate results. + + """ + def __init__(self, callback, logger=None): + """Constructor for ConfdFilterCallback + + @type callback: f(L{ConfdUpcallPayload}) + @param callback: function to call when getting answers + @type logger: L{logging.Logger} + @keyword logger: optional logger for internal conditions + + """ + if not callable(callback): + raise errors.ProgrammerError("callback must be callable") + + self._callback = callback + self._logger = logger + # answers contains a dict of salt -> answer + self._answers = {} + + def _LogFilter(self, salt, new_reply, old_reply): + if not self._logger: + return + + if new_reply.serial > old_reply.serial: + self._logger.debug("Filtering confirming answer, with newer" + " serial for query %s" % salt) + elif new_reply.serial == old_reply.serial: + if new_reply.answer != old_reply.answer: + self._logger.warning("Got incoherent answers for query %s" + " (serial: %s)" % (salt, new_reply.serial)) + else: + self._logger.debug("Filtering confirming answer, with same" + " serial for query %s" % salt) + else: + self._logger.debug("Filtering outdated answer for query %s" + " serial: (%d < %d)" % (salt, old_reply.serial, + new_reply.serial)) + + def _HandleExpire(self, up): + # if we have no answer we have received none, before the expiration. + if salt in self._answers: + del self._answers[salt] + + def _HandleReply(self, up): + """Handle a single confd reply, and decide whether to filter it. + + @rtype: boolean + @return: True if the reply should be filtered, False if it should be passed + on to the up-callback + + """ + filter_upcall = False + salt = up.salt + if salt not in self._answers: + # first answer for a query (don't filter, and record) + self._answers[salt] = up.server_reply + elif up.server_reply.serial > self._answers[salt].serial: + # newer answer (record, and compare contents) + old_answer = self._answers[salt] + self._answers[salt] = up.server_reply + if up.server_reply.answer == old_answer.answer: + # same content (filter) (version upgrade was unrelated) + filter_upcall = True + self._LogFilter(salt, up.server_reply, old_answer) + # else: different content, pass up a second answer + else: + # older or same-version answer (duplicate or outdated, filter) + filter_upcall = True + self._LogFilter(salt, up.server_reply, self._answers[salt]) + + return filter_upcall + + def __call__(self, up): + """Filtering callback + + @type up: L{ConfdUpcallPayload} + @param up: upper callback + + """ + filter_upcall = False + if up.type == UPCALL_REPLY: + filter_upcall = self._HandleReply(up) + elif up.type == UPCALL_EXPIRE: + self._HandleExpire(up) + + if not filter_upcall: + self._callback(up) +