Finish testcase for amqp. Using a localhost installation running
authorChristos KK Loverdos <loverdos@gmail.com>
Tue, 22 Nov 2011 10:12:16 +0000 (12:12 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Tue, 22 Nov 2011 10:12:16 +0000 (12:12 +0200)
logic/pom.xml
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPConsumer.scala
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPDeliveryAgent.scala [new file with mode: 0644]
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPDeliveryHandler.scala [new file with mode: 0644]
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPProducer.scala
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/rabbitmq/v091/RabbitMQConsumer.scala
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/rabbitmq/v091/RabbitMQDeliveryAgent.scala [new file with mode: 0644]
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/rabbitmq/v091/RabbitMQProducer.scala
logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/rabbitmq/v091/confmodel/confmodel.scala
logic/src/test/resources/rabbitmq/configurations.xml
logic/src/test/scala/gr/grnet/aquarium/messaging/MessagingTest.scala

index 4e96029..5a7d506 100644 (file)
     <dependency>
       <groupId>com.ckkloverdos</groupId>
       <artifactId>streamresource_2.9.1</artifactId>
-      <version>0.1.1</version>
+      <version>0.2.0</version>
     </dependency>
 
     <dependency>
index 2cda8d5..f2a24db 100644 (file)
@@ -41,4 +41,6 @@ package gr.grnet.aquarium.messaging.amqp
  */
 trait AMQPConsumer {
   def name: String
+
+  def newDeliveryAgent(handler: AMQPDeliveryHandler): AMQPDeliveryAgent
 }
\ No newline at end of file
diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPDeliveryAgent.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPDeliveryAgent.scala
new file mode 100644 (file)
index 0000000..4bc1b9b
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging.amqp
+
+/**
+ * 
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+trait AMQPDeliveryAgent {
+  def deliverNext: Unit
+}
\ No newline at end of file
diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPDeliveryHandler.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/AMQPDeliveryHandler.scala
new file mode 100644 (file)
index 0000000..9a1f3af
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging.amqp
+
+import com.ckkloverdos.props.Props
+
+
+/**
+ * 
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+trait AMQPDeliveryHandler {
+  def handleStringDelivery(envelope: Props, headers: Props, content: String)
+}
\ No newline at end of file
index 46183fd..5dbf42c 100644 (file)
@@ -43,6 +43,6 @@ import parallel.Future
  */
 trait AMQPProducer {
   def name: String
-  def publish(message: String, headers: Map[String,  String] = Map()): Unit
-  def publishWithConfirm(message: String, headers: Map[String,  String] = Map()): Boolean
+  def publishString(message: String, headers: Map[String,  String] = Map()): Unit
+  def publishStringWithConfirm(message: String, headers: Map[String,  String] = Map()): Boolean
 }
\ No newline at end of file
index d9d0398..7367543 100644 (file)
@@ -39,14 +39,38 @@ package v091
 
 import confmodel.RabbitMQConsumerModel
 import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
+import gr.grnet.aquarium.util.Loggable
 
 /**
  * 
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
-class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer {
+class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer with Loggable  {
+  private[v091] val _rabbitChannel = {
+    val _ch = owner._rabbitConnection.createChannel()
+    logger.info("Created rabbit channel %s for %s".format(_ch, this.toString))
+    val exchange = owner.confModel.exchange
+    val exchangeType = owner.confModel.exchangeType
+    val exchangeIsDurable = owner.confModel.isDurable
+    val queue = confModel.queue
+    val routingKey = confModel.routingKey
+    val queueIsDurable = confModel.queueIsDurable
+    val queueIsAutoDelete = confModel.queueIsAutoDelete
+    val queueIsExclusive = confModel.queueIsExclusive
+
+    val ed = _ch.exchangeDeclare(exchange, exchangeType, exchangeIsDurable)
+    logger.info("Declared exchange %s for %s with result %s".format(exchange, this, ed))
+
+    _ch.queueDeclare(queue, queueIsDurable, queueIsExclusive, queueIsAutoDelete, null)
+
+    _ch.queueBind(queue, exchange, routingKey)
+    _ch
+  }
+
   def name = confModel.name
 
+  def newDeliveryAgent(handler: AMQPDeliveryHandler) = new RabbitMQDeliveryAgent(this, handler)
+
   override def toString = {
     val connName = owner.name
     val confName = owner.owner.name
diff --git a/logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/rabbitmq/v091/RabbitMQDeliveryAgent.scala b/logic/src/main/scala/gr/grnet/aquarium/messaging/amqp/rabbitmq/v091/RabbitMQDeliveryAgent.scala
new file mode 100644 (file)
index 0000000..6a0721d
--- /dev/null
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging.amqp
+package rabbitmq
+package v091
+
+import scala.collection.JavaConversions._
+import java.lang.String
+import com.rabbitmq.client.{Envelope, DefaultConsumer}
+import com.rabbitmq.client.AMQP.BasicProperties
+import com.ckkloverdos.props.Props
+
+/**
+ * 
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+class RabbitMQDeliveryAgent(consumer: RabbitMQConsumer, handler: AMQPDeliveryHandler) extends AMQPDeliveryAgent {
+  import RabbitMQDeliveryAgent.{EnvelopeKeys, BasicPropsKeys}
+
+  val underlyingHandler = new DefaultConsumer(consumer._rabbitChannel) {
+    override def handleDelivery(
+        consumerTag: String,
+        envelope: Envelope,
+        properties: BasicProperties,
+        body: Array[Byte]) = {
+
+      val propsEnvelope = new Props(
+        Map(
+          EnvelopeKeys.consumerTag -> consumerTag,
+          EnvelopeKeys.deliveryTag -> envelope.getDeliveryTag.toString,
+          EnvelopeKeys.exchange    -> envelope.getExchange,
+          EnvelopeKeys.routingKey  -> envelope.getRoutingKey,
+          EnvelopeKeys.redeliver   -> envelope.isRedeliver.toString
+        )
+      )
+
+      val propsHeader = new Props(
+        Map(
+          BasicPropsKeys.contentType -> properties.getContentType,
+          BasicPropsKeys.contentEncoding -> properties.getContentEncoding,
+//          BasicPropsKeys.headers -> properties.get,
+          BasicPropsKeys.deliveryMode -> properties.getDeliveryMode.toString,
+          BasicPropsKeys.priority -> properties.getPriority.toString,
+          BasicPropsKeys.correlationId -> properties.getCorrelationId,
+          BasicPropsKeys.replyTo -> properties.getReplyTo,
+          BasicPropsKeys.expiration -> properties.getExpiration,
+          BasicPropsKeys.messageId -> properties.getMessageId,
+          BasicPropsKeys.timestamp -> properties.getTimestamp.toString,
+          BasicPropsKeys.`type` -> properties.getType,
+          BasicPropsKeys.userId -> properties.getUserId,
+          BasicPropsKeys.appId -> properties.getAppId,
+          BasicPropsKeys.clusterId -> properties.getClusterId
+        )
+      )
+      handler.handleStringDelivery(propsEnvelope, propsHeader, new String(body, "UTF-8"))
+    }
+  }
+  def deliverNext = {
+    val queue = consumer.confModel.queue
+    val autoAck = consumer.confModel.autoAck
+
+    consumer._rabbitChannel.basicConsume(queue, autoAck, underlyingHandler)
+  }
+}
+object RabbitMQDeliveryAgent {
+  object EnvelopeKeys {
+    val consumerTag = "consumerTag"
+    val deliveryTag = "deliveryTag"
+    val redeliver   = "redeliver"
+    val exchange    = "exchange"
+    val routingKey  = "routingKey"
+
+  }
+
+  object BasicPropsKeys {
+    val contentType = "contentType"
+    val contentEncoding = "contentEncoding"
+//    val headers = "headers"
+    val deliveryMode = "deliveryMode"
+    val priority = "priority"
+    val correlationId = "correlationId"
+    val replyTo = "replyTo"
+    val expiration = "expiration"
+    val messageId = "messageId"
+    val timestamp = "timestamp"
+    val `type` = "type"
+    val userId = "userId"
+    val appId = "appId"
+    val clusterId = "clusterId"
+  }
+}
\ No newline at end of file
index 5fbd46a..3e32e36 100644 (file)
@@ -48,7 +48,17 @@ import gr.grnet.aquarium.util.Loggable
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
 class RabbitMQProducer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQProducerModel) extends AMQPProducer with Loggable {
-  private[v091] val _rabbitChannel = owner._rabbitConnection.createChannel()
+  private[v091] val _rabbitChannel = {
+    val _ch = owner._rabbitConnection.createChannel()
+    logger.info("Created rabbit channel %s for %s".format(_ch, this.toString))
+    val exchange = owner.confModel.exchange
+    val exchangeType = owner.confModel.exchangeType
+    val isDurable = owner.confModel.isDurable
+
+    val ed = _ch.exchangeDeclare(exchange, exchangeType, isDurable)
+    logger.info("Declared exchange %s for %s with result %s".format(exchange, this, ed))
+    _ch
+  }
 
   def name = confModel.name
 
@@ -65,11 +75,11 @@ class RabbitMQProducer(private[v091] val owner: RabbitMQConnection, val confMode
     post(jrChannel)
   }
 
-  def publish(message: String, headers: Map[String, String] = Map()) = {
+  def publishString(message: String, headers: Map[String, String] = Map()) = {
     _publish(message, headers){_ =>}{_ => ()} {_ => logger.error("publish() from producer %s".format())}
   }
 
-  def publishWithConfirm(message: String, headers: Map[String, String] = Map()) = {
+  def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = {
     _publish(message, headers) { _.confirmSelect() }{ _.waitForConfirms() } {_ => logger.error("publishWithConfirm() from producer %s".format())}
   }
 
index 70d62cc..ae5ff21 100644 (file)
@@ -67,15 +67,25 @@ case class RabbitMQConfigurationModel(
 }
 
 case class RabbitMQConnectionModel(
-  name: String,
-  exchange: String,
-  exchangeType: String,
-  isDurable: Boolean,
-  producers: List[RabbitMQProducerModel],
-  consumers: List[RabbitMQConsumerModel]
+    name: String,
+    exchange: String,
+    exchangeType: String,
+    isDurable: Boolean,
+    producers: List[RabbitMQProducerModel],
+    consumers: List[RabbitMQConsumerModel]
 ) extends RabbitMQConfModel {
 
 }
 
-case class RabbitMQProducerModel(name: String, routingKey: String) extends RabbitMQConfModel
-case class RabbitMQConsumerModel(name: String, queue: String) extends RabbitMQConfModel
+case class RabbitMQProducerModel(
+    name: String,
+    routingKey: String) extends RabbitMQConfModel
+
+case class RabbitMQConsumerModel(
+    name: String,
+    queue: String,
+    routingKey: String,
+    autoAck: Boolean,
+    queueIsDurable: Boolean,
+    queueIsExclusive: Boolean,
+    queueIsAutoDelete: Boolean) extends RabbitMQConfModel
index e27f70d..7caa8d7 100644 (file)
             <RabbitMQConsumerModel>
               <name>consumer1</name>
               <queue>queue1</queue>
+              <routingKey>routing.key.all</routingKey>
+              <autoAck>true</autoAck>
+              <queueIsDurable>true</queueIsDurable>
+              <queueIsAutoDelete>true</queueIsAutoDelete>
+              <queueIsExclusive>false</queueIsExclusive>
             </RabbitMQConsumerModel>
           </consumers>
         </RabbitMQConnectionModel>
index a1391c7..b813c58 100644 (file)
@@ -35,6 +35,7 @@
 
 package gr.grnet.aquarium.messaging
 
+import amqp.AMQPDeliveryHandler
 import amqp.rabbitmq.v091.confmodel._
 import amqp.rabbitmq.v091.RabbitMQConfigurations.{PropFiles, RCFolders}
 import amqp.rabbitmq.v091.{RabbitMQConsumer, RabbitMQConfigurations}
@@ -43,6 +44,8 @@ import org.junit.Assert._
 import com.ckkloverdos.resource.DefaultResourceContext
 import gr.grnet.aquarium.util.xstream.XStreamHelpers
 import gr.grnet.aquarium.util.Loggable
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.maybe.{Failed, NoVal, Just}
 
 /**
  * 
@@ -53,8 +56,21 @@ class MessagingTest extends Loggable {
   val baseRC = DefaultResourceContext
   val rabbitmqRC = baseRC / RCFolders.rabbitmq
 
+  object Names {
+    val consumer1 = "consumer1"
+    val producer1 = "producer1"
+    val queue1 = "queue1"
+    val routing_key_all = "routing.key.all"
+    val local_connection = "local_connection"
+    val aquarium_exchange = "aquarium_exchange"
+    val direct = "direct"
+    val localhost_aquarium = "localhost_aquarium"
+    val aquarium = "aquarium"
+    val localhost = "localhost"
+  }
+
   private def _genTestConf: String = {
-    val consmod1 = new RabbitMQConsumerModel("consumer1", "queue1")
+    val consmod1 = new RabbitMQConsumerModel("consumer1", "queue1", "routing.key.all", true, true, false, false)
     val prodmod1 = new RabbitMQProducerModel("producer1", "routing.key.all")
     val conn1 = new RabbitMQConnectionModel(
       "local_connection",
@@ -90,14 +106,55 @@ class MessagingTest extends Loggable {
   def testLocalProducer {
     val maybeConfs = RabbitMQConfigurations(baseRC)
     assertTrue(maybeConfs.isJust)
-    for {
+    val maybeProducer = for {
       confs    <- maybeConfs
-      conf     <- confs.findConfiguration("localhost_aquarium")
-      conn     <- conf.findConnection("local_connection")
-      producer <- conn.findProducer("producer1")
+      conf     <- confs.findConfiguration(Names.localhost_aquarium)
+      conn     <- conf.findConnection(Names.local_connection)
+      producer <- conn.findProducer(Names.producer1)
     } yield {
-      logger.debug("Publishing a message from %s".format(producer))
-      producer.publish("Test")
+      producer
+    }
+
+    maybeProducer match {
+      case Just(producer) =>
+        logger.debug("Publishing a message from %s".format(producer))
+        producer.publishString("Test")
+      case NoVal =>
+        fail("No producer named %s".format(Names.producer1))
+      case Failed(e, m) =>
+        fail("%s: %s".format(m, e.getMessage))
+    }
+  }
+
+  @Test
+  def testLocalConsumer {
+    val maybeConfs = RabbitMQConfigurations(baseRC)
+    assertTrue(maybeConfs.isJust)
+
+    val maybeConsumer = for {
+      confs    <- maybeConfs
+      conf     <- confs.findConfiguration(Names.localhost_aquarium)
+      conn     <- conf.findConnection(Names.local_connection)
+      consumer <- conn.findConsumer(Names.consumer1)
+    } yield {
+      consumer
+    }
+
+    maybeConsumer match {
+      case Just(consumer) =>
+        logger.debug("Receiving a message from %s".format(consumer))
+        consumer.newDeliveryAgent(new AMQPDeliveryHandler {
+          def handleStringDelivery(envelope: Props, headers: Props, content: String) = {
+            logger.debug("Received message with")
+            logger.debug("  envelope: %s".format(envelope))
+            logger.debug("  headers : %s".format(headers))
+            logger.debug("  body    : %s".format(content))
+          }
+        })
+      case NoVal =>
+        fail("No consumer named %s".format(Names.consumer1))
+      case Failed(e, m) =>
+        fail("%s: %s".format(m, e.getMessage))
     }
   }