This is the counterpart of the AsyncStreamServer can be used to handle
connected sockets returned from connected clients if the protocol is a
terminator separated message stream. Nothing in this class is server
specific though: it can be used as a client as well, if the client is
implemented inside an asyncore daemon.
Signed-off-by: default avatarGuido Trotter <>
Reviewed-by: default avatarMichael Hanselmann <>
......@@ -23,6 +23,7 @@
import asyncore
import asynchat
import os
import signal
import logging
......@@ -160,6 +161,82 @@ class AsyncStreamServer(GanetiBaseAsyncoreDispatcher):
raise NotImplementedError
class AsyncTerminatedMessageStream(asynchat.async_chat):
"""A terminator separated message stream asyncore module.
Handles a stream connection receiving messages terminated by a defined
separator. For each complete message handle_message is called.
def __init__(self, connected_socket, peer_address, terminator, family):
"""AsyncTerminatedMessageStream constructor.
@type connected_socket: socket.socket
@param connected_socket: connected stream socket to receive messages from
@param peer_address: family-specific peer address
@type terminator: string
@param terminator: terminator separating messages in the stream
@type family: integer
@param family: socket family
# python 2.4/2.5 uses conn=... while 2.6 has sock=... we have to cheat by
# using a positional argument rather than a keyword one.
asynchat.async_chat.__init__(self, connected_socket)
self.connected_socket = connected_socket
# on python 2.4 there is no "family" attribute for the socket class
# FIXME: when we move to python 2.5 or above remove the family parameter = = family
self.peer_address = peer_address
self.terminator = terminator
self.ibuffer = []
self.next_incoming_message = 0
# this method is overriding an asynchat.async_chat method
def collect_incoming_data(self, data):
# this method is overriding an asynchat.async_chat method
def found_terminator(self):
message = "".join(self.ibuffer)
self.ibuffer = []
message_id = self.next_incoming_message
self.next_incoming_message += 1
self.handle_message(message, message_id)
def handle_message(self, message, message_id):
"""Handle a terminated message.
@type message: string
@param message: message to handle
@type message_id: integer
@param message_id: stream's message sequence number
# TODO: move this method to raise NotImplementedError
# raise NotImplementedError
def close_log(self):"Closing connection from %s",
FormatAddress(, self.peer_address))
# this method is overriding an asyncore.dispatcher method
def handle_expt(self):
# this method is overriding an asyncore.dispatcher method
def handle_error(self):
"""Log an error in handling any request, and proceed.
logging.exception("Error while handling asyncore request")
class AsyncUDPSocket(GanetiBaseAsyncoreDispatcher):
"""An improved asyncore udp socket.
