8e95f8fcf141c4e0dfad915f6552f1cf748e9fc8
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQProducer.scala
1 package gr.grnet.aquarium.connector.rabbitmq
2
3 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5 import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, Aquarium}
6 import com.rabbitmq.client.{MessageProperties, Channel, Connection, ConnectionFactory}
7 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
8 import com.ckkloverdos.props.Props
9 import gr.grnet.aquarium.converter.StdConverters
10 import gr.grnet.aquarium.util.Tags
11 import gr.grnet.aquarium.store.memory.MemStoreProvider
12 import java.io.File
13 import com.ckkloverdos.resource.FileStreamResource
14
15
16 /*
17  * Copyright 2011-2012 GRNET S.A. All rights reserved.
18  *
19  * Redistribution and use in source and binary forms, with or
20  * without modification, are permitted provided that the following
21  * conditions are met:
22  *
23  *   1. Redistributions of source code must retain the above
24  *      copyright notice, this list of conditions and the following
25  *      disclaimer.
26  *
27  *   2. Redistributions in binary form must reproduce the above
28  *      copyright notice, this list of conditions and the following
29  *      disclaimer in the documentation and/or other materials
30  *      provided with the distribution.
31  *
32  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
33  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
34  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
35  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
36  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
40  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
41  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
42  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
43  * POSSIBILITY OF SUCH DAMAGE.
44  *
45  * The views and conclusions contained in the software and
46  * documentation are those of the authors and should not be
47  * interpreted as representing official policies, either expressed
48  * or implied, of GRNET S.A.
49  */
50
51 class RabbitMQProducer(val aquarium: Aquarium) {
52   lazy val conf: RabbitMQConsumerConf = {
53     var props = aquarium(Aquarium.EnvKeys.originalProps)
54     var prop = props.get(RabbitMQConfKeys.imevents_credit).getOr("")
55     Console.println("Prop: " + prop)
56     val Array(exchange, routing) = prop.split(":")
57     //Console.println("ex: " + exchange + " routing: " + routing)
58     val conf = RabbitMQConsumerConf(
59       tag = Tags.IMEventTag,
60       exchangeName = exchange,
61       routingKey = routing,
62       queueName = "",
63       connectionConf = RabbitMQKeys.makeConnectionConf(props),
64       exchangeConf = RabbitMQKeys.DefaultExchangeConf,
65       channelConf = RabbitMQKeys.DefaultChannelConf,
66       queueConf = RabbitMQKeys.DefaultQueueConf
67     )
68     conf
69   }
70   private[this] var _factory: ConnectionFactory = {
71     val factory = new ConnectionFactory
72     factory.setConnectionTimeout(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
73     factory.setUsername(conf.connectionConf(RabbitMQConKeys.username))
74     factory.setPassword(conf.connectionConf(RabbitMQConKeys.password))
75     factory.setVirtualHost(conf.connectionConf(RabbitMQConKeys.vhost))
76     factory.setRequestedHeartbeat(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
77     factory
78   }
79
80   private[this] var _connection: Connection = _
81   private[this] var _channel: Channel = _
82   //private[this] val _state = new AtomicReference[State](Shutdown)
83   private[this] val _pingIsScheduled = new AtomicBoolean(false)
84
85   private[this] lazy val servers = {
86     val s = conf.connectionConf(RabbitMQConKeys.servers)
87     for { s1 <- s }  Console.err.println("Servers: " + s1.toString)
88     s
89   }
90
91   private[this] def withChannel[A]( next : => A) = {
92     try {
93       var connection : Connection =  null
94       var channel : Channel = null
95       if (_connection == null ||_connection.isOpen == false )
96          _connection =_factory.newConnection(servers)
97       if (_channel == null ||_channel.isOpen == false )
98         _channel = _connection.createChannel
99       assert(_connection.isOpen && _channel.isOpen)
100       next
101     } catch {
102         case e: Exception =>
103           e.printStackTrace
104     }
105   }
106
107   def sendMessage(payload:String) =
108     withChannel {
109       _channel.basicPublish(conf.exchangeName, conf.routingKey,
110         MessageProperties.PERSISTENT_TEXT_PLAIN,
111         payload.getBytes)
112     }
113 }
114
115 object RabbitMQProducer {
116   val propsfile = new FileStreamResource(new File("aquarium.properties"))
117   @volatile private[this] var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
118   val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
119                 update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
120                 update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
121                 build()
122
123
124   def main(args: Array[String]) = {
125     new RabbitMQProducer(aquarium).sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}")
126     ()
127   }
128 }