From da8194d5e6edce02a645aba62ebef019b3d4c716 Mon Sep 17 00:00:00 2001 From: Georgios Gousios Date: Wed, 7 Dec 2011 20:03:03 +0200 Subject: [PATCH] Messaging implementation based on akka-amqp --- logic/pom.xml | 9 ++- .../gr/grnet/aquarium/messaging/AMQPProducer.scala | 2 +- .../gr/grnet/aquarium/messaging/AkkaAMQP.scala | 80 ++++++++++++++++++++ .../messaging/rabbitmq/v091/RabbitMQProducer.scala | 4 +- logic/src/test/resources/akka.conf | 2 +- .../gr/grnet/aquarium/messaging/AkkaAMQPTest.scala | 78 +++++++++++++++++++ .../grnet/aquarium/util/RandomEventGenerator.scala | 36 ++++++--- 7 files changed, 196 insertions(+), 15 deletions(-) create mode 100644 logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala create mode 100644 logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala diff --git a/logic/pom.xml b/logic/pom.xml index 18f2603..eeb4a8f 100644 --- a/logic/pom.xml +++ b/logic/pom.xml @@ -121,9 +121,8 @@ com.rabbitmq amqp-client - 2.7.0 + 2.5.0 - com.ckkloverdos streamresource_2.9.1 @@ -164,6 +163,12 @@ se.scalablesolutions.akka + akka-amqp + 1.3-RC2 + + + + se.scalablesolutions.akka akka-camel 1.3-RC2 diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/AMQPProducer.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/AMQPProducer.scala index 5dbf42c..5c4568e 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/messaging/AMQPProducer.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/messaging/AMQPProducer.scala @@ -44,5 +44,5 @@ import parallel.Future trait AMQPProducer { def name: String def publishString(message: String, headers: Map[String, String] = Map()): Unit - def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()): Boolean + //def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()): Boolean } \ No newline at end of file diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala new file mode 100644 index 0000000..7680217 --- /dev/null +++ b/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2011 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium.messaging + +import com.rabbitmq.client.Address +import akka.actor._ +import akka.amqp.{Topic, AMQP} +import akka.amqp.AMQP._ + +/** + * Functionality for working with queues. + * + * @author Georgios Gousios + */ + +trait AkkaAMQP { + + private lazy val connection = AMQP.newConnection( + ConnectionParameters( + Array(new Address("1.2.3.4",5672)), + "foob", + "bar", + "/", + 5000, + None)) + + //Queues and exchnages are by default durable and persistent + val decl = ActiveDeclaration(durable = true, autoDelete = false) + + //TODO: Methods to load configuration from {file, other resource} + + def consumer(routekey: String, queue: String, receiver: ActorRef) = + AMQP.newConsumer( + connection = connection, + consumerParameters = ConsumerParameters( + routingKey = routekey, + deliveryHandler = receiver, + queueName = Some(queue), + queueDeclaration = decl + )) + + def producer(exchange: String) = AMQP.newProducer( + connection = connection, + producerParameters = ProducerParameters( + exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)), + channelParameters = Some(ChannelParameters(prefetchSize = 1)))) +} \ No newline at end of file diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/rabbitmq/v091/RabbitMQProducer.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/rabbitmq/v091/RabbitMQProducer.scala index 4b3c475..082c22d 100644 --- a/logic/src/main/scala/gr/grnet/aquarium/messaging/rabbitmq/v091/RabbitMQProducer.scala +++ b/logic/src/main/scala/gr/grnet/aquarium/messaging/rabbitmq/v091/RabbitMQProducer.scala @@ -79,13 +79,13 @@ class RabbitMQProducer(private[v091] val owner: RabbitMQConnection, val confMode _publish(message, headers){_ =>}{_ => ()} {_ => logger.error("publish() from producer %s".format())} } - def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = { +/* def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = { _publish(message, headers) { _.confirmSelect() }{ logger.debug("Waiting for confirmation") _.waitForConfirms() } {_ => logger.error("publishWithConfirm() from producer %s".format())} } - +*/ override def toString = { "RabbitMQProducer(%s/%s/%s)".format(owner.owner.name, owner.name, name) } diff --git a/logic/src/test/resources/akka.conf b/logic/src/test/resources/akka.conf index cd0882f..04ed09f 100644 --- a/logic/src/test/resources/akka.conf +++ b/logic/src/test/resources/akka.conf @@ -8,7 +8,7 @@ akka { version = "1.3-RC2" # Akka version, checked against the runtime version of Akka. - enabled-modules = ["remote", "http"] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] + enabled-modules = ["remote", "http", "amqp"] # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"] time-unit = "seconds" # Time unit for all timeout properties throughout the config diff --git a/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala b/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala new file mode 100644 index 0000000..31bad59 --- /dev/null +++ b/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2011 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium.messaging + +import gr.grnet.aquarium.util.RandomEventGenerator +import org.junit.Test +import akka.actor.Actor +import java.util.concurrent.{TimeUnit, CountDownLatch} +import akka.amqp.{AMQP, Message, Delivery} + +/** + * + * + * @author Georgios Gousios + */ + +class AkkaAMQPTest extends RandomEventGenerator { + + @Test + def testSendReceive() = { + val numMsg = 100 + val countDown = new CountDownLatch(numMsg) + + val publisher = producer("aquarium") + + class Consumer extends Actor { + def receive = { + case Delivery(payload, _, _, _, _, _) => + println("Got message: " + payload) + countDown.countDown() + case _ => + } + } + + consumer("akka-amqp-test", "foo.*", Actor.actorOf(new Consumer)) + + (1 to numMsg).foreach{ + i => publisher ! new Message(i.toString.getBytes(), "foo.bar") + } + + countDown.await(10, TimeUnit.SECONDS) + AMQP.shutdownAll + Actor.registry.shutdownAll + } +} \ No newline at end of file diff --git a/logic/src/test/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala b/logic/src/test/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala index 4123c17..865b62a 100644 --- a/logic/src/test/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala +++ b/logic/src/test/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala @@ -35,42 +35,60 @@ package gr.grnet.aquarium.util * or implied, of GRNET S.A. */ +import akka.amqp._ import java.util.Date import gr.grnet.aquarium.logic.events.ResourceEvent +import gr.grnet.aquarium.messaging.AkkaAMQP +import util.Random /** * Generates random resource events to use as input for testing and - * injects them to the specified queue. + * injects them to the specified exchange. * * @author Georgios Gousios */ -trait RandomEventGenerator { +trait RandomEventGenerator extends AkkaAMQP { val userIds = 1 to 100 val clientIds = 1 to 4 + val vmIds = 1 to 4000 val resources = List("bandwidthup", "bandwidthdown", "vmtime", "diskspace") - val dateFrom = new Date(1293840000000L) //1/1/2011 0:00:00 GMT - val dateTo = new Date(1325376000000L) //1/1/2012 0:00:00 GMT + val tsFrom = 1293840000000L //1/1/2011 0:00:00 GMT + val tsTo = 1325376000000L //1/1/2012 0:00:00 GMT val eventVersion = 1 to 4 + private val seed = 0xdeadbeef + private lazy val rnd = new Random(seed) + /** - * Get a next random message + * Get the next random message */ def nextResourceEvent() : ResourceEvent = { + val res = rnd.shuffle(resources).head + + val extra = res match { + case "vmtime" => Map("vmid" -> rnd.nextInt(vmIds.max).toString) + case _ => Map[String, String]() + } + val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long] - ResourceEvent(0,0,"",0L,0,Map()) + ResourceEvent( + rnd.nextInt(userIds.max), + rnd.nextInt(clientIds.max), + res,ts,1,extra) } /** - * Generate messages and add them to the queue + * Generate resource events and publish them to the queue */ - def genAdd(num: Int) = { + def genPublish(num: Int) = { assert(num > 0) + val publisher = producer("aquarium") (1 to num).foreach { - n => + n => publisher ! Message(nextResourceEvent.toBytes, "test.msg") } } } \ No newline at end of file -- 1.7.10.4