From b66ab6291aa52a4dc5a25ba1d3c6a415d7e2e198 Mon Sep 17 00:00:00 2001 From: Guido Trotter <ultrotter@google.com> Date: Mon, 24 May 2010 17:24:20 +0100 Subject: [PATCH] daemon.AsyncTerminatedMessageStream This is the counterpart of the AsyncStreamServer can be used to handle connected sockets returned from connected clients if the protocol is a terminator separated message stream. Nothing in this class is server specific though: it can be used as a client as well, if the client is implemented inside an asyncore daemon. Signed-off-by: Guido Trotter <ultrotter@google.com> Reviewed-by: Michael Hanselmann <hansmi@google.com> --- lib/daemon.py | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/lib/daemon.py b/lib/daemon.py index 0e6e042b3..567de3433 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -23,6 +23,7 @@ import asyncore +import asynchat import os import signal import logging @@ -160,6 +161,82 @@ class AsyncStreamServer(GanetiBaseAsyncoreDispatcher): raise NotImplementedError +class AsyncTerminatedMessageStream(asynchat.async_chat): + """A terminator separated message stream asyncore module. + + Handles a stream connection receiving messages terminated by a defined + separator. For each complete message handle_message is called. + + """ + def __init__(self, connected_socket, peer_address, terminator, family): + """AsyncTerminatedMessageStream constructor. + + @type connected_socket: socket.socket + @param connected_socket: connected stream socket to receive messages from + @param peer_address: family-specific peer address + @type terminator: string + @param terminator: terminator separating messages in the stream + @type family: integer + @param family: socket family + + """ + # python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by + # using a positional argument rather than a keyword one. + asynchat.async_chat.__init__(self, connected_socket) + self.connected_socket = connected_socket + # on python 2.4 there is no "family" attribute for the socket class + # FIXME: when we move to python 2.5 or above remove the family parameter + #self.family = self.connected_socket.family + self.family = family + self.peer_address = peer_address + self.terminator = terminator + self.set_terminator(terminator) + self.ibuffer = [] + self.next_incoming_message = 0 + + # this method is overriding an asynchat.async_chat method + def collect_incoming_data(self, data): + self.ibuffer.append(data) + + # this method is overriding an asynchat.async_chat method + def found_terminator(self): + message = "".join(self.ibuffer) + self.ibuffer = [] + message_id = self.next_incoming_message + self.next_incoming_message += 1 + self.handle_message(message, message_id) + + def handle_message(self, message, message_id): + """Handle a terminated message. + + @type message: string + @param message: message to handle + @type message_id: integer + @param message_id: stream's message sequence number + + """ + pass + # TODO: move this method to raise NotImplementedError + # raise NotImplementedError + + def close_log(self): + logging.info("Closing connection from %s", + FormatAddress(self.family, self.peer_address)) + self.close() + + # this method is overriding an asyncore.dispatcher method + def handle_expt(self): + self.close_log() + + # this method is overriding an asyncore.dispatcher method + def handle_error(self): + """Log an error in handling any request, and proceed. + + """ + logging.exception("Error while handling asyncore request") + self.close_log() + + class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher): """An improved asyncore udp socket. -- GitLab