Reliable message passing for RabbitMQProducer
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQProducer.scala
1 package gr.grnet.aquarium.connector.rabbitmq
2
3 import conf.RabbitMQConsumerConf
4 import conf.RabbitMQConsumerConf
5 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
6 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
7 import gr.grnet.aquarium._
8 import com.rabbitmq.client._
9 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
10 import com.ckkloverdos.props.Props
11 import gr.grnet.aquarium.converter.StdConverters
12 import gr.grnet.aquarium.util.{Lock, Tags}
13 import gr.grnet.aquarium.store.memory.MemStoreProvider
14 import java.io.File
15 import com.ckkloverdos.resource.FileStreamResource
16 import scala.Some
17 import collection.immutable.{TreeMap, SortedSet, TreeSet}
18 import java.util.Collections
19
20
21 /*
22  * Copyright 2011-2012 GRNET S.A. All rights reserved.
23  *
24  * Redistribution and use in source and binary forms, with or
25  * without modification, are permitted provided that the following
26  * conditions are met:
27  *
28  *   1. Redistributions of source code must retain the above
29  *      copyright notice, this list of conditions and the following
30  *      disclaimer.
31  *
32  *   2. Redistributions in binary form must reproduce the above
33  *      copyright notice, this list of conditions and the following
34  *      disclaimer in the documentation and/or other materials
35  *      provided with the distribution.
36  *
37  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
38  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
39  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
40  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
41  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
42  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
43  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
44  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
45  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
46  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
47  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
48  * POSSIBILITY OF SUCH DAMAGE.
49  *
50  * The views and conclusions contained in the software and
51  * documentation are those of the authors and should not be
52  * interpreted as representing official policies, either expressed
53  * or implied, of GRNET S.A.
54  */
55
56 /**
57  *
58  * @author Prodromos Gerakios <pgerakios@grnet.gr>
59  */
60
61 class RabbitMQProducer extends Configurable {
62   private[this] var _conf: RabbitMQConsumerConf = _
63   private[this] var _factory: ConnectionFactory = _
64   private[this] var _connection: Connection = _
65   private[this] var _channel: Channel = _
66   private[this] var _servers : Array[Address] = _
67   private[this] final val lock = new Lock()
68   private[this] var _exchangeName : String = _
69   private[this] var _routingKey :String = _
70
71   def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
72   //  Some(RabbitMQConfKeys.imevents_credit)
73
74
75   @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
76   @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
77
78   def configure(props: Props): Unit = {
79     val propName = RabbitMQConfKeys.imevents_credit
80     def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
81     val prop = props.get(propName).getOr(exn())
82     if (prop.isEmpty) exn()
83     val connectionConf = RabbitMQKeys.makeConnectionConf(props)
84     val Array(exchangeName, routingKey) = prop.split(":")
85     _exchangeName = exchangeName
86     _routingKey = routingKey
87     _factory = new ConnectionFactory
88     _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
89     _factory.setUsername(connectionConf(RabbitMQConKeys.username))
90     _factory.setPassword(connectionConf(RabbitMQConKeys.password))
91     _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
92     _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
93     _servers = connectionConf(RabbitMQConKeys.servers)
94     _connection =_factory.newConnection(_servers)
95     _channel = _connection.createChannel
96     _channel.confirmSelect
97     _channel.addConfirmListener(new ConfirmListener {
98
99       private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
100          val set = if (multiple)
101                     _unconfirmedSet.range(0,seqNo+1)
102                    else
103                     _unconfirmedSet.range(seqNo,seqNo)
104          _unconfirmedSet = _unconfirmedSet -- set
105          val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
106            _unconfirmedMessages.get(seq) match{
107              case None => map
108              case Some(s) => map + ((seq,s))
109          }})
110          _unconfirmedMessages = _unconfirmedMessages -- set
111         ret
112        }
113
114
115       def handleAck(seqNo:Long,multiple:Boolean) = {
116         withChannel {
117           Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
118           subset(seqNo,multiple)
119         }
120       }
121
122       def handleNack(seqNo:Long,multiple:Boolean) = {
123         withChannel {
124           Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
125           for { (_,msg) <- subset(seqNo,multiple) }
126             sendMessage(msg)
127         }
128       }
129     })
130   }
131
132   private[this] def withChannel[A]( next : => A) = {
133     try {
134       lock.withLock {
135       if (_connection == null ||_connection.isOpen == false )
136          _connection =_factory.newConnection(_servers)
137       if (_channel == null ||_channel.isOpen == false )
138         _channel = _connection.createChannel
139         assert(_connection.isOpen && _channel.isOpen)
140         next
141      }
142     } catch {
143         case e: Exception =>
144           e.printStackTrace
145     }
146   }
147
148   def sendMessage(payload:String) =
149     withChannel {
150       var seq : Long = _channel.getNextPublishSeqNo()
151       _unconfirmedSet += seq
152       _unconfirmedMessages += ((seq,payload))
153       _channel.basicPublish(_exchangeName,_routingKey,
154                             MessageProperties.PERSISTENT_TEXT_PLAIN,
155                             payload.getBytes)
156     }
157 }
158
159 object RabbitMQProducer  {
160   def main(args: Array[String]) = {
161     val propsfile = new FileStreamResource(new File("aquarium.properties"))
162     var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
163     val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
164     update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
165     update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
166     build()
167     aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
168     Console.err.println("Message sent")
169     aquarium.stop()
170     ()
171   }
172 }