//TODO: Methods to load configuration from {file, other resource}
- def consumer(routekey: String, queue: String, receiver: ActorRef) =
+ def consumer(routekey: String, queue: String, exchange: String, recipient: ActorRef, selfAck: Boolean) = // builds a consumer on the shared `connection` via AMQP.newConsumer
AMQP.newConsumer(
connection = connection,
consumerParameters = ConsumerParameters(
routingKey = routekey,
- deliveryHandler = receiver,
+ exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)), // exchange parameters always use the Topic type and the same `decl` as the queue below
+ deliveryHandler = recipient, // `recipient` is installed as the delivery handler
queueName = Some(queue),
- queueDeclaration = decl
+ queueDeclaration = decl,
+ selfAcknowledging = selfAck // NOTE(review): presumably `false` means the recipient must acknowledge deliveries itself - confirm against the akka-amqp docs
))
def producer(exchange: String) = AMQP.newProducer(
connection = connection,
producerParameters = ProducerParameters(
exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)),
- channelParameters = Some(ChannelParameters(prefetchSize = 1))))
+ channelParameters = Some(ChannelParameters(prefetchSize = 0)))) // NOTE(review): presumably 0 means "no prefetch-size limit" (AMQP basic.qos) - confirm against the broker in use
}
\ No newline at end of file
import org.junit.Test
import akka.actor.Actor
import java.util.concurrent.{TimeUnit, CountDownLatch}
-import akka.amqp.{AMQP, Message, Delivery}
+import akka.amqp._
+import akka.config.Supervision.Permanent
+import java.util.concurrent.atomic.AtomicInteger
+import org.junit.Assume._
+import gr.grnet.aquarium.LogicTestsAssumptions
/**
- *
+ *
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
class AkkaAMQPTest extends RandomEventGenerator {
+ /**
+  * Smoke test against a live broker: publishes `numMsg` messages with routing key
+  * "foo.bar" to the "aquarium" exchange and consumes them from the "akka-amqp-test"
+  * queue with manual acknowledgement.
+  *
+  * The consumer throws on its 15th delivery and rejects its 55th delivery
+  * (presumably to exercise actor restart and requeue/redelivery - confirm).
+  *
+  * Skipped unless `LogicTestsAssumptions.EnableRabbitMQTests` is true. The test
+  * makes no assertions; it only sleeps long enough for the traffic to flow.
+  */
@Test
- def testSendReceive() = {
+ def testSendReceive() : Unit = {
+
+ assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
+
val numMsg = 100
- val countDown = new CountDownLatch(numMsg)
-
+ val msgs = new AtomicInteger(0)
+
val publisher = producer("aquarium")
class Consumer extends Actor {
+
+ self.lifeCycle = Permanent
+
def receive = {
- case Delivery(payload, _, _, _, _, _) =>
- println("Got message: " + payload)
- countDown.countDown()
- case _ =>
+ case Delivery(payload, _, deliveryTag, isRedeliver, _, sender) =>
+ println(this + " Got message: %s (%d)".format(new String(payload), deliveryTag) +
+ (if(isRedeliver) " - redelivered (%d)".format(deliveryTag) else ""))
+ // Bump the counter exactly once per delivery. Calling incrementAndGet() in
+ // both checks advanced it by two per message, so the second check only ever
+ // saw even values and the == 55 branch could never fire.
+ val seen = msgs.incrementAndGet()
+ // Deliberate failure: the delivery is left unacknowledged when this throws.
+ if (seen == 15) throw new Exception("Messed up")
+ // Reject and Acknowledge are alternatives for one delivery tag; never send both.
+ if (seen == 55) sender ! Reject(deliveryTag, true)
+ else sender ! Acknowledge(deliveryTag)
+ case Acknowledged(deliveryTag) => println("Acked: " + deliveryTag)
+ case _ => println("Unknown delivery")
+ }
+
+ override def preRestart(reason: Throwable) {
+ println("Actor restarted: " + reason)
+ }
+
+ override def postRestart(reason: Throwable) {
+ // reinit stable state after restart
}
}
- consumer("akka-amqp-test", "foo.*", Actor.actorOf(new Consumer))
+ consumer("foo.#", "akka-amqp-test", "aquarium", Actor.actorOf(new Consumer), false)
+ Thread.sleep(2000)
(1 to numMsg).foreach{
i => publisher ! new Message(i.toString.getBytes(), "foo.bar")
}
- countDown.await(10, TimeUnit.SECONDS)
- AMQP.shutdownAll
+ Thread.sleep(5000)
+
+ //AMQP.shutdownAll
Actor.registry.shutdownAll
}
}
\ No newline at end of file