From 657e296ea6e3517af954ecd244a02ca45b8ecb74 Mon Sep 17 00:00:00 2001 From: Georgios Gousios Date: Thu, 8 Dec 2011 17:24:20 +0200 Subject: [PATCH] Correct parameters for producer/consumer construction --- .../gr/grnet/aquarium/messaging/AkkaAMQP.scala | 10 ++-- .../gr/grnet/aquarium/messaging/AkkaAMQPTest.scala | 48 +++++++++++++++----- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala index 7680217..256460d 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala @@ -62,19 +62,21 @@ trait AkkaAMQP { //TODO: Methods to load configuration from {file, other resource} - def consumer(routekey: String, queue: String, receiver: ActorRef) = + def consumer(routekey: String, queue: String, exchange: String, recipient: ActorRef, selfAck: Boolean) = AMQP.newConsumer( connection = connection, consumerParameters = ConsumerParameters( routingKey = routekey, - deliveryHandler = receiver, + exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)), + deliveryHandler = recipient, queueName = Some(queue), - queueDeclaration = decl + queueDeclaration = decl, + selfAcknowledging = selfAck )) def producer(exchange: String) = AMQP.newProducer( connection = connection, producerParameters = ProducerParameters( exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)), - channelParameters = Some(ChannelParameters(prefetchSize = 1)))) + channelParameters = Some(ChannelParameters(prefetchSize = 0)))) } \ No newline at end of file diff --git a/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala b/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala index 31bad59..97dd326 100644 --- a/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala +++ b/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala @@ -39,10 +39,14 @@ import gr.grnet.aquarium.util.RandomEventGenerator import org.junit.Test import akka.actor.Actor import java.util.concurrent.{TimeUnit, CountDownLatch} -import akka.amqp.{AMQP, Message, Delivery} +import akka.amqp._ +import akka.config.Supervision.Permanent +import java.util.concurrent.atomic.AtomicInteger +import org.junit.Assume._ +import gr.grnet.aquarium.LogicTestsAssumptions /** - * + * * * @author Georgios Gousios */ @@ -50,29 +54,49 @@ import akka.amqp.{AMQP, Message, Delivery} class AkkaAMQPTest extends RandomEventGenerator { @Test - def testSendReceive() = { + def testSendReceive() : Unit = { + + assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests) + val numMsg = 100 - val countDown = new CountDownLatch(numMsg) - + val msgs = new AtomicInteger(0) + val publisher = producer("aquarium") class Consumer extends Actor { + + self.lifeCycle = Permanent + def receive = { - case Delivery(payload, _, _, _, _, _) => - println("Got message: " + payload) - countDown.countDown() - case _ => + case Delivery(payload, routingKey, deliveryTag, isRedeliver, properties, sender) => + println(this + " Got message: %s (%d)".format(new String(payload), deliveryTag) + + (if(isRedeliver) " - redelivered (%d)".format(deliveryTag) else "")) + if (msgs.incrementAndGet() == 15) throw new Exception("Messed up") + if (msgs.incrementAndGet() == 55) sender ! Reject(deliveryTag, true) + sender ! Acknowledge(deliveryTag) + case Acknowledged(deliveryTag) => println("Acked: " + deliveryTag) + case _ => println("Unknown delivery") + } + + override def preRestart(reason: Throwable) { + println("Actor restarted: " + reason) + } + + override def postRestart(reason: Throwable) { + // reinit stable state after restart } } - consumer("akka-amqp-test", "foo.*", Actor.actorOf(new Consumer)) + consumer("foo.#", "akka-amqp-test", "aquarium", Actor.actorOf(new Consumer), false) + Thread.sleep(2000) (1 to numMsg).foreach{ i => publisher ! new Message(i.toString.getBytes(), "foo.bar") } - countDown.await(10, TimeUnit.SECONDS) - AMQP.shutdownAll + Thread.sleep(5000) + + //AMQP.shutdownAll Actor.registry.shutdownAll } } \ No newline at end of file -- 1.7.10.4