def testNoDoubleBind(self):
self.assertRaises(socket.error, self.client.bind, ("127.0.0.1", self.port))
- def _ThreadedClient(self, payload):
- self.client.enqueue_send("127.0.0.1", self.port, payload)
- print "sending %s" % payload
- while self.client.writable():
- self.client.handle_write()
-
def testAsyncClientServer(self):
self.client.enqueue_send("127.0.0.1", self.port, "p1")
self.client.enqueue_send("127.0.0.1", self.port, "p2")
self.assertEquals(self.server.received, ["p1", "p2", "terminate"])
def testSyncClientServer(self):
+ self.client.handle_write()
self.client.enqueue_send("127.0.0.1", self.port, "p1")
self.client.enqueue_send("127.0.0.1", self.port, "p2")
while self.client.writable():