Correct parameters for producer/consumer construction
author Georgios Gousios <gousiosg@gmail.com>
Thu, 8 Dec 2011 15:24:20 +0000 (17:24 +0200)
committer Georgios Gousios <gousiosg@gmail.com>
Thu, 8 Dec 2011 15:24:20 +0000 (17:24 +0200)
logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala
logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala

index 7680217..256460d 100644 (file)
@@ -62,19 +62,21 @@ trait AkkaAMQP {
 
   //TODO: Methods to load configuration from {file, other resource}
 
-  def consumer(routekey: String, queue: String, receiver: ActorRef) =
+  def consumer(routekey: String, queue: String, exchange: String, recipient: ActorRef, selfAck: Boolean) =
     AMQP.newConsumer(
       connection = connection,
       consumerParameters = ConsumerParameters(
         routingKey = routekey,
-        deliveryHandler = receiver,
+        exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)),
+        deliveryHandler = recipient,
         queueName = Some(queue),
-        queueDeclaration = decl
+        queueDeclaration = decl,
+        selfAcknowledging = selfAck
         ))
 
   def producer(exchange: String) = AMQP.newProducer(
       connection = connection,
       producerParameters = ProducerParameters(
         exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)),
-        channelParameters = Some(ChannelParameters(prefetchSize = 1))))
+        channelParameters = Some(ChannelParameters(prefetchSize = 0))))
 }
\ No newline at end of file
index 31bad59..97dd326 100644 (file)
@@ -39,10 +39,14 @@ import gr.grnet.aquarium.util.RandomEventGenerator
 import org.junit.Test
 import akka.actor.Actor
 import java.util.concurrent.{TimeUnit, CountDownLatch}
-import akka.amqp.{AMQP, Message, Delivery}
+import akka.amqp._
+import akka.config.Supervision.Permanent
+import java.util.concurrent.atomic.AtomicInteger
+import org.junit.Assume._
+import gr.grnet.aquarium.LogicTestsAssumptions
 
 /**
- * 
+ *
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
@@ -50,29 +54,49 @@ import akka.amqp.{AMQP, Message, Delivery}
 class AkkaAMQPTest extends RandomEventGenerator {
 
   @Test
-  def testSendReceive() = {
+  def testSendReceive() : Unit = {
+
+    assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
+
     val numMsg = 100
-    val countDown = new CountDownLatch(numMsg)
-    
+    val msgs = new AtomicInteger(0)
+
     val publisher = producer("aquarium")
 
     class Consumer extends Actor {
+
+      self.lifeCycle = Permanent
+
       def receive = {
-        case Delivery(payload, _, _, _, _, _) =>
-          println("Got message: " + payload)
-          countDown.countDown()
-        case _ =>
+        case Delivery(payload, routingKey, deliveryTag, isRedeliver, properties, sender) =>
+          println(this + " Got message: %s (%d)".format(new String(payload), deliveryTag) +
+            (if(isRedeliver) " - redelivered (%d)".format(deliveryTag) else ""))
+          if (msgs.incrementAndGet() == 15) throw new Exception("Messed up")
+          if (msgs.incrementAndGet() == 55) sender ! Reject(deliveryTag, true)
+          sender ! Acknowledge(deliveryTag)
+        case Acknowledged(deliveryTag) => println("Acked: " + deliveryTag)
+        case _ => println("Unknown delivery")
+      }
+
+      override def preRestart(reason: Throwable) {
+        println("Actor restarted: " + reason)
+      }
+
+      override def postRestart(reason: Throwable) {
+        // reinit stable state after restart
       }
     }
 
-    consumer("akka-amqp-test", "foo.*", Actor.actorOf(new Consumer))
+    consumer("foo.#", "akka-amqp-test", "aquarium", Actor.actorOf(new Consumer), false)
+    Thread.sleep(2000)
 
     (1 to numMsg).foreach{
       i =>  publisher ! new Message(i.toString.getBytes(), "foo.bar")
     }
 
-    countDown.await(10, TimeUnit.SECONDS)
-    AMQP.shutdownAll
+    Thread.sleep(5000)
+
+    //AMQP.shutdownAll
     Actor.registry.shutdownAll
   }
 }
\ No newline at end of file