1 package gr.grnet.aquarium.connector.rabbitmq
3 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
4 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
5 import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, Aquarium}
6 import com.rabbitmq.client.{MessageProperties, Channel, Connection, ConnectionFactory}
7 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
8 import com.ckkloverdos.props.Props
9 import gr.grnet.aquarium.converter.StdConverters
10 import gr.grnet.aquarium.util.Tags
11 import gr.grnet.aquarium.store.memory.MemStoreProvider
13 import com.ckkloverdos.resource.FileStreamResource
17 * Copyright 2011-2012 GRNET S.A. All rights reserved.
19 * Redistribution and use in source and binary forms, with or
20 * without modification, are permitted provided that the following
23 * 1. Redistributions of source code must retain the above
24 * copyright notice, this list of conditions and the following
27 * 2. Redistributions in binary form must reproduce the above
28 * copyright notice, this list of conditions and the following
29 * disclaimer in the documentation and/or other materials
30 * provided with the distribution.
32 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
33 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
34 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
35 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
36 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
37 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
38 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
39 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
40 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
41 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
42 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
43 * POSSIBILITY OF SUCH DAMAGE.
45 * The views and conclusions contained in the software and
46 * documentation are those of the authors and should not be
47 * interpreted as representing official policies, either expressed
48 * or implied, of GRNET S.A.
// Publishes text messages to a RabbitMQ exchange, using settings read from the
// supplied Aquarium instance.
// NOTE(review): this listing appears to be missing several short lines (closing
// braces, some arguments and statements). The comments below describe only the
// statements that are visible here.
51 class RabbitMQProducer(val aquarium: Aquarium) {
// Configuration for this producer, computed on first access.
52 lazy val conf: RabbitMQConsumerConf = {
// The properties come from the Aquarium environment entry `originalProps`.
53 var props = aquarium(Aquarium.EnvKeys.originalProps)
// Falls back to the empty string when the `imevents_credit` key is absent.
54 var prop = props.get(RabbitMQConfKeys.imevents_credit).getOr("")
55 Console.println("Prop: " + prop)
// The value is expected to have the form "<exchange>:<routing>". The pattern
// only matches a two-element array, so any other shape (including the empty
// default above) raises a MatchError here.
56 val Array(exchange, routing) = prop.split(":")
57 //Console.println("ex: " + exchange + " routing: " + routing)
// Only `exchangeName` and `connectionConf` are derived from the properties in
// the visible arguments; the tag and the exchange/channel/queue settings are
// fixed defaults.
58 val conf = RabbitMQConsumerConf(
59 tag = Tags.IMEventTag,
60 exchangeName = exchange,
63 connectionConf = RabbitMQKeys.makeConnectionConf(props),
64 exchangeConf = RabbitMQKeys.DefaultExchangeConf,
65 channelConf = RabbitMQKeys.DefaultChannelConf,
66 queueConf = RabbitMQKeys.DefaultQueueConf
// Connection factory configured from `conf.connectionConf`. Because this is a
// non-lazy member whose initializer reads `conf`, the lazy `conf` above is
// forced while the instance is being constructed.
70 private[this] var _factory: ConnectionFactory = {
71 val factory = new ConnectionFactory
// NOTE(review): `reconnect_period_millis` is used both as the connection
// timeout here and as the requested heartbeat below; confirm the unit each
// setter expects.
72 factory.setConnectionTimeout(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
73 factory.setUsername(conf.connectionConf(RabbitMQConKeys.username))
74 factory.setPassword(conf.connectionConf(RabbitMQConKeys.password))
75 factory.setVirtualHost(conf.connectionConf(RabbitMQConKeys.vhost))
76 factory.setRequestedHeartbeat(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
// Connection and channel start out as null (`= _`); withChannel below assigns
// them when they are null or not open.
80 private[this] var _connection: Connection = _
81 private[this] var _channel: Channel = _
82 //private[this] val _state = new AtomicReference[State](Shutdown)
// Initialised to false; not referenced by any other visible line.
83 private[this] val _pingIsScheduled = new AtomicBoolean(false)
// Server list taken from the connection configuration; every entry is echoed
// to standard error when this lazy value is first evaluated.
85 private[this] lazy val servers = {
86 val s = conf.connectionConf(RabbitMQConKeys.servers)
87 for { s1 <- s } Console.err.println("Servers: " + s1.toString)
// Takes a by-name block `next` and first makes sure the shared connection and
// channel are usable. How `next` is evaluated and what is returned is not
// visible in this listing.
91 private[this] def withChannel[A]( next : => A) = {
// NOTE(review): these two locals are not used by any visible line.
93 var connection : Connection = null
94 var channel : Channel = null
// Creates a new connection or channel when the current one is null or not open.
95 if (_connection == null ||_connection.isOpen == false )
96 _connection =_factory.newConnection(servers)
97 if (_channel == null ||_channel.isOpen == false )
98 _channel = _connection.createChannel
99 assert(_connection.isOpen && _channel.isOpen)
// Publishes a message on the shared channel to the configured exchange and
// routing key with PERSISTENT_TEXT_PLAIN message properties.
// NOTE(review): the remaining basicPublish argument(s) and any wrapper around
// this call are not visible here; presumably `payload` supplies the message
// body - confirm against the complete file.
107 def sendMessage(payload:String) =
109 _channel.basicPublish(conf.exchangeName, conf.routingKey,
110 MessageProperties.PERSISTENT_TEXT_PLAIN,
// Companion object holding a stand-alone entry point that builds an Aquarium
// instance and sends one hard-coded message through a RabbitMQProducer.
// NOTE(review): the builder chain and the end of `main` continue past the lines
// visible in this listing.
115 object RabbitMQProducer {
// Properties are read from a file named "aquarium.properties" (relative path).
116 val propsfile = new FileStreamResource(new File("aquarium.properties"))
// Falls back to an empty Props (same converters) when loading the file does
// not yield a value.
117 @volatile private[this] var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
// Aquarium built from the loaded properties and the default policy model, with
// the store provider set to a MemStoreProvider and the events store folder
// set to "..".
118 val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
119 update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
120 update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
// Entry point: `args` is not used by the visible line; a fixed JSON-like
// payload is sent.
124 def main(args: Array[String]) = {
125 new RabbitMQProducer(aquarium).sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}")