AsyncTerminatedMessageStream: send_message
author Guido Trotter <ultrotter@google.com>
Fri, 25 Jun 2010 15:43:21 +0000 (17:43 +0200)
committer Guido Trotter <ultrotter@google.com>
Tue, 29 Jun 2010 11:30:30 +0000 (12:30 +0100)
This function adds the ability for an AsyncTerminatedMessageStream to
have a thread-safe message delivery function.

Signed-off-by: Guido Trotter <ultrotter@google.com>
Reviewed-by: Michael Hanselmann <hansmi@google.com>

lib/daemon.py
test/ganeti.daemon_unittest.py

index 84d4a43..98b9fce 100644 (file)
@@ -24,6 +24,7 @@
 
 import asyncore
 import asynchat
+import collections
 import grp
 import os
 import pwd
@@ -199,6 +200,7 @@ class AsyncTerminatedMessageStream(asynchat.async_chat):
     self.set_terminator(terminator)
     self.ibuffer = []
     self.next_incoming_message = 0
+    self.oqueue = collections.deque()
 
   # this method is overriding an asynchat.async_chat method
   def collect_incoming_data(self, data):
@@ -225,6 +227,36 @@ class AsyncTerminatedMessageStream(asynchat.async_chat):
     # TODO: move this method to raise NotImplementedError
     # raise NotImplementedError
 
+  def send_message(self, message):
+    """Send a message to the remote peer. This function is thread-safe.
+
+    The message is only appended to the output queue here; handle_write later
+    pops it, appends the terminator and hands it to asynchat via push().
+
+    @type message: string
+    @param message: message to send, without the terminator
+
+    @warning: If calling this function from a thread other than the one
+    performing the main asyncore loop, remember that you have to wake that one
+    up.
+
+    """
+    # If we just append the message we received to the output queue, this
+    # function can be safely called by multiple threads at the same time, and
+    # we don't need locking, since deque appends and pops are thread safe.
+    self.oqueue.append(message)
+
+  # this method is overriding an asyncore.dispatcher method
+  def writable(self):
+    # A message may be queued just after writable() was evaluated as false; it
+    # is then only sent once something else wakes the main loop up from its
+    # select. The result is truthy (possibly the deque itself), not a bool.
+    return asynchat.async_chat.writable(self) or self.oqueue
+
+  # this method is overriding an asyncore.dispatcher method
+  def handle_write(self):
+    # Move at most one queued message per call into asynchat's output,
+    # appending the terminator that send_message callers leave out.
+    if self.oqueue:
+      data = self.oqueue.popleft()
+      self.push(data + self.terminator)
+    # Always try to send, even when nothing was dequeued on this call.
+    self.initiate_send()
+
   def close_log(self):
     logging.info("Closing connection from %s",
                  FormatAddress(self.family, self.peer_address))
index 472ceb7..1c9160e 100755 (executable)
@@ -470,6 +470,30 @@ class TestAsyncStreamServerTCP(testutils.GanetiTestCase):
     self.mainloop.Run()
     self.assertEquals(len(self.connections), 1)
 
+  def testSendMessage(self):
+    # Check that send_message makes only the targeted connection writable and
+    # that the queued replies reach the matching client, each followed by "\3".
+    self.connect_terminate_count = None
+    self.message_terminate_count = 3
+    client1 = self.getClient()
+    client2 = self.getClient()
+    client1.send("one\3composed\3message\3")
+    self.mainloop.Run()
+    self.assertEquals(self.messages[0], ["one", "composed", "message"])
+    # Nothing has been queued yet, so neither connection wants to write.
+    self.assertFalse(self.connections[0].writable())
+    self.assertFalse(self.connections[1].writable())
+    self.connections[0].send_message("r0")
+    # Queueing on connection 0 must not affect connection 1.
+    self.assert_(self.connections[0].writable())
+    self.assertFalse(self.connections[1].writable())
+    self.connections[0].send_message("r1")
+    self.connections[0].send_message("r2")
+    # We currently have no way to terminate the mainloop on write events, but
+    # let's assume handle_write will be called if writable() is True.
+    while self.connections[0].writable():
+      self.connections[0].handle_write()
+    # Non-blocking sockets, so recv fails instead of hanging when no data came.
+    client1.setblocking(0)
+    client2.setblocking(0)
+    self.assertEquals(client1.recv(4096), "r0\3r1\3r2\3")
+    # client2 was sent nothing, so its recv is expected to raise.
+    self.assertRaises(socket.error, client2.recv, 4096)
+
 
 class TestAsyncStreamServerUnixPath(TestAsyncStreamServerTCP):
   """Test daemon.AsyncStreamServer with a Unix path connection"""