import asyncore
import asynchat
+import collections
import grp
import os
import pwd
self.set_terminator(terminator)
self.ibuffer = []
self.next_incoming_message = 0
+ self.oqueue = collections.deque()
# this method is overriding an asynchat.async_chat method
def collect_incoming_data(self, data):
# TODO: move this method to raise NotImplementedError
# raise NotImplementedError
+ def send_message(self, message):
+ """Send a message to the remote peer. This function is thread-safe.
+
+ @type message: string
+ @param message: message to send, without the terminator
+
+ @warning: If calling this function from a thread different than the one
+ performing the main asyncore loop, remember that you have to wake that one
+ up.
+
+ """
+ # If we just append the message we received to the output queue, this
+ # function can be safely called by multiple threads at the same time, and
+ # we don't need locking, since deques are thread safe.
+ self.oqueue.append(message)
+
+ # this method is overriding an asyncore.dispatcher method
+ def writable(self):
+ # the output queue may become full just after we called writable. This only
+ # works if we know we'll have something else waking us up from the select,
+ # in such case, anyway.
+ return asynchat.async_chat.writable(self) or self.oqueue
+
+ # this method is overriding an asyncore.dispatcher method
+ def handle_write(self):
+ if self.oqueue:
+ data = self.oqueue.popleft()
+ self.push(data + self.terminator)
+ self.initiate_send()
+
def close_log(self):
logging.info("Closing connection from %s",
FormatAddress(self.family, self.peer_address))
self.mainloop.Run()
self.assertEquals(len(self.connections), 1)
+ def testSendMessage(self):
+ self.connect_terminate_count = None
+ self.message_terminate_count = 3
+ client1 = self.getClient()
+ client2 = self.getClient()
+ client1.send("one\3composed\3message\3")
+ self.mainloop.Run()
+ self.assertEquals(self.messages[0], ["one", "composed", "message"])
+ self.assertFalse(self.connections[0].writable())
+ self.assertFalse(self.connections[1].writable())
+ self.connections[0].send_message("r0")
+ self.assert_(self.connections[0].writable())
+ self.assertFalse(self.connections[1].writable())
+ self.connections[0].send_message("r1")
+ self.connections[0].send_message("r2")
+ # We currently have no way to terminate the mainloop on write events, but
+ # let's assume handle_write will be called if writable() is True.
+ while self.connections[0].writable():
+ self.connections[0].handle_write()
+ client1.setblocking(0)
+ client2.setblocking(0)
+ self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3")
+ self.assertRaises(socket.error, client2.recv, 4096)
+
class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP):
"""Test daemon.AsyncStreamServer with a Unix path connection"""