Messaging implementation based on akka-amqp
authorGeorgios Gousios <gousiosg@gmail.com>
Wed, 7 Dec 2011 18:03:03 +0000 (20:03 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Wed, 7 Dec 2011 18:03:03 +0000 (20:03 +0200)
logic/pom.xml
logic/src/main/scala/gr/grnet/aquarium/messaging/AMQPProducer.scala
logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala [new file with mode: 0644]
logic/src/main/scala/gr/grnet/aquarium/messaging/rabbitmq/v091/RabbitMQProducer.scala
logic/src/test/resources/akka.conf
logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala [new file with mode: 0644]
logic/src/test/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala

index 18f2603..eeb4a8f 100644 (file)
     <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
-      <version>2.7.0</version>
+      <version>2.5.0</version>
     </dependency>
-
     <dependency>
       <groupId>com.ckkloverdos</groupId>
       <artifactId>streamresource_2.9.1</artifactId>
 
     <dependency>
       <groupId>se.scalablesolutions.akka</groupId>
+      <artifactId>akka-amqp</artifactId>
+      <version>1.3-RC2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>se.scalablesolutions.akka</groupId>
       <artifactId>akka-camel</artifactId>
       <version>1.3-RC2</version>
     </dependency>
index 5dbf42c..5c4568e 100644 (file)
@@ -44,5 +44,5 @@ import parallel.Future
 trait AMQPProducer {
   def name: String
   def publishString(message: String, headers: Map[String,  String] = Map()): Unit
-  def publishStringWithConfirm(message: String, headers: Map[String,  String] = Map()): Boolean
+  //def publishStringWithConfirm(message: String, headers: Map[String,  String] = Map()): Boolean
 }
\ No newline at end of file
diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala
new file mode 100644 (file)
index 0000000..7680217
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging
+
+import com.rabbitmq.client.Address
+import akka.actor._
+import akka.amqp.{Topic, AMQP}
+import akka.amqp.AMQP._
+
+/**
+ * Functionality for working with queues.
+ *
+ * @author Georgios Gousios <gousiosg@gmail.com>
+ */
+
+trait AkkaAMQP {
+
+  private lazy val connection = AMQP.newConnection(
+    ConnectionParameters(
+      Array(new Address("1.2.3.4",5672)),
+      "foob",
+      "bar",
+      "/",
+      5000,
+      None))
+
+  //Queues and exchnages are by default durable and persistent
+  val decl = ActiveDeclaration(durable = true, autoDelete = false)
+
+  //TODO: Methods to load configuration from {file, other resource}
+
+  def consumer(routekey: String, queue: String, receiver: ActorRef) =
+    AMQP.newConsumer(
+      connection = connection,
+      consumerParameters = ConsumerParameters(
+        routingKey = routekey,
+        deliveryHandler = receiver,
+        queueName = Some(queue),
+        queueDeclaration = decl
+        ))
+
+  def producer(exchange: String) = AMQP.newProducer(
+      connection = connection,
+      producerParameters = ProducerParameters(
+        exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)),
+        channelParameters = Some(ChannelParameters(prefetchSize = 1))))
+}
\ No newline at end of file
index 4b3c475..082c22d 100644 (file)
@@ -79,13 +79,13 @@ class RabbitMQProducer(private[v091] val owner: RabbitMQConnection, val confMode
     _publish(message, headers){_ =>}{_ => ()} {_ => logger.error("publish() from producer %s".format())}
   }
 
