From a4b605aea7d6de9efbb1f19d1047e6f2102f8d58 Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Thu, 13 May 2010 18:32:25 +0100 Subject: [PATCH] daemon.AsyncStreamServer This is a new asyncore server which handles listening stream sockets by calling a non-implemented function for each connection it accepts. It's the stream-oriented cousing of the AsyncUDPSocket. Signed-off-by: Guido Trotter <ultrotter@google.com> Reviewed-by: Michael Hanselmann <hansmi@google.com> --- lib/daemon.py | 69 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/lib/daemon.py b/lib/daemon.py index e386d8c4e..0e6e042b3 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -91,6 +91,75 @@ class GanetiBaseAsyncoreDispatcher(asyncore.dispatcher): return False +def FormatAddress(family, address): + """Format a client's address + + @type family: integer + @param family: socket family (one of socket.AF_*) + @type address: family specific (usually tuple) + @param address: address, as reported by this class + + """ + if family == socket.AF_INET and len(address) == 2: + return "%s:%d" % address + elif family == socket.AF_UNIX and len(address) == 3: + return "pid=%s, uid=%s, gid=%s" % address + else: + return str(address) + + +class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): + """A stream server to use with asyncore. + + Each request is accepted, and then dispatched to a separate asyncore + dispatcher to handle. + + """ + + _REQUEST_QUEUE_SIZE = 5 + + def __init__(self, family, address): + """Constructor for AsyncUnixStreamSocket + + @type family: integer + @param family: socket family (one of socket.AF_*) + @type address: address family dependent + @param address: address to bind the socket to + + """ + GanetiBaseAsyncoreDispatcher.__init__(self) + self.family = family + self.create_socket(self.family, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind(address) + self.listen(self._REQUEST_QUEUE_SIZE) + + # this method is overriding an asyncore.dispatcher method + def handle_accept(self): + """Accept a new client connection. + + Creates a new instance of the handler class, which will use asyncore to + serve the client. + + """ + accept_result = utils.IgnoreSignals(self.accept) + if accept_result is not None: + connected_socket, client_address = accept_result + if self.family == socket.AF_UNIX: + # override the client address, as for unix sockets nothing meaningful + # is passed in from accept anyway + client_address = utils.GetSocketCredentials(connected_socket) + logging.info("Accepted connection from %s", + FormatAddress(self.family, client_address)) + self.handle_connection(connected_socket, client_address) + + def handle_connection(self, connected_socket, client_address): + """Handle an already accepted connection. + + """ + raise NotImplementedError + + class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): """An improved asyncore udp socket. -- GitLab