2c70a6676c22c8f3d34a54db21fa97d955eaa90a
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQProducer.scala
1 package gr.grnet.aquarium.connector.rabbitmq
2
3 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5 import gr.grnet.aquarium._
6 import com.rabbitmq.client._
7 import com.ckkloverdos.props.Props
8 import gr.grnet.aquarium.converter.StdConverters
9 import util.{Loggable, Lock}
10 import gr.grnet.aquarium.store.memory.MemStoreProvider
11 import java.io.File
12 import com.ckkloverdos.resource.FileStreamResource
13 import scala.Some
14 import collection.immutable.{TreeMap, TreeSet}
15 import java.util.concurrent.atomic.AtomicLong
16 import akka.actor.{Actor, ActorRef}
17 import com.google.common.eventbus.Subscribe
18 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
19 import collection.mutable
20
21
22 /*
23  * Copyright 2011-2012 GRNET S.A. All rights reserved.
24  *
25  * Redistribution and use in source and binary forms, with or
26  * without modification, are permitted provided that the following
27  * conditions are met:
28  *
29  *   1. Redistributions of source code must retain the above
30  *      copyright notice, this list of conditions and the following
31  *      disclaimer.
32  *
33  *   2. Redistributions in binary form must reproduce the above
34  *      copyright notice, this list of conditions and the following
35  *      disclaimer in the documentation and/or other materials
36  *      provided with the distribution.
37  *
38  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
39  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
40  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
41  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
42  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
43  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
44  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
45  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
46  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
47  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
48  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
49  * POSSIBILITY OF SUCH DAMAGE.
50  *
51  * The views and conclusions contained in the software and
52  * documentation are those of the authors and should not be
53  * interpreted as representing official policies, either expressed
54  * or implied, of GRNET S.A.
55  */
56
57 private class RabbitMQProducerActor extends Actor {
58   def receive = {
59     case sendMessage: Function0[_] =>
60       //Console.err.println("Executing msg ... " + sendMessage.hashCode)
61       sendMessage.asInstanceOf[()=>Unit]()
62     case x : AnyRef =>
63       //Console.err.println("Dammit  ..." + x.getClass.getSuperclass.getName)
64       throw new Exception("Unexpected value in RabbitMQProducerActor with type: " +
65                           x.getClass.getSuperclass.getName)
66   }
67 }
68
69 /**
70  *
71  * @author Prodromos Gerakios <pgerakios@grnet.gr>
72  */
73
74 class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Loggable  {
75   private[this] var _conf: RabbitMQConsumerConf = _
76   private[this] var _factory: ConnectionFactory = _
77   private[this] var _connection: Connection = _
78   private[this] var _channel: Channel = _
79   private[this] var _servers : Array[Address] = _
80   private[this] final val lock = new Lock()
81
82   def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
83   //  Some(RabbitMQConfKeys.imevents_credit)
84
85
86   @volatile private[this] var _unsentMessages = mutable.Queue[()=>Unit]()
87   @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
88   @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,()=>Unit]()
89
90
91   @volatile private[this] var _actorRef : ActorRef = _
92   private[this] var _resendPeriodMillis = 1000L
93
94
95   @Subscribe
96   override def awareOfAquarium(event: AquariumCreatedEvent) = {
97     super.awareOfAquarium(event)
98     assert(aquarium!=null && aquarium.akkaService != null)
99     resendMessages        // start our daemon
100   }
101
102   private[this] def resendMessages() : Unit = {
103     aquarium.timerService.scheduleOnce(
104     "RabbitMQProducer.resendMessages",
105     {
106       //Console.err.println("RabbitMQProducer Timer ...")
107       if(_actorRef==null) {
108         _actorRef =  aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
109       }
110       if(_actorRef != null){
111       // Console.err.println("RabbitMQProducer Timer --> messages ...")
112        var msgs : mutable.Queue[()=>Unit] = null
113        lock.withLock {
114           if(isChannelOpen) {
115             msgs  = _unsentMessages
116             _unsentMessages = mutable.Queue[()=>Unit]()
117           }
118        }
119        if(msgs!=null){
120          //if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
121          for {msg <- msgs} {
122           // Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
123            _actorRef ! (msg:()=>Unit)
124          }
125        }
126       } else {
127         //Console.err.println("Akka ActorSystem is null. Waiting ...")
128       }
129       resendMessages()
130     },
131     this._resendPeriodMillis,
132     true
133     )
134     ()
135   }
136
137   def configure(props: Props): Unit = {
138     val connectionConf = RabbitMQKeys.makeConnectionConf(props)
139     _factory = new ConnectionFactory
140     _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
141     _factory.setUsername(connectionConf(RabbitMQConKeys.username))
142     _factory.setPassword(connectionConf(RabbitMQConKeys.password))
143     _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
144     _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
145     _servers = connectionConf(RabbitMQConKeys.servers)
146     _connection =_factory.newConnection(_servers)
147     _channel = _connection.createChannel
148     _channel.confirmSelect
149     _channel.addConfirmListener(new ConfirmListener {
150
151       private [this] def cutSubset(seqNo:Long,multiple:Boolean) : TreeMap[Long,()=>Unit] =
152         lock.withLock {
153          val set = if (multiple)
154                     _unconfirmedSet.range(0,seqNo+1)
155                    else
156                     _unconfirmedSet.range(seqNo,seqNo)
157          _unconfirmedSet = _unconfirmedSet -- set
158          val ret : TreeMap[Long,()=>Unit] = set.foldLeft(TreeMap[Long,()=>Unit]())({(map,seq)=>
159            _unconfirmedMessages.get(seq) match{
160              case None => map
161              case Some(s) => map + ((seq,s))
162          }})
163          _unconfirmedMessages = _unconfirmedMessages -- set
164         ret
165        }
166
167       def handleAck(seqNo:Long,multiple:Boolean) = {
168         //Console.err.println("Received ack for  " + seqNo)
169         cutSubset(seqNo,multiple)
170       }
171
172       def handleNack(seqNo:Long,multiple:Boolean) = {
173         //Console.err.println("Received Nack for msg for " + seqNo)
174         for {(_,msg) <- cutSubset(seqNo,multiple)} _actorRef ! (msg:()=>Unit)
175       }
176     })
177   }
178
179   private[this] def isChannelOpen: Boolean =
180     lock.withLock{
181     if (_connection == null ||_connection.isOpen == false )
182       _connection =_factory.newConnection(_servers)
183     if (_channel == null ||_channel.isOpen == false )
184       _channel = _connection.createChannel
185     _connection.isOpen && _channel.isOpen
186     }
187
188   def fun(s:String)  : () => Unit = {
189      () => {}
190   }
191
192   def sendMessage(exchangeName:String,routingKey:String,payload:String) = {
193     def msg () : Unit =
194       lock.withLock {
195         try {
196             if(isChannelOpen) {
197               var seq : Long = _channel.getNextPublishSeqNo()
198               _unconfirmedSet += seq
199               _unconfirmedMessages += ((seq,msg))
200               _channel.basicPublish(exchangeName,routingKey,
201                 MessageProperties.PERSISTENT_TEXT_PLAIN,
202                 payload.getBytes)
203                //Console.err.println("####Sent message " + payload + " with seqno=" + seq)
204             } else {
205               _unsentMessages += msg
206                //Console.err.println("####Channel closed!")
207              }
208         } catch {
209             case e: Exception =>
210               _unsentMessages += msg
211               //e.printStackTrace
212         }
213       }
214     if(_actorRef != null)
215       _actorRef ! (msg:()=>Unit)
216     else
217       lock.withLock(_unsentMessages += msg)
218  }
219 }
220
221 object RabbitMQProducer  {
222   def main(args: Array[String]) = {
223     val propsfile = new FileStreamResource(new File("aquarium.properties"))
224     var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
225     val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
226     update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
227     update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
228     build()
229
230     aquarium.start()
231
232     //RabbitMQProducer.wait(1000)
233     val propName = RabbitMQConfKeys.imevents_credit
234     def exn () =
235       throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
236     val prop = _props.get(propName).getOr(exn())
237     if (prop.isEmpty) exn()
238     val Array(exchangeName, routingKey) = prop.split(":")
239     aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(exchangeName,routingKey,"Test string !!!!")
240     Console.err.println("Message sent")
241     ()
242   }
243 }