+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-import com.ckkloverdos.maybe.Maybe
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPConfiguration {
- def name: String
- def connections: List[AMQPConnection]
-
- def connectionNames = connections.map(_.name)
-
- def findConnection(name: String) = connections.find(_.name == name)
-}
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPConfigurations {
- def configurations: List[AMQPConfiguration]
-
- def configurationNames = configurations.map(_.name)
-
- def findConfiguration(name: String): Option[AMQPConfiguration] = configurations.find(_.name == name)
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPConnection {
- def name: String
-
- def producers: List[AMQPProducer]
- def consumers: List[AMQPConsumer]
-
- def producerNames = producers.map(_.name)
- def consumerNames = consumers.map(_.name)
-
- def findProducer(name: String): Option[AMQPProducer] = producers.find(_.name == name)
- def findConsumer(name: String): Option[AMQPConsumer] = consumers.find(_.name == name)
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPConsumer {
- def name: String
-
- def newDeliveryAgent(handler: AMQPDeliveryHandler): AMQPDeliveryAgent
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPDeliveryAgent {
- def deliverNext: Unit
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-import com.ckkloverdos.props.Props
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPDeliveryHandler {
- /**
- * Returns true if the message was handled OK.
- */
- def handleStringDelivery(envelope: Props, headers: Props, content: String): Boolean
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-
-import parallel.Future
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-trait AMQPProducer {
- def name: String
- def publishString(message: String, headers: Map[String, String] = Map()): Unit
- //def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()): Boolean
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-object MessagingHelpers {
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-
-import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
-import confmodel.RabbitMQConfigurationModel
-import gr.grnet.aquarium.util.Loggable
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQConfiguration(val confModel: RabbitMQConfigurationModel) extends AMQPConfiguration with Loggable {
- private[v091] lazy val _rabbitConnectionFactory = {
- val _cf = new JackRabbitConnectionFactory
-
- logger.debug("Using (username, host, port, virtualHost) = (%s, %s, %s, %s)".format(confModel.username, confModel.host, confModel.port, confModel.virtualHost))
- _cf.setUsername(confModel.username)
- _cf.setPassword(confModel.password)
- _cf.setHost(confModel.host)
- _cf.setPort(confModel.port)
- _cf.setVirtualHost(confModel.virtualHost)
-
- _cf
- }
-
- private val _name = confModel.name
- def name = _name
-
- private lazy val _connections = confModel.connections.map(new RabbitMQConnection(this, _))
- def connections = _connections
-
- override def toString = {
- "RabbitMQConfiguration(%s)".format(name)
- }
-}
-
-
-object RabbitMQConfiguration {
- object RCFolders {
- val rabbitmq = "rabbitmq"
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-
-
-import confmodel.RabbitMQConfigurationsModel
-import gr.grnet.aquarium.util.xstream.XStreamHelpers
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.util.{Loggable, shortNameOfClass}
-import com.ckkloverdos.resource.{StreamResource, StreamResourceContext}
-import com.thoughtworks.xstream.XStream
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQConfigurations(val confModel: RabbitMQConfigurationsModel) extends AMQPConfigurations {
- private val _configurations = confModel.configurations.map(new RabbitMQConfiguration(_))
- def configurations = _configurations
-}
-
-object RabbitMQConfigurations extends Loggable {
- object RCFolders {
- val rabbitmq = "rabbitmq"
-
- val producers = "producers"
- val consumers = "consumers"
- }
-
- object PropFiles {
- val configurations = "configurations.xml"
- }
-
- def apply(resource: StreamResource, xs: XStream): Maybe[RabbitMQConfigurations] = {
- logger.info("Reading rabbitmq configuration from %s".format(resource.url))
- val maybeConfsModel = XStreamHelpers.parseType[RabbitMQConfigurationsModel](resource, xs)
-
- maybeConfsModel match {
- case Just(confsModel) =>
- // parsed <configurations>.xml (like: rabbitmq/configurations.xml)
- // now have a RabbitMQConfigurationsModel
- logger.debug("rabbitmq configuration from %s is\n%s".format(resource.url, resource.stringContent.getOr("")))
- val confsModelErrors = confsModel.validateConfModel
-
- if(confsModelErrors.size > 0) {
- val errorMsg = "%s has %s error(s)".format(shortNameOfClass(confsModel.getClass))
- logger.error(errorMsg)
- for(error <- confsModelErrors) {
- logger.error(error)
- }
- Failed(new Exception(errorMsg))
- } else {
- Just(new RabbitMQConfigurations(confsModel))
- }
- case NoVal =>
- NoVal
- case Failed(e, m) =>
- Failed(e, m)
- }
- }
-
- def apply(baseRC: StreamResourceContext, xs: XStream = XStreamHelpers.DefaultXStream): Maybe[RabbitMQConfigurations] = {
- val rabbitMQRC = baseRC / RCFolders.rabbitmq
-
- val maybeConfsResource = rabbitMQRC.getResource(PropFiles.configurations)
- maybeConfsResource.flatMap(this(_, xs))
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-
-import confmodel.RabbitMQConnectionModel
-import gr.grnet.aquarium.messaging.amqp.AMQPConnection
-
-import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQConnection(private[v091] val owner: RabbitMQConfiguration, val confModel: RabbitMQConnectionModel) extends AMQPConnection {
- private[v091] lazy val _rabbitConnection: JackRabbitConnection = owner._rabbitConnectionFactory.newConnection()
-
- private val _name = confModel.name
- def name = _name
-
- private lazy val _producers = confModel.producers.map(new RabbitMQProducer(this, _))
- def producers = _producers
-
- private lazy val _consumers = confModel.consumers.map(new RabbitMQConsumer(this, _))
- def consumers = _consumers
-
- override def toString = {
- "RabbitMQConnection(%s/%s)".format(owner.name, name)
- }
-}
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-
-import confmodel.RabbitMQConsumerModel
-import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
-import gr.grnet.aquarium.util.Loggable
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQConsumer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQConsumerModel) extends AMQPConsumer with Loggable {
- private[v091] val _rabbitChannel = {
- val _ch = owner._rabbitConnection.createChannel()
-
- logger.info("Created rabbit channel '%s' for '%s'".format(_ch, this.toString))
- val exchange = owner.confModel.exchange
-// val exchangeType = owner.confModel.exchangeType
-// val exchangeIsDurable = owner.confModel.isDurable
- val queue = confModel.queue
- val routingKey = confModel.routingKey
- val queueIsDurable = confModel.queueIsDurable
- val queueIsAutoDelete = confModel.queueIsAutoDelete
- val queueIsExclusive = confModel.queueIsExclusive
-
-// val ed = _ch.exchangeDeclare(exchange, exchangeType, exchangeIsDurable)
-// logger.info("Declared exchange '%s' for %s and got result %s".format(exchange, this, ed))
-
- val qd = _ch.queueDeclare(queue, queueIsDurable, queueIsExclusive, queueIsAutoDelete, null)
- // (D)urable (E)xclusive (A)uto(D)elete --> DEAD
- val dead = "(%sdurable, %sexclusive, %sautodelete)".format(
- Seq(queueIsDurable, queueIsExclusive, queueIsAutoDelete).map(if(_) "" else "not "): _*
- )
- logger.info("Declared %s queue '%s' for %s and got result %s".format(dead, queue, this, qd))
-
- val qb = _ch.queueBind(queue, exchange, routingKey)
- logger.info("Bound queue '%s' to exchange '%s' with routing key '%s' for %s and got result %s".format(queue, exchange, routingKey, this, qb))
- _ch
- }
-
- def name = confModel.name
-
- def newDeliveryAgent(handler: AMQPDeliveryHandler) = new RabbitMQDeliveryAgent(this, handler)
-
- override def toString = {
- val connName = owner.name
- val confName = owner.owner.name
- "RabbitMQConsumer(%s/%s/%s)".format(confName, connName, name)
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-
-import java.lang.String
-import com.rabbitmq.client.AMQP.BasicProperties
-import com.ckkloverdos.props.Props
-import com.rabbitmq.client.{QueueingConsumer, Envelope}
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.util.safeToStringOrNull
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQDeliveryAgent(consumer: RabbitMQConsumer, handler: AMQPDeliveryHandler) extends AMQPDeliveryAgent with Loggable {
- import RabbitMQDeliveryAgent.{EnvelopeKeys, BasicPropsKeys}
-
- lazy val underlyingHandler = new QueueingConsumer(consumer._rabbitChannel) {
- override def handleDelivery(
- consumerTag: String,
- envelope: Envelope,
- properties: BasicProperties,
- body: Array[Byte]) = {
-
- val propsEnvelope = new Props(
- Map(
- EnvelopeKeys.consumerTag -> consumerTag,
- EnvelopeKeys.deliveryTag -> envelope.getDeliveryTag.toString,
- EnvelopeKeys.exchange -> envelope.getExchange,
- EnvelopeKeys.routingKey -> envelope.getRoutingKey,
- EnvelopeKeys.redeliver -> envelope.isRedeliver.toString
- )
- )
-
- val propsHeader = new Props(
- Map(
- BasicPropsKeys.contentType -> properties.getContentType,
- BasicPropsKeys.contentEncoding -> properties.getContentEncoding,
-// BasicPropsKeys.headers -> properties.get,
- BasicPropsKeys.deliveryMode -> safeToStringOrNull(properties.getDeliveryMode),
- BasicPropsKeys.priority -> safeToStringOrNull(properties.getPriority),
- BasicPropsKeys.correlationId -> properties.getCorrelationId,
- BasicPropsKeys.replyTo -> properties.getReplyTo,
- BasicPropsKeys.expiration -> properties.getExpiration,
- BasicPropsKeys.messageId -> properties.getMessageId,
- BasicPropsKeys.timestamp -> safeToStringOrNull(properties.getTimestamp),
- BasicPropsKeys.`type` -> properties.getType,
- BasicPropsKeys.userId -> properties.getUserId,
- BasicPropsKeys.appId -> properties.getAppId,
- BasicPropsKeys.clusterId -> properties.getClusterId
- )
- )
-
- val handledOK = handler.handleStringDelivery(propsEnvelope, propsHeader, new String(body, "UTF-8"))
-
- if(handledOK && !consumer.confModel.autoAck) {
- val deliveryTag = envelope.getDeliveryTag
- logger.debug("Message with delivertTag %s handled OK, sending ACK".format(deliveryTag))
- consumer._rabbitChannel.basicAck(deliveryTag, false);
- }
- }
- }
-
- def deliverNext = {
- val queue = consumer.confModel.queue
- val autoAck = consumer.confModel.autoAck
-
- consumer._rabbitChannel.basicConsume(queue, autoAck, underlyingHandler)
- val delivery = underlyingHandler.nextDelivery()
- logger.debug("Got delivery %s".format(delivery))
- }
-}
-object RabbitMQDeliveryAgent {
- object EnvelopeKeys {
- val consumerTag = "consumerTag"
- val deliveryTag = "deliveryTag"
- val redeliver = "redeliver"
- val exchange = "exchange"
- val routingKey = "routingKey"
-
- }
-
- object BasicPropsKeys {
- val contentType = "contentType"
- val contentEncoding = "contentEncoding"
-// val headers = "headers"
- val deliveryMode = "deliveryMode"
- val priority = "priority"
- val correlationId = "correlationId"
- val replyTo = "replyTo"
- val expiration = "expiration"
- val messageId = "messageId"
- val timestamp = "timestamp"
- val `type` = "type"
- val userId = "userId"
- val appId = "appId"
- val clusterId = "clusterId"
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-
-import confmodel.RabbitMQProducerModel
-import gr.grnet.aquarium.messaging.amqp.AMQPProducer
-import com.rabbitmq.client.{Channel => JackRabbitChannel, Connection => JackRabbitConnection, ConnectionFactory => JackRabbitConnectionFactory}
-import com.rabbitmq.client.AMQP.BasicProperties
-import gr.grnet.aquarium.util.Loggable
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQProducer(private[v091] val owner: RabbitMQConnection, val confModel: RabbitMQProducerModel) extends AMQPProducer with Loggable {
- private[v091] lazy val _rabbitChannel = {
- val _ch = owner._rabbitConnection.createChannel()
- val exchange = owner.confModel.exchange
- val exchangeType = owner.confModel.exchangeType
- val isDurable = owner.confModel.isDurable
-
- val ed = _ch.exchangeDeclare(exchange, exchangeType, isDurable)
- logger.info("Declared exchange '%s' of type '%s' for %s with result %s".format(exchange, exchangeType, this, ed))
- _ch
- }
-
- def name = confModel.name
-
- private def _publish[A](message: String, headers: Map[String, String])(pre: JackRabbitChannel => Any)(post: JackRabbitChannel => A)(onError: => Exception => Any): A = {
- import scala.collection.JavaConversions._
-
- val jrChannel = _rabbitChannel
- val exchange = owner.confModel.exchange
- val routingKey = confModel.routingKey
- val jrProps = new BasicProperties.Builder().headers(headers).build()
-
- pre(jrChannel)
- jrChannel.basicPublish(exchange, routingKey, jrProps, message.getBytes("UTF-8"))
- logger.debug("To exchange: '%s', routingKey: '%s' published message %s".format(exchange, routingKey, message))
- post(jrChannel)
- }
-
- def publishString(message: String, headers: Map[String, String] = Map()) = {
- _publish(message, headers){_ =>}{_ => ()} {_ => logger.error("publish() from producer %s".format())}
- }
-
-/* def publishStringWithConfirm(message: String, headers: Map[String, String] = Map()) = {
- _publish(message, headers) {
- _.confirmSelect() }{
- logger.debug("Waiting for confirmation")
- _.waitForConfirms() } {_ => logger.error("publishWithConfirm() from producer %s".format())}
- }
-*/
- override def toString = {
- "RabbitMQProducer(%s/%s/%s)".format(owner.owner.name, owner.name, name)
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging.amqp
-package rabbitmq
-package v091
-package confmodel
-
-import gr.grnet.aquarium.util.ConfModel
-import gr.grnet.aquarium.util.ConfModel.ConfModelError
-
-sealed trait RabbitMQConfModel extends ConfModel {
- def validateConfModel: List[ConfModelError] = Nil
-}
-
-case class RabbitMQConfigurationsModel(configurations: List[RabbitMQConfigurationModel]) extends RabbitMQConfModel
-
-case class RabbitMQConfigurationModel(
- name: String,
- username: String,
- password: String,
- host: String,
- port: Int,
- addresses: List[String],
- virtualHost: String,
- connections: List[RabbitMQConnectionModel]
-) extends RabbitMQConfModel {
-
- override def validateConfModel = {
- ConfModel.applyValidations(
- (() => host.size > 0 || addresses.size > 0, "At least one of host, addresses must be provided"),
- (() => connections.size > 0 , "At least one connection must be specified")
- )
- }
-}
-
-case class RabbitMQConnectionModel(
- name: String,
- exchange: String,
- exchangeType: String,
- isDurable: Boolean,
- producers: List[RabbitMQProducerModel],
- consumers: List[RabbitMQConsumerModel]
-) extends RabbitMQConfModel {
-
-}
-
-case class RabbitMQProducerModel(
- name: String,
- routingKey: String) extends RabbitMQConfModel
-
-case class RabbitMQConsumerModel(
- name: String,
- queue: String,
- routingKey: String,
- autoAck: Boolean,
- queueIsDurable: Boolean,
- queueIsExclusive: Boolean,
- queueIsAutoDelete: Boolean) extends RabbitMQConfModel
import com.thoughtworks.xstream.XStream
import com.ckkloverdos.maybe.{Failed, Just, Maybe}
import com.ckkloverdos.resource.StreamResource
-import gr.grnet.aquarium.messaging.amqp.rabbitmq.v091.confmodel._
import gr.grnet.aquarium.store.mongodb.confmodel._
}
def prepareXStreamAliases(xs: XStream): XStream = {
- // RabbitMQ
- prepareXStreamAlias[RabbitMQConfigurationsModel](xs)
- prepareXStreamAlias[RabbitMQConfigurationModel](xs)
- prepareXStreamAlias[RabbitMQConnectionModel](xs)
- prepareXStreamAlias[RabbitMQProducerModel](xs)
- prepareXStreamAlias[RabbitMQConsumerModel](xs)
-
// MongoDB
prepareXStreamAlias[MongoDBConnectionModel](xs)
prepareXStreamAlias[MongoDBCollectionModel](xs)
+++ /dev/null
-<RabbitMQConfigurationsModel>
- <configurations class="List">
- <!-- aquarium.dev.grnet.gr -->
- <RabbitMQConfigurationModel>
- <name>aquarium.dev.grnet.gr</name>
- <username>***</username>
- <password>***</password>
- <host>aquarium.dev.grnet.gr</host>
- <port>5672</port>
- <addresses class="Nil"/>
- <virtualHost>/</virtualHost>
- <connections class="List">
- <!-- test_connection -->
- <RabbitMQConnectionModel>
- <name>test_connection</name>
- <exchange>aquarium</exchange>
- <exchangeType>topic</exchangeType>
- <isDurable>true</isDurable>
- <producers class="List">
- <RabbitMQProducerModel>
- <name>test_producer</name>
- <routingKey>aquarium.test</routingKey>
- </RabbitMQProducerModel>
- </producers>
- <consumers class="List">
- <RabbitMQConsumerModel>
- <name>test_consumer</name>
- <queue>aquarium</queue>
- <routingKey>aquarium.test</routingKey>
- <autoAck>false</autoAck>
- <queueIsDurable>true</queueIsDurable>
- <queueIsAutoDelete>false</queueIsAutoDelete>
- <queueIsExclusive>false</queueIsExclusive>
- </RabbitMQConsumerModel>
- </consumers>
- </RabbitMQConnectionModel>
- </connections>
- </RabbitMQConfigurationModel>
-
- <!-- localhost -->
- <RabbitMQConfigurationModel>
- <name>localhost</name>
- <username>***</username>
- <password>***</password>
- <host>localhost</host>
- <port>5672</port>
- <addresses class="Nil"/>
- <virtualHost>/</virtualHost>
- <connections class="List">
- <!-- test_connection -->
- <RabbitMQConnectionModel>
- <name>test_connection</name>
- <exchange>aquarium</exchange>
- <exchangeType>topic</exchangeType>
- <isDurable>true</isDurable>
- <producers class="List">
- <RabbitMQProducerModel>
- <name>test_producer</name>
- <routingKey>aquarium.test</routingKey>
- </RabbitMQProducerModel>
- </producers>
- <consumers class="List">
- <RabbitMQConsumerModel>
- <name>test_consumer</name>
- <queue>aquarium</queue>
- <routingKey>aquarium.test</routingKey>
- <autoAck>false</autoAck>
- <queueIsDurable>true</queueIsDurable>
- <queueIsAutoDelete>false</queueIsAutoDelete>
- <queueIsExclusive>false</queueIsExclusive>
- </RabbitMQConsumerModel>
- </consumers>
- </RabbitMQConnectionModel>
-
- </connections>
- </RabbitMQConfigurationModel>
- </configurations>
-</RabbitMQConfigurationsModel>
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging
-
-import amqp.AMQPDeliveryHandler
-import amqp.rabbitmq.v091.confmodel._
-import amqp.rabbitmq.v091.RabbitMQConfigurations
-import amqp.rabbitmq.v091.RabbitMQConfigurations.{PropFiles, RCFolders}
-import org.junit.Test
-import org.junit.Assert._
-import com.ckkloverdos.resource.DefaultResourceContext
-import gr.grnet.aquarium.util.xstream.XStreamHelpers
-import com.ckkloverdos.props.Props
-import com.ckkloverdos.maybe.{Failed, NoVal, Just}
-import org.junit.Assume._
-import gr.grnet.aquarium.{PropertyNames, LogicTestsAssumptions}
-import com.ckkloverdos.sys.SysProp
-import gr.grnet.aquarium.util.{LogUtils, Loggable}
-import java.util.Random
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RabbitMQTest extends Loggable {
-
- val xs = XStreamHelpers.DefaultXStream
- val baseRC = DefaultResourceContext
- val rabbitmqRC = baseRC / RCFolders.rabbitmq
-
- def getProp(logger: org.slf4j.Logger, msg: String, name: String, default: String): String = {
- SysProp(name).value match {
- case Just(value) =>
- logger.debug("[%s] Found value '%s' for system property '%s'".format(msg, value, name))
- value
- case Failed(e, m) =>
- logger.error("[%s][%s]: %s".format(m, e.getClass.getName, e.getMessage))
- logger.error("[%s] Error loading system property '%s'. Using default value '%s'".format(msg, name, default))
- default
- case NoVal =>
- logger.debug("[%s] Found no value for system property '%s'. Using default '%s'".format(msg, name, default))
- default
- }
- }
-
- // The configuration file we use for the rabbitmq tests
- lazy val RabbitMQPropFile = {
- val filename = LogUtils.getSysProp(logger, "Loading rabbitmq configurations", PropertyNames.RabbitMQConfFile, PropFiles.configurations)
- logger.debug("Using rabbitmq configurations from %s".format(filename))
- filename
- }
-
- // The specific setup we use.
- // This is defined in the configuration file
- lazy val RabbitMQSpecificConfName = {
- val confname = LogUtils.getSysProp(logger, "Getting specific rabbitmq configuration", PropertyNames.RabbitMQSpecificConf, Names.aquarium_dev_grnet_gr)
- logger.debug("Using specific rabbitmq configuration: %s".format(confname))
- confname
- }
-
- // The connection we use for testing
- lazy val RabbitMQConnectionName = {
- val conname = LogUtils.getSysProp(logger, "Getting rabbitmq connection", PropertyNames.RabbitMQConnection, Names.test_connection)
- logger.debug("Using rabbitmq connection %s".format(conname))
- conname
- }
-
- // The producer we use for testing
- lazy val RabbitMQProducerName = {
- val pname = LogUtils.getSysProp(logger, "Getting rabbitmq producer", PropertyNames.RabbitMQProducer, Names.test_producer)
- logger.debug("Using rabbitmq producer %s".format(pname))
- pname
- }
-
- // The producer we use for testing
- lazy val RabbitMQConsumerName = {
- val cname = LogUtils.getSysProp(logger, "Getting rabbitmq consumer", PropertyNames.RabbitMQConsumer, Names.test_consumer)
- logger.debug("Using rabbitmq producer %s".format(cname))
- cname
- }
-
- object Names {
-// val default_connection = "default_connection"
- val test_connection = "test_connection"
- val aquarium_dev_grnet_gr = "aquarium.dev.grnet.gr"
- val test_producer = "test_producer"
- val test_consumer = "test_consumer"
- }
-
- private def _genTestConf: String = {
- val consmod1 = new RabbitMQConsumerModel("consumer1", "queue1", "routing.key.all", true, true, false, false)
- val prodmod1 = new RabbitMQProducerModel("producer1", "routing.key.all")
- val conn1 = new RabbitMQConnectionModel(
- "local_connection",
- "aquarium_exchange",
- "direct",
- true,
- List(prodmod1),
- List(consmod1)
- )
- val conf1 = new RabbitMQConfigurationModel(
- "localhost_aquarium",
- "aquarium",
- "aquarium",
- "localhost",
- 5672,
- Nil,
- "/",
- List(conn1)
- )
-
- val model = new RabbitMQConfigurationsModel(List(conf1))
- val xs = XStreamHelpers.newXStream
- val xml = xs.toXML(model)
-
- xml
- }
-
- @Test
- def testConfigurationsExist {
- assertTrue(rabbitmqRC.getResource(RabbitMQPropFile).isJust)
- }
-
- @Test
- def testProducer {
- assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
-
- val maybeResource = rabbitmqRC.getResource(RabbitMQPropFile)
- assertTrue(maybeResource.isJust)
- val maybeProducer = for {
- resource <- maybeResource
- confs <- RabbitMQConfigurations(resource, xs)
- conf <- confs.findConfiguration(RabbitMQSpecificConfName)
- conn <- conf.findConnection(RabbitMQConnectionName)
- producer <- conn.findProducer(RabbitMQProducerName)
- } yield {
- producer
- }
-
- logger.debug("Found producer %s".format(maybeProducer))
-
- maybeProducer match {
- case Just(producer) =>
- logger.debug("Using %s to publish a message".format(producer))
- val message = "Test message " + new java.util.Random(System.currentTimeMillis()).nextInt
- producer.publishString(message)
- logger.debug("Used %s to publish message %s".format(producer, message))
- case NoVal =>
- fail("No producer named %s".format(RabbitMQProducerName))
- case Failed(e, m) =>
- fail("%s: %s".format(m, e.getMessage))
- }
- }
-
- @Test
- def testConsumer {
- assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
-
- val maybeResource = rabbitmqRC.getResource(RabbitMQPropFile)
- assertTrue(maybeResource.isJust)
-
- val maybeConsumer = for {
- resource <- maybeResource
- confs <- RabbitMQConfigurations(resource, xs)
- conf <- confs.findConfiguration(RabbitMQSpecificConfName)
- conn <- conf.findConnection(RabbitMQConnectionName)
- consumer <- conn.findConsumer(RabbitMQConsumerName)
- } yield {
- consumer
- }
-
- maybeConsumer match {
- case Just(consumer) =>
- logger.debug("Receiving a message from %s".format(consumer))
- val agent = consumer.newDeliveryAgent(new AMQPDeliveryHandler {
- def handleStringDelivery(envelope: Props, headers: Props, content: String): Boolean = {
- logger.debug("Received message with")
- logger.debug(" envelope: %s".format(envelope))
- logger.debug(" headers : %s".format(headers))
- logger.debug(" body : %s".format(content))
-
- true
- }
- })
- // wait until delivery
- agent.deliverNext
- case NoVal =>
- fail("No consumer named %s".format(RabbitMQConsumerName))
- case Failed(e, m) =>
- fail("%s: %s".format(m, e.getMessage))
- }
- }
-
-}
\ No newline at end of file