From 1e063ccd7d0791271ed6b0317e61b7bb9bf25877 Mon Sep 17 00:00:00 2001 From: Guido Trotter Date: Fri, 25 Jun 2010 17:43:21 +0200 Subject: [PATCH] AsyncTerminatedMessageStream: send_message This function adds the ability for a AsyncTerminatedMessageStream to have a thread-safe message delivery function. Signed-off-by: Guido Trotter Reviewed-by: Michael Hanselmann --- lib/daemon.py | 32 ++++++++++++++++++++++++++++++++ test/ganeti.daemon_unittest.py | 24 ++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/lib/daemon.py b/lib/daemon.py index 84d4a43..98b9fce 100644 --- a/lib/daemon.py +++ b/lib/daemon.py @@ -24,6 +24,7 @@ import asyncore import asynchat +import collections import grp import os import pwd @@ -199,6 +200,7 @@ class AsyncTerminatedMessageStream(asynchat.async_chat): self.set_terminator(terminator) self.ibuffer = [] self.next_incoming_message = 0 + self.oqueue = collections.deque() # this method is overriding an asynchat.async_chat method def collect_incoming_data(self, data): @@ -225,6 +227,36 @@ class AsyncTerminatedMessageStream(asynchat.async_chat): # TODO: move this method to raise NotImplementedError # raise NotImplementedError + def send_message(self, message): + """Send a message to the remote peer. This function is thread-safe. + + @type message: string + @param message: message to send, without the terminator + + @warning: If calling this function from a thread different than the one + performing the main asyncore loop, remember that you have to wake that one + up. + + """ + # If we just append the message we received to the output queue, this + # function can be safely called by multiple threads at the same time, and + # we don't need locking, since deques are thread safe. + self.oqueue.append(message) + + # this method is overriding an asyncore.dispatcher method + def writable(self): + # the output queue may become full just after we called writable. This only + # works if we know we'll have something else waking us up from the select, + # in such case, anyway. + return asynchat.async_chat.writable(self) or self.oqueue + + # this method is overriding an asyncore.dispatcher method + def handle_write(self): + if self.oqueue: + data = self.oqueue.popleft() + self.push(data + self.terminator) + self.initiate_send() + def close_log(self): logging.info("Closing connection from %s", FormatAddress(self.family, self.peer_address)) diff --git a/test/ganeti.daemon_unittest.py b/test/ganeti.daemon_unittest.py index 472ceb7..1c9160e 100755 --- a/test/ganeti.daemon_unittest.py +++ b/test/ganeti.daemon_unittest.py @@ -470,6 +470,30 @@ class TestAsyncStreamServerTCP(testutils.GanetiTestCase): self.mainloop.Run() self.assertEquals(len(self.connections), 1) + def testSendMessage(self): + self.connect_terminate_count = None + self.message_terminate_count = 3 + client1 = self.getClient() + client2 = self.getClient() + client1.send("one\3composed\3message\3") + self.mainloop.Run() + self.assertEquals(self.messages[0], ["one", "composed", "message"]) + self.assertFalse(self.connections[0].writable()) + self.assertFalse(self.connections[1].writable()) + self.connections[0].send_message("r0") + self.assert_(self.connections[0].writable()) + self.assertFalse(self.connections[1].writable()) + self.connections[0].send_message("r1") + self.connections[0].send_message("r2") + # We currently have no way to terminate the mainloop on write events, but + # let's assume handle_write will be called if writable() is True. + while self.connections[0].writable(): + self.connections[0].handle_write() + client1.setblocking(0) + client2.setblocking(0) + self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3") + self.assertRaises(socket.error, client2.recv, 4096) + class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP): """Test daemon.AsyncStreamServer with a Unix path connection""" -- 1.7.10.4