7f911c88f37bfdd6b9b136e9cc045b96b0bc763c
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQProducer.scala
1 package gr.grnet.aquarium.connector.rabbitmq
2
3 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5 import gr.grnet.aquarium._
6 import com.rabbitmq.client._
7 import com.ckkloverdos.props.Props
8 import gr.grnet.aquarium.converter.StdConverters
9 import gr.grnet.aquarium.util.Lock
10 import gr.grnet.aquarium.store.memory.MemStoreProvider
11 import java.io.File
12 import com.ckkloverdos.resource.FileStreamResource
13 import scala.Some
14 import collection.immutable.{TreeMap, TreeSet}
15
16
17 /*
18  * Copyright 2011-2012 GRNET S.A. All rights reserved.
19  *
20  * Redistribution and use in source and binary forms, with or
21  * without modification, are permitted provided that the following
22  * conditions are met:
23  *
24  *   1. Redistributions of source code must retain the above
25  *      copyright notice, this list of conditions and the following
26  *      disclaimer.
27  *
28  *   2. Redistributions in binary form must reproduce the above
29  *      copyright notice, this list of conditions and the following
30  *      disclaimer in the documentation and/or other materials
31  *      provided with the distribution.
32  *
33  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
34  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
35  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
36  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
37  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
38  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
39  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
40  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
41  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
43  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
44  * POSSIBILITY OF SUCH DAMAGE.
45  *
46  * The views and conclusions contained in the software and
47  * documentation are those of the authors and should not be
48  * interpreted as representing official policies, either expressed
49  * or implied, of GRNET S.A.
50  */
51
52 /**
53  *
54  * @author Prodromos Gerakios <pgerakios@grnet.gr>
55  */
56
57 class RabbitMQProducer extends Configurable {
58   private[this] var _conf: RabbitMQConsumerConf = _
59   private[this] var _factory: ConnectionFactory = _
60   private[this] var _connection: Connection = _
61   private[this] var _channel: Channel = _
62   private[this] var _servers : Array[Address] = _
63   private[this] final val lock = new Lock()
64   private[this] var _exchangeName : String = _
65   private[this] var _routingKey :String = _
66
67   def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
68   //  Some(RabbitMQConfKeys.imevents_credit)
69
70
71   @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
72   @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
73
74   def configure(props: Props): Unit = {
75     val propName = RabbitMQConfKeys.imevents_credit
76     def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
77     val prop = props.get(propName).getOr(exn())
78     if (prop.isEmpty) exn()
79     val connectionConf = RabbitMQKeys.makeConnectionConf(props)
80     val Array(exchangeName, routingKey) = prop.split(":")
81     _exchangeName = exchangeName
82     _routingKey = routingKey
83     _factory = new ConnectionFactory
84     _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
85     _factory.setUsername(connectionConf(RabbitMQConKeys.username))
86     _factory.setPassword(connectionConf(RabbitMQConKeys.password))
87     _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
88     _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
89     _servers = connectionConf(RabbitMQConKeys.servers)
90     _connection =_factory.newConnection(_servers)
91     _channel = _connection.createChannel
92     _channel.confirmSelect
93     _channel.addConfirmListener(new ConfirmListener {
94
95       private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
96          val set = if (multiple)
97                     _unconfirmedSet.range(0,seqNo+1)
98                    else
99                     _unconfirmedSet.range(seqNo,seqNo)
100          _unconfirmedSet = _unconfirmedSet -- set
101          val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
102            _unconfirmedMessages.get(seq) match{
103              case None => map
104              case Some(s) => map + ((seq,s))
105          }})
106          _unconfirmedMessages = _unconfirmedMessages -- set
107         ret
108        }
109
110
111       def handleAck(seqNo:Long,multiple:Boolean) = {
112         withChannel {
113           Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
114           subset(seqNo,multiple)
115         }
116       }
117
118       def handleNack(seqNo:Long,multiple:Boolean) = {
119         withChannel {
120           Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
121           for { (_,msg) <- subset(seqNo,multiple) }
122             sendMessage(msg)
123         }
124       }
125     })
126   }
127
128   private[this] def withChannel[A]( next : => A) = {
129     try {
130       lock.withLock {
131       if (_connection == null ||_connection.isOpen == false )
132          _connection =_factory.newConnection(_servers)
133       if (_channel == null ||_channel.isOpen == false )
134         _channel = _connection.createChannel
135         assert(_connection.isOpen && _channel.isOpen)
136         next
137      }
138     } catch {
139         case e: Exception =>
140           e.printStackTrace
141     }
142   }
143
144   def sendMessage(payload:String) =
145     withChannel {
146       var seq : Long = _channel.getNextPublishSeqNo()
147       _unconfirmedSet += seq
148       _unconfirmedMessages += ((seq,payload))
149       _channel.basicPublish(_exchangeName,_routingKey,
150                             MessageProperties.PERSISTENT_TEXT_PLAIN,
151                             payload.getBytes)
152     }
153 }
154
155 object RabbitMQProducer  {
156   def main(args: Array[String]) = {
157     val propsfile = new FileStreamResource(new File("aquarium.properties"))
158     var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
159     val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
160     update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
161     update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
162     build()
163     aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
164     Console.err.println("Message sent")
165     //aquarium.start()
166     ()
167   }
168 }