<dependency>
<groupId>com.ckkloverdos</groupId>
<artifactId>streamresource_2.9.1</artifactId>
- <version>0.1.1</version>
+ <version>0.2.0</version>
</dependency>
<dependency>
*/
trait AMQPConsumer {
def name: String
+
+ def newDeliveryAgent(handler: AMQPDeliveryHandler): AMQPDeliveryAgent
}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging.amqp
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+trait AMQPDeliveryAgent {
+ def deliverNext: Unit
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging.amqp
+
+import com.ckkloverdos.props.Props
+
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+trait AMQPDeliveryHandler {
+ def handleStringDelivery(envelope: Props, headers: Props, content: String)
+}
\ No newline at end of file
*/
trait AMQPProducer {
def name: String
- def publish(message: String, headers: Map[String, String] = Map()): Unit
- def publishWithConfirm(message: String, headers: Map[String, String] = Map()): Boolean
+ def publishString(message: String, headers: Map[String, String] = Map()): Unit
+ def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()): Boolean
}
\ No newline at end of file
import confmodel.RabbitMQConsumerModel
import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
+import gr.grnet.aquarium.util.Loggable
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
-class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer {
+class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer with Loggable {
+ private[v091] val _rabbitChannel = {
+ val _ch = owner._rabbitConnection.createChannel()
+ logger.info("Created rabbit channel %s for %s".format(_ch, this.toString))
+ val exchange = owner.confModel.exchange
+ val exchangeType = owner.confModel.exchangeType
+ val exchangeIsDurable = owner.confModel.isDurable
+ val queue = confModel.queue
+ val routingKey = confModel.routingKey
+ val queueIsDurable = confModel.queueIsDurable
+ val queueIsAutoDelete = confModel.queueIsAutoDelete
+ val queueIsExclusive = confModel.queueIsExclusive
+
+ val ed = _ch.exchangeDeclare(exchange, exchangeType, exchangeIsDurable)
+ logger.info("Declared exchange %s for %s with result %s".format(exchange, this, ed))
+
+ _ch.queueDeclare(queue, queueIsDurable, queueIsExclusive, queueIsAutoDelete, null)
+
+ _ch.queueBind(queue, exchange, routingKey)
+ _ch
+ }
+
def name = confModel.name
+ def newDeliveryAgent(handler: AMQPDeliveryHandler) = new RabbitMQDeliveryAgent(this, handler)
+
override def toString = {
val connName = owner.name
val confName = owner.owner.name
--- /dev/null
+/*
+ * Copyright 2011 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.messaging.amqp
+package rabbitmq
+package v091
+
+import scala.collection.JavaConversions._
+import java.lang.String
+import com.rabbitmq.client.{Envelope, DefaultConsumer}
+import com.rabbitmq.client.AMQP.BasicProperties
+import com.ckkloverdos.props.Props
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>.
+ */
+class RabbitMQDeliveryAgent(consumer: RabbitMQConsumer, handler: AMQPDeliveryHandler) extends AMQPDeliveryAgent {
+ import RabbitMQDeliveryAgent.{EnvelopeKeys, BasicPropsKeys}
+
+ val underlyingHandler = new DefaultConsumer(consumer._rabbitChannel) {
+ override def handleDelivery(
+ consumerTag: String,
+ envelope: Envelope,
+ properties: BasicProperties,
+ body: Array[Byte]) = {
+
+ val propsEnvelope = new Props(
+ Map(
+ EnvelopeKeys.consumerTag -> consumerTag,
+ EnvelopeKeys.deliveryTag -> envelope.getDeliveryTag.toString,
+ EnvelopeKeys.exchange -> envelope.getExchange,
+ EnvelopeKeys.routingKey -> envelope.getRoutingKey,
+ EnvelopeKeys.redeliver -> envelope.isRedeliver.toString
+ )
+ )
+
+ val propsHeader = new Props(
+ Map(
+ BasicPropsKeys.contentType -> properties.getContentType,
+ BasicPropsKeys.contentEncoding -> properties.getContentEncoding,
+// BasicPropsKeys.headers -> properties.get,
+ BasicPropsKeys.deliveryMode -> properties.getDeliveryMode.toString,
+ BasicPropsKeys.priority -> properties.getPriority.toString,
+ BasicPropsKeys.correlationId -> properties.getCorrelationId,
+ BasicPropsKeys.replyTo -> properties.getReplyTo,
+ BasicPropsKeys.expiration -> properties.getExpiration,
+ BasicPropsKeys.messageId -> properties.getMessageId,
+ BasicPropsKeys.timestamp -> properties.getTimestamp.toString,
+ BasicPropsKeys.`type` -> properties.getType,
+ BasicPropsKeys.userId -> properties.getUserId,
+ BasicPropsKeys.appId -> properties.getAppId,
+ BasicPropsKeys.clusterId -> properties.getClusterId
+ )
+ )
+ handler.handleStringDelivery(propsEnvelope, propsHeader, new String(body, "UTF-8"))
+ }
+ }
+ def deliverNext = {
+ val queue = consumer.confModel.queue
+ val autoAck = consumer.confModel.autoAck
+
+ consumer._rabbitChannel.basicConsume(queue, autoAck, underlyingHandler)
+ }
+}
+object RabbitMQDeliveryAgent {
+ object EnvelopeKeys {
+ val consumerTag = "consumerTag"
+ val deliveryTag = "deliveryTag"
+ val redeliver = "redeliver"
+ val exchange = "exchange"
+ val routingKey = "routingKey"
+
+ }
+
+ object BasicPropsKeys {
+ val contentType = "contentType"
+ val contentEncoding = "contentEncoding"
+// val headers = "headers"
+ val deliveryMode = "deliveryMode"
+ val priority = "priority"
+ val correlationId = "correlationId"
+ val replyTo = "replyTo"
+ val expiration = "expiration"
+ val messageId = "messageId"
+ val timestamp = "timestamp"
+ val `type` = "type"
+ val userId = "userId"
+ val appId = "appId"
+ val clusterId = "clusterId"
+ }
+}
\ No newline at end of file
* @author Christos KK Loverdos <loverdos@gmail.com>.
*/
class RabbitMQProducer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQProducerModel) extends AMQPProducer with Loggable {
- private[v091] val _rabbitChannel = owner._rabbitConnection.createChannel()
+ private[v091] val _rabbitChannel = {
+ val _ch = owner._rabbitConnection.createChannel()
+ logger.info("Created rabbit channel %s for %s".format(_ch, this.toString))
+ val exchange = owner.confModel.exchange
+ val exchangeType = owner.confModel.exchangeType
+ val isDurable = owner.confModel.isDurable
+
+ val ed = _ch.exchangeDeclare(exchange, exchangeType, isDurable)
+ logger.info("Declared exchange %s for %s with result %s".format(exchange, this, ed))
+ _ch
+ }
def name = confModel.name
post(jrChannel)
}
- def publish(message: String, headers: Map[String, String] = Map()) = {
+ def publishString(message: String, headers: Map[String, String] = Map()) = {
_publish(message, headers){_ =>}{_ => ()} {_ => logger.error("publish() from producer %s".format())}
}
- def publishWithConfirm(message: String, headers: Map[String, String] = Map()) = {
+ def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = {
_publish(message, headers) { _.confirmSelect() }{ _.waitForConfirms() } {_ => logger.error("publishWithConfirm() from producer %s".format())}
}
}
case class RabbitMQConnectionModel(
- name: String,
- exchange: String,
- exchangeType: String,
- isDurable: Boolean,
- producers: List[RabbitMQProducerModel],
- consumers: List[RabbitMQConsumerModel]
+ name: String,
+ exchange: String,
+ exchangeType: String,
+ isDurable: Boolean,
+ producers: List[RabbitMQProducerModel],
+ consumers: List[RabbitMQConsumerModel]
) extends RabbitMQConfModel {
}
-case class RabbitMQProducerModel(name: String, routingKey: String) extends RabbitMQConfModel
-case class RabbitMQConsumerModel(name: String, queue: String) extends RabbitMQConfModel
+case class RabbitMQProducerModel(
+ name: String,
+ routingKey: String) extends RabbitMQConfModel
+
+case class RabbitMQConsumerModel(
+ name: String,
+ queue: String,
+ routingKey: String,
+ autoAck: Boolean,
+ queueIsDurable: Boolean,
+ queueIsExclusive: Boolean,
+ queueIsAutoDelete: Boolean) extends RabbitMQConfModel
<RabbitMQConsumerModel>
<name>consumer1</name>
<queue>queue1</queue>
+ <routingKey>routing.key.all</routingKey>
+ <autoAck>true</autoAck>
+ <queueIsDurable>true</queueIsDurable>
+ <queueIsAutoDelete>true</queueIsAutoDelete>
+ <queueIsExclusive>false</queueIsExclusive>
</RabbitMQConsumerModel>
</consumers>
</RabbitMQConnectionModel>
package gr.grnet.aquarium.messaging
+import amqp.AMQPDeliveryHandler
import amqp.rabbitmq.v091.confmodel._
import amqp.rabbitmq.v091.RabbitMQConfigurations.{PropFiles, RCFolders}
import amqp.rabbitmq.v091.{RabbitMQConsumer, RabbitMQConfigurations}
import com.ckkloverdos.resource.DefaultResourceContext
import gr.grnet.aquarium.util.xstream.XStreamHelpers
import gr.grnet.aquarium.util.Loggable
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.maybe.{Failed, NoVal, Just}
/**
*
val baseRC = DefaultResourceContext
val rabbitmqRC = baseRC / RCFolders.rabbitmq
+ object Names {
+ val consumer1 = "consumer1"
+ val producer1 = "producer1"
+ val queue1 = "queue1"
+ val routing_key_all = "routing.key.all"
+ val local_connection = "local_connection"
+ val aquarium_exchange = "aquarium_exchange"
+ val direct = "direct"
+ val localhost_aquarium = "localhost_aquarium"
+ val aquarium = "aquarium"
+ val localhost = "localhost"
+ }
+
private def _genTestConf: String = {
- val consmod1 = new RabbitMQConsumerModel("consumer1", "queue1")
+ val consmod1 = new RabbitMQConsumerModel("consumer1", "queue1", "routing.key.all", true, true, false, false)
val prodmod1 = new RabbitMQProducerModel("producer1", "routing.key.all")
val conn1 = new RabbitMQConnectionModel(
"local_connection",
def testLocalProducer {
val maybeConfs = RabbitMQConfigurations(baseRC)
assertTrue(maybeConfs.isJust)
- for {
+ val maybeProducer = for {
confs <- maybeConfs
- conf <- confs.findConfiguration("localhost_aquarium")
- conn <- conf.findConnection("local_connection")
- producer <- conn.findProducer("producer1")
+ conf <- confs.findConfiguration(Names.localhost_aquarium)
+ conn <- conf.findConnection(Names.local_connection)
+ producer <- conn.findProducer(Names.producer1)
} yield {
- logger.debug("Publishing a message from %s".format(producer))
- producer.publish("Test")
+ producer
+ }
+
+ maybeProducer match {
+ case Just(producer) =>
+ logger.debug("Publishing a message from %s".format(producer))
+ producer.publishString("Test")
+ case NoVal =>
+ fail("No producer named %s".format(Names.producer1))
+ case Failed(e, m) =>
+ fail("%s: %s".format(m, e.getMessage))
+ }
+ }
+
+ @Test
+ def testLocalConsumer {
+ val maybeConfs = RabbitMQConfigurations(baseRC)
+ assertTrue(maybeConfs.isJust)
+
+ val maybeConsumer = for {
+ confs <- maybeConfs
+ conf <- confs.findConfiguration(Names.localhost_aquarium)
+ conn <- conf.findConnection(Names.local_connection)
+ consumer <- conn.findConsumer(Names.consumer1)
+ } yield {
+ consumer
+ }
+
+ maybeConsumer match {
+ case Just(consumer) =>
+ logger.debug("Receiving a message from %s".format(consumer))
+ consumer.newDeliveryAgent(new AMQPDeliveryHandler {
+ def handleStringDelivery(envelope: Props, headers: Props, content: String) = {
+ logger.debug("Received message with")
+ logger.debug(" envelope: %s".format(envelope))
+ logger.debug(" headers : %s".format(headers))
+ logger.debug(" body : %s".format(content))
+ }
+ })
+ case NoVal =>
+ fail("No consumer named %s".format(Names.consumer1))
+ case Failed(e, m) =>
+ fail("%s: %s".format(m, e.getMessage))
}
}