b77af94b76559888baba1f3c1fa93dfbcd04f070
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQProducer.scala
1 package gr.grnet.aquarium.connector.rabbitmq
2
3 import conf.RabbitMQConsumerConf
4 import conf.RabbitMQConsumerConf
5 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
6 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
7 import gr.grnet.aquarium._
8 import com.rabbitmq.client._
9 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
10 import com.ckkloverdos.props.Props
11 import gr.grnet.aquarium.converter.StdConverters
12 import gr.grnet.aquarium.util.{Lock, Tags}
13 import gr.grnet.aquarium.store.memory.MemStoreProvider
14 import java.io.File
15 import com.ckkloverdos.resource.FileStreamResource
16 import scala.Some
17
18
19 /*
20  * Copyright 2011-2012 GRNET S.A. All rights reserved.
21  *
22  * Redistribution and use in source and binary forms, with or
23  * without modification, are permitted provided that the following
24  * conditions are met:
25  *
26  *   1. Redistributions of source code must retain the above
27  *      copyright notice, this list of conditions and the following
28  *      disclaimer.
29  *
30  *   2. Redistributions in binary form must reproduce the above
31  *      copyright notice, this list of conditions and the following
32  *      disclaimer in the documentation and/or other materials
33  *      provided with the distribution.
34  *
35  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
36  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
37  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
38  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
39  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
40  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
41  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
42  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
43  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
44  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
45  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
46  * POSSIBILITY OF SUCH DAMAGE.
47  *
48  * The views and conclusions contained in the software and
49  * documentation are those of the authors and should not be
50  * interpreted as representing official policies, either expressed
51  * or implied, of GRNET S.A.
52  */
53
54 /**
55  *
56  * @author Prodromos Gerakios <pgerakios@grnet.gr>
57  */
58
59 class RabbitMQProducer extends Configurable {
60   private[this] var _conf: RabbitMQConsumerConf = _
61   private[this] var _factory: ConnectionFactory = _
62   private[this] var _connection: Connection = _
63   private[this] var _channel: Channel = _
64   private[this] var _servers : Array[Address] = _
65   private[this] final val lock = new Lock()
66   private[this] var _exchangeName : String = _
67   private[this] var _routingKey :String = _
68
69   def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
70   //  Some(RabbitMQConfKeys.imevents_credit)
71
72
73   def configure(props: Props): Unit = {
74     val propName = RabbitMQConfKeys.imevents_credit
75     def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
76     val prop = props.get(propName).getOr(exn())
77     if (prop.isEmpty) exn()
78     val connectionConf = RabbitMQKeys.makeConnectionConf(props)
79     val Array(exchangeName, routingKey) = prop.split(":")
80     _exchangeName = exchangeName
81     _routingKey = routingKey
82     _factory = new ConnectionFactory
83     _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
84     _factory.setUsername(connectionConf(RabbitMQConKeys.username))
85     _factory.setPassword(connectionConf(RabbitMQConKeys.password))
86     _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
87     _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
88     _servers = connectionConf(RabbitMQConKeys.servers)
89   }
90
91   private[this] def withChannel[A]( next : => A) = {
92     try {
93       lock.withLock {
94       if (_connection == null ||_connection.isOpen == false )
95          _connection =_factory.newConnection(_servers)
96       if (_channel == null ||_channel.isOpen == false )
97         _channel = _connection.createChannel
98         assert(_connection.isOpen && _channel.isOpen)
99         next
100      }
101     } catch {
102         case e: Exception =>
103           e.printStackTrace
104     }
105   }
106
107   def sendMessage(payload:String) =
108     withChannel {
109       _channel.basicPublish(_exchangeName,_routingKey,
110         MessageProperties.PERSISTENT_TEXT_PLAIN,
111         payload.getBytes)
112     }
113 }