-  def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = {
+/*  def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = {
     _publish(message, headers) {
       _.confirmSelect() }{
       logger.debug("Waiting for confirmation")
       _.waitForConfirms() } {_ => logger.error("publishWithConfirm() from producer %s".format())}
   }
-
+*/
   override def toString = {
     "RabbitMQProducer(%s/%s/%s)".format(owner.owner.name, owner.name, name)
   }
index cd0882f..04ed09f 100644 (file)
@@ -8,7 +8,7 @@
 akka {
   version = "1.3-RC2"   # Akka version, checked against the runtime version of Akka.
 
-  enabled-modules = ["remote", "http"]       # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
+  enabled-modules = ["remote", "http", "amqp"]       # Comma separated list of the enabled modules. Options: ["remote", "camel", "http"]
 
   time-unit = "seconds"      # Time unit for all timeout properties throughout the config
 
diff --git a/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala b/logic/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala
new file mode 100644 (file)
index 0000000..31bad59
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging
+
+import gr.grnet.aquarium.util.RandomEventGenerator
+import org.junit.Test
+import akka.actor.Actor
+import java.util.concurrent.{TimeUnit, CountDownLatch}
+import akka.amqp.{AMQP, Message, Delivery}
+
+/**
+ * 
+ *
+ * @author Georgios Gousios <gousiosg@gmail.com>
+ */
+
+class AkkaAMQPTest extends RandomEventGenerator {
+
+  @Test
+  def testSendReceive() = {
+    val numMsg = 100
+    val countDown = new CountDownLatch(numMsg)
+    
+    val publisher = producer("aquarium")
+
+    class Consumer extends Actor {
+      def receive = {
+        case Delivery(payload, _, _, _, _, _) =>
+          println("Got message: " + payload)
+          countDown.countDown()
+        case _ =>
+      }
+    }
+
+    consumer("akka-amqp-test", "foo.*", Actor.actorOf(new Consumer))
+
+    (1 to numMsg).foreach{
+      i =>  publisher ! new Message(i.toString.getBytes(), "foo.bar")
+    }
+
+    countDown.await(10, TimeUnit.SECONDS)
+    AMQP.shutdownAll
+    Actor.registry.shutdownAll
+  }
+}
\ No newline at end of file
index 4123c17..865b62a 100644 (file)
@@ -35,42 +35,60 @@ package gr.grnet.aquarium.util
  * or implied, of GRNET S.A.
  */
 
+import akka.amqp._
 import java.util.Date
 import gr.grnet.aquarium.logic.events.ResourceEvent
+import gr.grnet.aquarium.messaging.AkkaAMQP
+import util.Random
 
 /**
  *  Generates random resource events to use as input for testing and
- *  injects them to the specified queue.
+ *  injects them to the specified exchange.
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-trait RandomEventGenerator {
+trait RandomEventGenerator extends AkkaAMQP {
 
   val userIds = 1 to 100
   val clientIds = 1 to 4
+  val vmIds = 1 to 4000
   val resources = List("bandwidthup", "bandwidthdown", "vmtime", "diskspace")
-  val dateFrom = new Date(1293840000000L) //1/1/2011 0:00:00 GMT
-  val dateTo = new Date(1325376000000L) //1/1/2012 0:00:00 GMT
+  val tsFrom = 1293840000000L //1/1/2011 0:00:00 GMT
+  val tsTo = 1325376000000L   //1/1/2012 0:00:00 GMT
   val eventVersion = 1 to 4
 
+  private val seed = 0xdeadbeef
+  private lazy val rnd = new Random(seed)
+
   /**
-   * Get a next random message
+   * Get the next random message
    */
   def nextResourceEvent() : ResourceEvent = {
+    val res = rnd.shuffle(resources).head
+
+    val extra = res match {
+      case "vmtime" => Map("vmid" -> rnd.nextInt(vmIds.max).toString)
+      case _ => Map[String, String]()
+    }
 
+    val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
 
-    ResourceEvent(0,0,"",0L,0,Map())
+    ResourceEvent(
+      rnd.nextInt(userIds.max),
+      rnd.nextInt(clientIds.max),
+      res,ts,1,extra)
   }
 
   /**
-   * Generate messages and add them to the queue
+   * Generate resource events and publish them to the queue
    */
-  def genAdd(num: Int) = {
+  def genPublish(num: Int) = {
 
     assert(num > 0)
+    val publisher = producer("aquarium")
 
     (1 to num).foreach {
-      n =>
+      n => publisher ! Message(nextResourceEvent.toBytes, "test.msg")
     }
   }
 }
\ No newline at end of file