Commit 1e063ccd authored by Guido Trotter's avatar Guido Trotter
Browse files

AsyncTerminatedMessageStream: send_message



This function adds the ability for a AsyncTerminatedMessageStream to
have a thread-safe message delivery function.
Signed-off-by: default avatarGuido Trotter <ultrotter@google.com>
Reviewed-by: default avatarMichael Hanselmann <hansmi@google.com>
parent aefbe369
......@@ -24,6 +24,7 @@
import asyncore
import asynchat
import collections
import grp
import os
import pwd
......@@ -199,6 +200,7 @@ class AsyncTerminatedMessageStream(asynchat.async_chat):
self.set_terminator(terminator)
self.ibuffer = []
self.next_incoming_message = 0
self.oqueue = collections.deque()
# this method is overriding an asynchat.async_chat method
def collect_incoming_data(self, data):
......@@ -225,6 +227,36 @@ class AsyncTerminatedMessageStream(asynchat.async_chat):
# TODO: move this method to raise NotImplementedError
# raise NotImplementedError
def send_message(self, message):
"""Send a message to the remote peer. This function is thread-safe.
@type message: string
@param message: message to send, without the terminator
@warning: If calling this function from a thread different than the one
performing the main asyncore loop, remember that you have to wake that one
up.
"""
# If we just append the message we received to the output queue, this
# function can be safely called by multiple threads at the same time, and
# we don't need locking, since deques are thread safe.
self.oqueue.append(message)
# this method is overriding an asyncore.dispatcher method
def writable(self):
# the output queue may become full just after we called writable. This only
# works if we know we'll have something else waking us up from the select,
# in such case, anyway.
return asynchat.async_chat.writable(self) or self.oqueue
# this method is overriding an asyncore.dispatcher method
def handle_write(self):
if self.oqueue:
data = self.oqueue.popleft()
self.push(data + self.terminator)
self.initiate_send()
def close_log(self):
logging.info("Closing connection from %s",
FormatAddress(self.family, self.peer_address))
......
......@@ -470,6 +470,30 @@ class TestAsyncStreamServerTCP(testutils.GanetiTestCase):
self.mainloop.Run()
self.assertEquals(len(self.connections), 1)
def testSendMessage(self):
self.connect_terminate_count = None
self.message_terminate_count = 3
client1 = self.getClient()
client2 = self.getClient()
client1.send("one\3composed\3message\3")
self.mainloop.Run()
self.assertEquals(self.messages[0], ["one", "composed", "message"])
self.assertFalse(self.connections[0].writable())
self.assertFalse(self.connections[1].writable())
self.connections[0].send_message("r0")
self.assert_(self.connections[0].writable())
self.assertFalse(self.connections[1].writable())
self.connections[0].send_message("r1")
self.connections[0].send_message("r2")
# We currently have no way to terminate the mainloop on write events, but
# let's assume handle_write will be called if writable() is True.
while self.connections[0].writable():
self.connections[0].handle_write()
client1.setblocking(0)
client2.setblocking(0)
self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3")
self.assertRaises(socket.error, client2.recv, 4096)
class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP):
"""Test daemon.AsyncStreamServer with a Unix path connection"""
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment