Commit 96e03b0b authored by Guido Trotter's avatar Guido Trotter

Confd client: Change callback model

We move to one callback in total, rather than one per call, and call it
both for server replies and request expiring.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent 90469357
......@@ -65,23 +65,28 @@ class ConfdClient:
through asyncore or with your own handling.
"""
def __init__(self, hmac_key, peers):
def __init__(self, hmac_key, peers, callback):
"""Constructor for ConfdClient
@type hmac_key: string
@param hmac_key: hmac key to talk to confd
@type peers: list
@param peers: list of peer nodes
@type callback: f(L{ConfdUpcallPayload})
@param callback: function to call when getting answers
"""
if not isinstance(peers, list):
raise errors.ProgrammerError("peers must be a list")
if not callable(callback):
raise errors.ProgrammerError("callback must be callable")
self._peers = peers
self._hmac_key = hmac_key
self._socket = ConfdAsyncUDPClient(self)
self._callbacks = {}
self._expire_callbacks = []
self._callback = callback
self._requests = {}
self._expire_requests = []
self._confd_port = utils.GetDaemonPort(constants.CONFD)
def _PackRequest(self, request, now=None):
......@@ -102,26 +107,30 @@ class ConfdClient:
(answer, salt) = serializer.LoadSignedJson(in_payload, self._hmac_key)
return answer, salt
def _ExpireCallbacks(self):
"""Delete all the expired callbacks.
def ExpireRequests(self):
"""Delete all the expired requests.
"""
now = time.time()
while self._expire_callbacks:
expire_time, rsalt = self._expire_callbacks[0]
while self._expire_requests:
expire_time, rsalt = self._expire_requests[0]
if now >= expire_time:
self._expire_callbacks.pop()
del self._callbacks[rsalt]
self._expire_requests.pop(0)
(request, args) = self._requests[rsalt]
del self._requests[rsalt]
client_reply = ConfdUpcallPayload(salt=rsalt,
type=UPCALL_EXPIRE,
orig_request=request,
extra_args=args)
self._callback(client_reply)
else:
break
def SendRequest(self, request, callback, args=None, coverage=None):
def SendRequest(self, request, args=None, coverage=None):
"""Send a confd request to some MCs
@type request: L{objects.ConfdRequest}
@param request: the request to send
@type callback: f(answer, req_type, req_query, salt, ip, port, args)
@param callback: answer callback
@type args: tuple
@keyword args: additional callback arguments
@type coverage: integer
......@@ -131,9 +140,6 @@ class ConfdClient:
if coverage is None:
coverage = min(len(self._peers), constants.CONFD_DEFAULT_REQ_COVERAGE)
if not callable(callback):
raise errors.ConfdClientError("callback must be callable")
if coverage > len(self._peers):
raise errors.ConfdClientError("Not enough MCs known to provide the"
" desired coverage")
......@@ -141,8 +147,8 @@ class ConfdClient:
if not request.rsalt:
raise errors.ConfdClientError("Missing request rsalt")
self._ExpireCallbacks()
if request.rsalt in self._callbacks:
self.ExpireRequests()
if request.rsalt in self._requests:
raise errors.ConfdClientError("Duplicate request rsalt")
if request.type not in constants.CONFD_REQS:
......@@ -160,10 +166,9 @@ class ConfdClient:
except errors.UdpDataSizeError:
raise errors.ConfdClientError("Request too big")
self._callbacks[request.rsalt] = (callback, request.type,
request.query, args)
self._requests[request.rsalt] = (request, args)
expire_time = now + constants.CONFD_CLIENT_EXPIRE_TIMEOUT
self._expire_callbacks.append((expire_time, request.rsalt))
self._expire_requests.append((expire_time, request.rsalt))
def HandleResponse(self, payload, ip, port):
"""Asynchronous handler for a confd reply
......@@ -178,16 +183,65 @@ class ConfdClient:
return
try:
(callback, type, query, args) = self._callbacks[salt]
(request, args) = self._requests[salt]
except KeyError:
# If the salt is unkown the answer is probably a replay of an old
# expired query. Ignoring it.
pass
else:
callback(answer, type, query, salt, ip, port, args)
return
client_reply = ConfdUpcallPayload(salt=salt,
type=UPCALL_REPLY,
server_reply=answer,
orig_request=request,
server_ip=ip,
server_port=port,
extra_args=args)
self._callback(client_reply)
finally:
self._ExpireCallbacks()
self.ExpireRequests()
# UPCALL_REPLY: server reply upcall
# has all ConfdUpcallPayload fields populated
UPCALL_REPLY = 1
# UPCALL_EXPIRE: internal library request expire
# has only salt, type, orig_request and extra_args
UPCALL_EXPIRE = 2
CONFD_UPCALL_TYPES = frozenset([
UPCALL_REPLY,
UPCALL_EXPIRE,
])
class ConfdUpcallPayload(objects.ConfigObject):
"""Callback argument for confd replies
@type salt: string
@ivar salt: salt associated with the query
@type type: one of confd.client.CONFD_UPCALL_TYPES
@ivar type: upcall type (server reply, expired request, ...)
@type orig_request: L{objects.ConfdRequest}
@ivar orig_request: original request
@type server_reply: L{objects.ConfdReply}
@ivar server_reply: server reply
@type server_ip: string
@ivar server_ip: answering server ip address
@type server_port: int
@ivar server_port: answering server port
@type extra_args: any
@ivar extra_args: 'args' argument of the SendRequest function
"""
__slots__ = [
"salt",
"type",
"orig_request",
"server_reply",
"server_ip",
"server_port",
"extra_args",
]
class ConfdClientRequest(objects.ConfdRequest):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment