1 package gr.grnet.aquarium.connector.rabbitmq
3 import conf.RabbitMQConsumerConf
4 import conf.RabbitMQConsumerConf
5 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
6 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
7 import gr.grnet.aquarium._
8 import com.rabbitmq.client._
9 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
10 import com.ckkloverdos.props.Props
11 import gr.grnet.aquarium.converter.StdConverters
12 import gr.grnet.aquarium.util.{Lock, Tags}
13 import gr.grnet.aquarium.store.memory.MemStoreProvider
15 import com.ckkloverdos.resource.FileStreamResource
20 * Copyright 2011-2012 GRNET S.A. All rights reserved.
22 * Redistribution and use in source and binary forms, with or
23 * without modification, are permitted provided that the following conditions are met:
26 * 1. Redistributions of source code must retain the above
27 * copyright notice, this list of conditions and the following disclaimer.
30 * 2. Redistributions in binary form must reproduce the above
31 * copyright notice, this list of conditions and the following
32 * disclaimer in the documentation and/or other materials
33 * provided with the distribution.
35 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
36 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
37 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
38 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
39 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
41 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
42 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
43 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
44 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
45 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
46 * POSSIBILITY OF SUCH DAMAGE.
48 * The views and conclusions contained in the software and
49 * documentation are those of the authors and should not be
50 * interpreted as representing official policies, either expressed
51 * or implied, of GRNET S.A.
56 * @author Prodromos Gerakios <pgerakios@grnet.gr>
59 class RabbitMQProducer extends Configurable {
// ---- Settings (the visible code assigns these in configure, except `_conf`) ----

// NOTE(review): `_conf` is never assigned in the visible part of this class;
// confirm it is still needed.
private[this] var _conf: RabbitMQConsumerConf = _
// Exchange and routing key handed to basicPublish when sending a message.
private[this] var _exchangeName: String = _
private[this] var _routingKey: String = _
// Broker addresses passed to ConnectionFactory.newConnection.
private[this] var _servers: Array[Address] = _

// ---- Connection state ----

// Factory that holds the credentials, virtual host and timeouts.
private[this] var _factory: ConnectionFactory = _
// Current connection and channel; both start out as null and are
// (re)created on demand by withChannel.
private[this] var _connection: Connection = _
private[this] var _channel: Channel = _

// NOTE(review): presumably guards access to the connection/channel above;
// its use is not visible here, so confirm against the rest of the class.
private[this] final val lock: Lock = new Lock()
/**
 * The property prefix used by this component.
 *
 * @return always `Some(RabbitMQKeys.PropertiesPrefix)`
 */
def propertyPrefix: Option[String] = {
  // Earlier alternative, kept for reference:
  // Some(RabbitMQConfKeys.imevents_credit)
  val prefix = RabbitMQKeys.PropertiesPrefix
  Some(prefix)
}
/**
 * Reads the producer settings from `props` and prepares the connection
 * factory and the list of servers.
 *
 * The value stored under `RabbitMQConfKeys.imevents_credit` must have the
 * form `exchange:routingKey`, i.e. it must split on `:` into exactly two
 * parts. The exchange part may be empty.
 *
 * @param props the properties to read the settings from
 * @throws AquariumInternalError when no usable value is found for the key,
 *                               or when the value does not have the
 *                               `exchange:routingKey` form
 */
def configure(props: Props): Unit = {
  val propName = RabbitMQConfKeys.imevents_credit

  def missingValue() =
    throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))

  val prop = props.get(propName).getOr(missingValue())
  if (prop.isEmpty) missingValue()

  val connectionConf = RabbitMQKeys.makeConnectionConf(props)

  // A bare `val Array(a, b) = ...` would fail with an uninformative
  // scala.MatchError on values such as "a", "a:" or "a:b:c"; report the
  // offending key instead.
  val (exchangeName, routingKey) = prop.split(":") match {
    case Array(exchange, key) =>
      (exchange, key)
    case _ =>
      throw new AquariumInternalError(
        new Exception,
        "Malformed value for key %s in properties, expected <exchange>:<routing key>".format(propName))
  }
  _exchangeName = exchangeName
  _routingKey = routingKey

  val reconnectPeriodMillis = connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt

  _factory = new ConnectionFactory
  _factory.setConnectionTimeout(reconnectPeriodMillis)
  _factory.setUsername(connectionConf(RabbitMQConKeys.username))
  _factory.setPassword(connectionConf(RabbitMQConKeys.password))
  _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
  // NOTE(review): ConnectionFactory.setRequestedHeartbeat expects seconds,
  // but it is given the same millisecond value as the connection timeout.
  // Left unchanged to preserve current behavior; confirm the intended unit.
  _factory.setRequestedHeartbeat(reconnectPeriodMillis)

  _servers = connectionConf(RabbitMQConKeys.servers)
}
// Makes sure an open connection and an open channel are available.
// `next` is a by-name argument, so it is not evaluated at the call site.
// NOTE(review): the statements that evaluate `next` and finish this method
// are not present in this listing; confirm how `next` is run and whether
// `lock` is held around it.
91 private[this] def withChannel[A]( next : => A) = {
// Open a connection to the configured servers when there is none yet or
// the current one is no longer open.
94 if (_connection == null ||_connection.isOpen == false )
95 _connection =_factory.newConnection(_servers)
// Likewise create a channel on the current connection when it is missing
// or no longer open.
96 if (_channel == null ||_channel.isOpen == false )
97 _channel = _connection.createChannel
// Sanity check: both must report open at this point.
98 assert(_connection.isOpen && _channel.isOpen)
107 def sendMessage(payload:String) =
109 _channel.basicPublish(_exchangeName,_routingKey,
110 MessageProperties.PERSISTENT_TEXT_PLAIN,