Commit bfbbc223 authored by Iustin Pop's avatar Iustin Pop
Browse files

ConfdClient: add synchronous wait for replies mode

Currently, there is no way for a user of the confd client library to
know how many replies there should be, whether all have been received,
etc. This is bad since we can't reliably detect the consistency of the

This patch attempts to fix this by adding a synchronous WaitForReply
function that will wait until either a timeout expires, or until a
minimum number of replies have been received (interested users should
add similar functionality for the async case). The callback
functionality will still do call-backs into the user-provided code
during the wait, but after this function has returned, we know that we
received all possible replies.

Note: To account for the interval between initial send of the request,
and calling of this function, we modify the expiration time of the
Signed-off-by: default avatarIustin Pop <>
Reviewed-by: default avatarGuido Trotter <>
parent 71e114da
...@@ -91,12 +91,16 @@ class _Request(object): ...@@ -91,12 +91,16 @@ class _Request(object):
@ivar request: the request data @ivar request: the request data
@ivar args: any extra arguments for the callback @ivar args: any extra arguments for the callback
@ivar expiry: the expiry timestamp of the request @ivar expiry: the expiry timestamp of the request
@ivar sent: the set of contacted peers
@ivar rcvd: the set of peers who replied
""" """
def __init__(self, request, args, expiry): def __init__(self, request, args, expiry, sent):
self.request = request self.request = request
self.args = args self.args = args
self.expiry = expiry self.expiry = expiry
self.sent = frozenset(sent)
self.rcvd = set()
class ConfdClient: class ConfdClient:
...@@ -233,7 +237,8 @@ class ConfdClient: ...@@ -233,7 +237,8 @@ class ConfdClient:
raise errors.ConfdClientError("Request too big") raise errors.ConfdClientError("Request too big")
expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
self._requests[request.rsalt] = _Request(request, args, expire_time) self._requests[request.rsalt] = _Request(request, args, expire_time,
if not async: if not async:
self.FlushSendQueue() self.FlushSendQueue()
...@@ -259,6 +264,8 @@ class ConfdClient: ...@@ -259,6 +264,8 @@ class ConfdClient:
self._logger.debug("Discarding unknown (expired?) reply: %s" % err) self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
return return
client_reply = ConfdUpcallPayload(salt=salt, client_reply = ConfdUpcallPayload(salt=salt,
server_reply=answer, server_reply=answer,
...@@ -293,6 +300,83 @@ class ConfdClient: ...@@ -293,6 +300,83 @@ class ConfdClient:
""" """
return self._socket.process_next_packet(timeout=timeout) return self._socket.process_next_packet(timeout=timeout)
def _NeededReplies(peer_cnt):
"""Compute the minimum safe number of replies for a query.
The algorithm is designed to work well for both small and big
number of peers:
- for less than three, we require all responses
- for less than five, we allow one miss
- otherwise, half the number plus one
This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
4->2, 5->3, 6->3, 7->4, etc.
@type peer_cnt: int
@param peer_cnt: the number of peers contacted
@rtype: int
@return: the number of replies which should give a safe coverage
if peer_cnt < 3:
return peer_cnt
elif peer_cnt < 5:
return peer_cnt - 1
return int(peer_cnt/2) + 1
def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
"""Wait for replies to a given request.
This method will wait until either the timeout expires or a
minimum number (computed using L{_NeededReplies}) of replies are
received for the given salt. It is useful when doing synchronous
calls to this library.
@param salt: the salt of the request we want responses for
@param timeout: the maximum timeout (should be less or equal to
@rtype: tuple
@return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
request is unknown, timed_out will be true and the counters
will be zero
def _CheckResponse():
if salt not in self._requests:
# expired?
if self._logger:
self._logger.debug("Discarding unknown/expired request: %s" % salt)
return MISSING
rq = self._requests[salt]
if len(rq.rcvd) >= expected:
# already got all replies
return (False, len(rq.sent), len(rq.rcvd))
# else wait, using default timeout
raise utils.RetryAgain()
MISSING = (True, 0, 0)
if salt not in self._requests:
return MISSING
# extend the expire time with the current timeout, so that we
# don't get the request expired from under us
rq = self._requests[salt]
rq.expiry += timeout
sent = len(rq.sent)
expected = self._NeededReplies(sent)
return utils.Retry(_CheckResponse, 0, timeout)
except utils.RetryTimeout:
if salt in self._requests:
rq = self._requests[salt]
return (True, len(rq.sent), len(rq.rcvd))
return MISSING
# UPCALL_REPLY: server reply upcall # UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated # has all ConfdUpcallPayload fields populated
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment