@staticmethod
def _NeededReplies(peer_cnt):
    """Compute the minimum safe number of replies for a query.

    The algorithm is designed to work well for both small and big
    numbers of peers:
        - for less than three, we require all responses
        - for less than five, we allow one miss
        - otherwise, half the number (rounded down) plus one

    This guarantees that we progress monotonically: 1->1, 2->2, 3->2,
    4->3, 5->3, 6->4, 7->4, etc.

    @type peer_cnt: int
    @param peer_cnt: the number of peers contacted
    @rtype: int
    @return: the number of replies which should give a safe coverage

    """
    if peer_cnt < 3:
        return peer_cnt
    if peer_cnt < 5:
        return peer_cnt - 1
    # Floor division keeps this in exact integer arithmetic regardless of
    # whether "/" means integer or true division in the running interpreter.
    return peer_cnt // 2 + 1


def WaitForReply(self, salt, timeout=constants.CONFD_CLIENT_EXPIRE_TIMEOUT):
    """Wait for replies to a given request.

    This method will wait until either the timeout expires or a
    minimum number (computed using L{_NeededReplies}) of replies are
    received for the given salt. It is useful when doing synchronous
    calls to this library.

    As a side effect, the expiry time of the request is pushed back by
    C{timeout}, so that the request is not expired while we are waiting
    on it.

    @param salt: the salt of the request we want responses for
    @param timeout: the maximum timeout (should be less than or equal to
        L{ganeti.constants.CONFD_CLIENT_EXPIRE_TIMEOUT})
    @rtype: tuple
    @return: a tuple of (timed_out, sent_cnt, recv_cnt); if the
        request is unknown, timed_out will be true and the counters
        will be zero

    """
    # Result used whenever the request is not (or no longer) registered.
    MISSING = (True, 0, 0)

    def _CheckResponse():
        """Run one polling step; raises RetryAgain until enough replies."""
        if salt not in self._requests:
            # expired?
            if self._logger:
                self._logger.debug("Discarding unknown/expired request: %s" % salt)
            return MISSING
        rq = self._requests[salt]
        if len(rq.rcvd) >= expected:
            # already got all the replies we need
            return (False, len(rq.sent), len(rq.rcvd))
        # Not enough replies yet: process the next incoming packet and ask
        # the retry helper to call us again.
        # NOTE(review): ReceiveReply() is called without arguments, so each
        # poll waits for that method's own default timeout; the overall wait
        # may therefore overshoot C{timeout} -- confirm this is acceptable.
        self.ReceiveReply()
        raise utils.RetryAgain()

    if salt not in self._requests:
        return MISSING

    # extend the expire time with the current timeout, so that we
    # don't get the request expired from under us
    rq = self._requests[salt]
    rq.expiry += timeout
    sent = len(rq.sent)
    expected = self._NeededReplies(sent)

    try:
        # presumably Retry(fn, delay, timeout): poll back-to-back (delay 0)
        # until _CheckResponse returns or the timeout elapses -- verify
        # against utils.Retry.
        return utils.Retry(_CheckResponse, 0, timeout)
    except utils.RetryTimeout:
        # Timed out: report whatever was collected so far, unless the
        # request has disappeared from the table in the meantime.
        if salt in self._requests:
            rq = self._requests[salt]
            return (True, len(rq.sent), len(rq.rcvd))
        return MISSING