diff --git a/lib/confd/client.py b/lib/confd/client.py index 0ceb6c725d28a16719d6a550397ab003a1114f36..f5276866a7f1391e2e8a101d405cf0cc6ceb873a 100644 --- a/lib/confd/client.py +++ b/lib/confd/client.py @@ -65,23 +65,28 @@ class ConfdClient: through asyncore or with your own handling. """ - def __init__(self, hmac_key, peers): + def __init__(self, hmac_key, peers, callback): """Constructor for ConfdClient @type hmac_key: string @param hmac_key: hmac key to talk to confd @type peers: list @param peers: list of peer nodes + @type callback: f(L{ConfdUpcallPayload}) + @param callback: function to call when getting answers """ if not isinstance(peers, list): raise errors.ProgrammerError("peers must be a list") + if not callable(callback): + raise errors.ProgrammerError("callback must be callable") self._peers = peers self._hmac_key = hmac_key self._socket = ConfdAsyncUDPClient(self) - self._callbacks = {} - self._expire_callbacks = [] + self._callback = callback + self._requests = {} + self._expire_requests = [] self._confd_port = utils.GetDaemonPort(constants.CONFD) def _PackRequest(self, request, now=None): @@ -102,26 +107,30 @@ class ConfdClient: (answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key) return answer, salt - def _ExpireCallbacks(self): - """Delete all the expired callbacks. + def ExpireRequests(self): + """Delete all the expired requests. """ now = time.time() - while self._expire_callbacks: - expire_time, rsalt = self._expire_callbacks[0] + while self._expire_requests: + expire_time, rsalt = self._expire_requests[0] if now >= expire_time: - self._expire_callbacks.pop() - del self._callbacks[rsalt] + self._expire_requests.pop(0) + (request, args) = self._requests[rsalt] + del self._requests[rsalt] + client_reply = ConfdUpcallPayload(salt=rsalt, + type=UPCALL_EXPIRE, + orig_request=request, + extra_args=args) + self._callback(client_reply) else: break - def SendRequest(self, request, callback, args=None, coverage=None): + def SendRequest(self, request, args=None, coverage=None): """Send a confd request to some MCs @type request: L{objects.ConfdRequest} @param request: the request to send - @type callback: f(answer, req_type, req_query, salt, ip, port, args) - @param callback: answer callback @type args: tuple @keyword args: additional callback arguments @type coverage: integer @@ -131,9 +140,6 @@ class ConfdClient: if coverage is None: coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE) - if not callable(callback): - raise errors.ConfdClientError("callback must be callable") - if coverage > len(self._peers): raise errors.ConfdClientError("Not enough MCs known to provide the" " desired coverage") @@ -141,8 +147,8 @@ class ConfdClient: if not request.rsalt: raise errors.ConfdClientError("Missing request rsalt") - self._ExpireCallbacks() - if request.rsalt in self._callbacks: + self.ExpireRequests() + if request.rsalt in self._requests: raise errors.ConfdClientError("Duplicate request rsalt") if request.type not in constants.CONFD_REQS: @@ -160,10 +166,9 @@ class ConfdClient: except errors.UdpDataSizeError: raise errors.ConfdClientError("Request too big") - self._callbacks[request.rsalt] = (callback, request.type, - request.query, args) + self._requests[request.rsalt] = (request, args) expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT - self._expire_callbacks.append((expire_time, request.rsalt)) + self._expire_requests.append((expire_time, request.rsalt)) def HandleResponse(self, payload, ip, port): """Asynchronous handler for a confd reply @@ -178,16 +183,65 @@ class ConfdClient: return try: - (callback, type, query, args) = self._callbacks[salt] + (request, args) = self._requests[salt] except KeyError: # If the salt is unkown the answer is probably a replay of an old # expired query. Ignoring it. - pass - else: - callback(answer, type, query, salt, ip, port, args) + return + + client_reply = ConfdUpcallPayload(salt=salt, + type=UPCALL_REPLY, + server_reply=answer, + orig_request=request, + server_ip=ip, + server_port=port, + extra_args=args) + self._callback(client_reply) finally: - self._ExpireCallbacks() + self.ExpireRequests() + + +# UPCALL_REPLY: server reply upcall +# has all ConfdUpcallPayload fields populated +UPCALL_REPLY = 1 +# UPCALL_EXPIRE: internal library request expire +# has only salt, type, orig_request and extra_args +UPCALL_EXPIRE = 2 +CONFD_UPCALL_TYPES = frozenset([ + UPCALL_REPLY, + UPCALL_EXPIRE, + ]) + + +class ConfdUpcallPayload(objects.ConfigObject): + """Callback argument for confd replies + + @type salt: string + @ivar salt: salt associated with the query + @type type: one of confd.client.CONFD_UPCALL_TYPES + @ivar type: upcall type (server reply, expired request, ...) + @type orig_request: L{objects.ConfdRequest} + @ivar orig_request: original request + @type server_reply: L{objects.ConfdReply} + @ivar server_reply: server reply + @type server_ip: string + @ivar server_ip: answering server ip address + @type server_port: int + @ivar server_port: answering server port + @type extra_args: any + @ivar extra_args: 'args' argument of the SendRequest function + + """ + __slots__ = [ + "salt", + "type", + "orig_request", + "server_reply", + "server_ip", + "server_port", + "extra_args", + ] class ConfdClientRequest(objects.ConfdRequest):