1 package gr.grnet.aquarium.connector.rabbitmq
3 import conf.RabbitMQConsumerConf
4 import conf.RabbitMQConsumerConf
5 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
6 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
7 import gr.grnet.aquarium._
8 import com.rabbitmq.client._
9 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
10 import com.ckkloverdos.props.Props
11 import gr.grnet.aquarium.converter.StdConverters
12 import gr.grnet.aquarium.util.{Lock, Tags}
13 import gr.grnet.aquarium.store.memory.MemStoreProvider
15 import com.ckkloverdos.resource.FileStreamResource
17 import collection.immutable.{TreeMap, SortedSet, TreeSet}
18 import java.util.Collections
22 * Copyright 2011-2012 GRNET S.A. All rights reserved.
24 * Redistribution and use in source and binary forms, with or
25 * without modification, are permitted provided that the following
28 * 1. Redistributions of source code must retain the above
29 * copyright notice, this list of conditions and the following
32 * 2. Redistributions in binary form must reproduce the above
33 * copyright notice, this list of conditions and the following
34 * disclaimer in the documentation and/or other materials
35 * provided with the distribution.
37 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
38 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
39 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
40 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
41 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
42 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
43 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
44 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
45 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
46 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
47 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
48 * POSSIBILITY OF SUCH DAMAGE.
50 * The views and conclusions contained in the software and
51 * documentation are those of the authors and should not be
52 * interpreted as representing official policies, either expressed
53 * or implied, of GRNET S.A.
58 * @author Prodromos Gerakios <pgerakios@grnet.gr>
61 class RabbitMQProducer extends Configurable {
62 private[this] var _conf: RabbitMQConsumerConf = _
63 private[this] var _factory: ConnectionFactory = _
64 private[this] var _connection: Connection = _
65 private[this] var _channel: Channel = _
66 private[this] var _servers : Array[Address] = _
67 private[this] final val lock = new Lock()
68 private[this] var _exchangeName : String = _
69 private[this] var _routingKey :String = _
71 def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
72 // Some(RabbitMQConfKeys.imevents_credit)
75 @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
76 @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
78 def configure(props: Props): Unit = {
79 val propName = RabbitMQConfKeys.imevents_credit
80 def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
81 val prop = props.get(propName).getOr(exn())
82 if (prop.isEmpty) exn()
83 val connectionConf = RabbitMQKeys.makeConnectionConf(props)
84 val Array(exchangeName, routingKey) = prop.split(":")
85 _exchangeName = exchangeName
86 _routingKey = routingKey
87 _factory = new ConnectionFactory
88 _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
89 _factory.setUsername(connectionConf(RabbitMQConKeys.username))
90 _factory.setPassword(connectionConf(RabbitMQConKeys.password))
91 _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
92 _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
93 _servers = connectionConf(RabbitMQConKeys.servers)
94 _connection =_factory.newConnection(_servers)
95 _channel = _connection.createChannel
96 _channel.confirmSelect
97 _channel.addConfirmListener(new ConfirmListener {
99 private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
100 val set = if (multiple)
101 _unconfirmedSet.range(0,seqNo+1)
103 _unconfirmedSet.range(seqNo,seqNo)
104 _unconfirmedSet = _unconfirmedSet -- set
105 val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
106 _unconfirmedMessages.get(seq) match{
108 case Some(s) => map + ((seq,s))
110 _unconfirmedMessages = _unconfirmedMessages -- set
115 def handleAck(seqNo:Long,multiple:Boolean) = {
117 Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
118 subset(seqNo,multiple)
122 def handleNack(seqNo:Long,multiple:Boolean) = {
124 Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
125 for { (_,msg) <- subset(seqNo,multiple) }
132 private[this] def withChannel[A]( next : => A) = {
135 if (_connection == null ||_connection.isOpen == false )
136 _connection =_factory.newConnection(_servers)
137 if (_channel == null ||_channel.isOpen == false )
138 _channel = _connection.createChannel
139 assert(_connection.isOpen && _channel.isOpen)
148 def sendMessage(payload:String) =
150 var seq : Long = _channel.getNextPublishSeqNo()
151 _unconfirmedSet += seq
152 _unconfirmedMessages += ((seq,payload))
153 _channel.basicPublish(_exchangeName,_routingKey,
154 MessageProperties.PERSISTENT_TEXT_PLAIN,
159 object RabbitMQProducer {
160 def main(args: Array[String]) = {
161 val propsfile = new FileStreamResource(new File("aquarium.properties"))
162 var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
163 val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
164 update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
165 update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
167 aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
168 Console.err.println("Message sent")