From bfbbc22328ad29f71d2f1eceec40c2782fb553b2 Mon Sep 17 00:00:00 2001
From: Iustin Pop <iustin@google.com>
Date: Wed, 7 Apr 2010 15:01:57 +0200
Subject: [PATCH] ConfdClient: add synchronous wait for replies mode

Currently, there is no way for a user of the confd client library to
know how many replies there should be, whether all have been received,
etc. This is bad since we can't reliably detect the consistency of the
results.

This patch attempts to fix this by adding a synchronous WaitForReply
function that will wait until either a timeout expires, or until a
minimum number of replies have been received (interested users should
add similar functionality for the async case). The callback
functionality will still do call-backs into the user-provided code
during the wait, but after this function has returned, we know that we
received all possible replies.

Note: To account for the interval between initial send of the request,
and calling of this function, we modify the expiration time of the
request.

Signed-off-by: Iustin Pop <iustin@google.com>
Reviewed-by: Guido Trotter <ultrotter@google.com>
---
 lib/confd/client.py | 88 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 86 insertions(+), 2 deletions(-)

diff --git a/lib/confd/client.py b/lib/confd/client.py
index 47c000936..de2dc4d4e 100644
--- a/lib/confd/client.py
+++ b/lib/confd/client.py
@@ -91,12 +91,16 @@ class _Request(object):
   @ivar request: the request data
   @ivar args: any extra arguments for the callback
   @ivar expiry: the expiry timestamp of the request
+  @ivar sent: the set of contacted peers
+  @ivar rcvd: the set of peers who replied
 
   """
-  def __init__(self, request, args, expiry):
+  def __init__(self, request, args, expiry, sent):
     self.request = request
     self.args = args
     self.expiry = expiry
+    self.sent = frozenset(sent)
+    self.rcvd = set()
 
 
 class ConfdClient:
@@ -233,7 +237,8 @@ class ConfdClient:
         raise errors.ConfdClientError("Request too big")
 
     expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
-    self._requests[request.rsalt] = _Request(request, args, expire_time)
+    self._requests[request.rsalt] = _Request(request, args, expire_time,
+                                             targets)
 
     if not async:
       self.FlushSendQueue()
@@ -259,6 +264,8 @@ class ConfdClient:
           self._logger.debug("Discarding unknown (expired?) reply: %s" % err)
         return
 
+      rq.rcvd.add(ip)
+
       client_reply = ConfdUpcallPayload(salt=salt,
                                         type=UPCALL_REPLY,
                                         server_reply=answer,
@@ -293,6 +300,83 @@ class ConfdClient:
     """
     return self._socket.process_next_packet(timeout=timeout)
 
+  @staticmethod
+  def _NeededReplies(peer_cnt):
+    """Compute the minimum safe number of replies for a query.
+
+    The algorithm is designed to work well for both small and big
+    number of peers:
+        - for less than three, we require all responses
+        - for less than five, we allow one miss
+        - otherwise, half the number plus one
+
+    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
+    4->2, 5->3, 6->3, 7->4, etc.
+
+    @type peer_cnt: int
+    @param peer_cnt: the number of peers contacted
+    @rtype: int
+    @return: the number of replies which should give a safe coverage
+
+    """
+    if peer_cnt < 3:
+      return peer_cnt
+    elif peer_cnt < 5:
+      return peer_cnt - 1
+    else:
+      return int(peer_cnt/2) + 1
+
+  def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
+    """Wait for replies to a given request.
+
+    This method will wait until either the timeout expires or a
+    minimum number (computed using L{_NeededReplies}) of replies are
+    received for the given salt. It is useful when doing synchronous
+    calls to this library.
+
+    @param salt: the salt of the request we want responses for
+    @param timeout: the maximum timeout (should be less or equal to
+        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT}
+    @rtype: tuple
+    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
+        request is unknown, timed_out will be true and the counters
+        will be zero
+
+    """
+    def _CheckResponse():
+      if salt not in self._requests:
+        # expired?
+        if self._logger:
+          self._logger.debug("Discarding unknown/expired request: %s" % salt)
+        return MISSING
+      rq = self._requests[salt]
+      if len(rq.rcvd) >= expected:
+        # already got all replies
+        return (False, len(rq.sent), len(rq.rcvd))
+      # else wait, using default timeout
+      self.ReceiveReply()
+      raise utils.RetryAgain()
+
+    MISSING = (True, 0, 0)
+
+    if salt not in self._requests:
+      return MISSING
+    # extend the expire time with the current timeout, so that we
+    # don't get the request expired from under us
+    rq = self._requests[salt]
+    rq.expiry += timeout
+    sent = len(rq.sent)
+    expected = self._NeededReplies(sent)
+
+    try:
+      return utils.Retry(_CheckResponse, 0, timeout)
+    except utils.RetryTimeout:
+      if salt in self._requests:
+        rq = self._requests[salt]
+        return (True, len(rq.sent), len(rq.rcvd))
+      else:
+        return MISSING
+
 
 # UPCALL_REPLY: server reply upcall
 # has all ConfdUpcallPayload fields populated
-- 
GitLab