1 package gr.grnet.aquarium.connector.rabbitmq
3 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5 import gr.grnet.aquarium._
6 import com.rabbitmq.client._
7 import com.ckkloverdos.props.Props
8 import gr.grnet.aquarium.converter.StdConverters
9 import gr.grnet.aquarium.util.Lock
10 import gr.grnet.aquarium.store.memory.MemStoreProvider
12 import com.ckkloverdos.resource.FileStreamResource
14 import collection.immutable.{TreeMap, TreeSet}
18 * Copyright 2011-2012 GRNET S.A. All rights reserved.
20 * Redistribution and use in source and binary forms, with or
21 * without modification, are permitted provided that the following
24 * 1. Redistributions of source code must retain the above
25 * copyright notice, this list of conditions and the following
28 * 2. Redistributions in binary form must reproduce the above
29 * copyright notice, this list of conditions and the following
30 * disclaimer in the documentation and/or other materials
31 * provided with the distribution.
33 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
34 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
35 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
37 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
38 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
39 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
40 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
41 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
43 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
44 * POSSIBILITY OF SUCH DAMAGE.
46 * The views and conclusions contained in the software and
47 * documentation are those of the authors and should not be
48 * interpreted as representing official policies, either expressed
49 * or implied, of GRNET S.A.
54 * @author Prodromos Gerakios <pgerakios@grnet.gr>
57 class RabbitMQProducer extends Configurable {
58 private[this] var _conf: RabbitMQConsumerConf = _
59 private[this] var _factory: ConnectionFactory = _
60 private[this] var _connection: Connection = _
61 private[this] var _channel: Channel = _
62 private[this] var _servers : Array[Address] = _
63 private[this] final val lock = new Lock()
64 private[this] var _exchangeName : String = _
65 private[this] var _routingKey :String = _
67 def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
68 // Some(RabbitMQConfKeys.imevents_credit)
71 @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
72 @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
74 def configure(props: Props): Unit = {
75 val propName = RabbitMQConfKeys.imevents_credit
76 def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
77 val prop = props.get(propName).getOr(exn())
78 if (prop.isEmpty) exn()
79 val connectionConf = RabbitMQKeys.makeConnectionConf(props)
80 val Array(exchangeName, routingKey) = prop.split(":")
81 _exchangeName = exchangeName
82 _routingKey = routingKey
83 _factory = new ConnectionFactory
84 _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
85 _factory.setUsername(connectionConf(RabbitMQConKeys.username))
86 _factory.setPassword(connectionConf(RabbitMQConKeys.password))
87 _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
88 _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
89 _servers = connectionConf(RabbitMQConKeys.servers)
90 _connection =_factory.newConnection(_servers)
91 _channel = _connection.createChannel
92 _channel.confirmSelect
93 _channel.addConfirmListener(new ConfirmListener {
95 private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
96 val set = if (multiple)
97 _unconfirmedSet.range(0,seqNo+1)
99 _unconfirmedSet.range(seqNo,seqNo)
100 _unconfirmedSet = _unconfirmedSet -- set
101 val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
102 _unconfirmedMessages.get(seq) match{
104 case Some(s) => map + ((seq,s))
106 _unconfirmedMessages = _unconfirmedMessages -- set
111 def handleAck(seqNo:Long,multiple:Boolean) = {
113 Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
114 subset(seqNo,multiple)
118 def handleNack(seqNo:Long,multiple:Boolean) = {
120 Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
121 for { (_,msg) <- subset(seqNo,multiple) }
128 private[this] def withChannel[A]( next : => A) = {
131 if (_connection == null ||_connection.isOpen == false )
132 _connection =_factory.newConnection(_servers)
133 if (_channel == null ||_channel.isOpen == false )
134 _channel = _connection.createChannel
135 assert(_connection.isOpen && _channel.isOpen)
144 def sendMessage(payload:String) =
146 var seq : Long = _channel.getNextPublishSeqNo()
147 _unconfirmedSet += seq
148 _unconfirmedMessages += ((seq,payload))
149 _channel.basicPublish(_exchangeName,_routingKey,
150 MessageProperties.PERSISTENT_TEXT_PLAIN,
155 object RabbitMQProducer {
156 def main(args: Array[String]) = {
157 val propsfile = new FileStreamResource(new File("aquarium.properties"))
158 var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
159 val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
160 update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
161 update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
163 aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
164 Console.err.println("Message sent")