1 package gr.grnet.aquarium.connector.rabbitmq
3 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5 import gr.grnet.aquarium._
6 import com.rabbitmq.client._
7 import com.ckkloverdos.props.Props
8 import gr.grnet.aquarium.converter.StdConverters
9 import util.{Loggable, Lock}
10 import gr.grnet.aquarium.store.memory.MemStoreProvider
12 import com.ckkloverdos.resource.FileStreamResource
14 import collection.immutable.{TreeMap, TreeSet}
15 import java.util.concurrent.atomic.AtomicLong
16 import akka.actor.{Actor, ActorRef}
17 import com.google.common.eventbus.Subscribe
18 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
19 import collection.mutable
23 * Copyright 2011-2012 GRNET S.A. All rights reserved.
25 * Redistribution and use in source and binary forms, with or
26 * without modification, are permitted provided that the following
29 * 1. Redistributions of source code must retain the above
30 * copyright notice, this list of conditions and the following
33 * 2. Redistributions in binary form must reproduce the above
34 * copyright notice, this list of conditions and the following
35 * disclaimer in the documentation and/or other materials
36 * provided with the distribution.
38 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
39 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
40 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
41 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
42 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
43 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
44 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
45 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
46 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
47 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
48 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
49 * POSSIBILITY OF SUCH DAMAGE.
51 * The views and conclusions contained in the software and
52 * documentation are those of the authors and should not be
53 * interpreted as representing official policies, either expressed
54 * or implied, of GRNET S.A.
57 private class RabbitMQProducerActor extends Actor {
59 case sendMessage: Function0[_] =>
60 //Console.err.println("Executing msg ... " + sendMessage.hashCode)
61 sendMessage.asInstanceOf[()=>Unit]()
63 //Console.err.println("Dammit ..." + x.getClass.getSuperclass.getName)
64 throw new Exception("Unexpected value in RabbitMQProducerActor with type: " +
65 x.getClass.getSuperclass.getName)
71 * @author Prodromos Gerakios <pgerakios@grnet.gr>
74 class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Loggable {
75 private[this] var _conf: RabbitMQConsumerConf = _
76 private[this] var _factory: ConnectionFactory = _
77 private[this] var _connection: Connection = _
78 private[this] var _channel: Channel = _
79 private[this] var _servers : Array[Address] = _
80 private[this] final val lock = new Lock()
82 def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
83 // Some(RabbitMQConfKeys.imevents_credit)
86 @volatile private[this] var _unsentMessages = mutable.Queue[()=>Unit]()
87 @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
88 @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,()=>Unit]()
91 @volatile private[this] var _actorRef : ActorRef = _
92 private[this] var _resendPeriodMillis = 1000L
96 override def awareOfAquarium(event: AquariumCreatedEvent) = {
97 super.awareOfAquarium(event)
98 assert(aquarium!=null && aquarium.akkaService != null)
99 resendMessages // start our daemon
102 private[this] def resendMessages() : Unit = {
103 aquarium.timerService.scheduleOnce(
104 "RabbitMQProducer.resendMessages",
106 //Console.err.println("RabbitMQProducer Timer ...")
107 if(_actorRef==null) {
108 _actorRef = aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
110 if(_actorRef != null){
111 // Console.err.println("RabbitMQProducer Timer --> messages ...")
112 var msgs : mutable.Queue[()=>Unit] = null
115 msgs = _unsentMessages
116 _unsentMessages = mutable.Queue[()=>Unit]()
120 //if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
122 // Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
123 _actorRef ! (msg:()=>Unit)
127 //Console.err.println("Akka ActorSystem is null. Waiting ...")
131 this._resendPeriodMillis,
137 def configure(props: Props): Unit = {
138 val connectionConf = RabbitMQKeys.makeConnectionConf(props)
139 _factory = new ConnectionFactory
140 _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
141 _factory.setUsername(connectionConf(RabbitMQConKeys.username))
142 _factory.setPassword(connectionConf(RabbitMQConKeys.password))
143 _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
144 _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
145 _servers = connectionConf(RabbitMQConKeys.servers)
146 _connection =_factory.newConnection(_servers)
147 _channel = _connection.createChannel
148 _channel.confirmSelect
149 _channel.addConfirmListener(new ConfirmListener {
151 private [this] def cutSubset(seqNo:Long,multiple:Boolean) : TreeMap[Long,()=>Unit] =
153 val set = if (multiple)
154 _unconfirmedSet.range(0,seqNo+1)
156 _unconfirmedSet.range(seqNo,seqNo)
157 _unconfirmedSet = _unconfirmedSet -- set
158 val ret : TreeMap[Long,()=>Unit] = set.foldLeft(TreeMap[Long,()=>Unit]())({(map,seq)=>
159 _unconfirmedMessages.get(seq) match{
161 case Some(s) => map + ((seq,s))
163 _unconfirmedMessages = _unconfirmedMessages -- set
167 def handleAck(seqNo:Long,multiple:Boolean) = {
168 //Console.err.println("Received ack for " + seqNo)
169 cutSubset(seqNo,multiple)
172 def handleNack(seqNo:Long,multiple:Boolean) = {
173 //Console.err.println("Received Nack for msg for " + seqNo)
174 for {(_,msg) <- cutSubset(seqNo,multiple)} _actorRef ! (msg:()=>Unit)
179 private[this] def isChannelOpen: Boolean =
181 if (_connection == null ||_connection.isOpen == false )
182 _connection =_factory.newConnection(_servers)
183 if (_channel == null ||_channel.isOpen == false )
184 _channel = _connection.createChannel
185 _connection.isOpen && _channel.isOpen
188 def fun(s:String) : () => Unit = {
192 def sendMessage(exchangeName:String,routingKey:String,payload:String) = {
197 var seq : Long = _channel.getNextPublishSeqNo()
198 _unconfirmedSet += seq
199 _unconfirmedMessages += ((seq,msg))
200 _channel.basicPublish(exchangeName,routingKey,
201 MessageProperties.PERSISTENT_TEXT_PLAIN,
203 //Console.err.println("####Sent message " + payload + " with seqno=" + seq)
205 _unsentMessages += msg
206 //Console.err.println("####Channel closed!")
210 _unsentMessages += msg
214 if(_actorRef != null)
215 _actorRef ! (msg:()=>Unit)
217 lock.withLock(_unsentMessages += msg)
221 object RabbitMQProducer {
222 def main(args: Array[String]) = {
223 val propsfile = new FileStreamResource(new File("aquarium.properties"))
224 var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
225 val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
226 update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
227 update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
232 //RabbitMQProducer.wait(1000)
233 val propName = RabbitMQConfKeys.imevents_credit
235 throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
236 val prop = _props.get(propName).getOr(exn())
237 if (prop.isEmpty) exn()
238 val Array(exchangeName, routingKey) = prop.split(":")
239 aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(exchangeName,routingKey,"Test string !!!!")
240 Console.err.println("Message sent")