Reliable message passing for RabbitMQProducer
[aquarium] / src / main / scala / gr / grnet / aquarium / service / RabbitMQService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import com.ckkloverdos.props.Props
39 import com.google.common.eventbus.Subscribe
40 import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable}
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
43 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
44 import gr.grnet.aquarium.util.sameTags
45 import event.{BalanceEvent, AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
46 import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
47 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
48 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
49 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
50 import gr.grnet.aquarium.util.json.JsonSupport
51
52 /**
53  * The service that is responsible to handle `RabbitMQ` connecrivity.
54  *
55  * @author Christos KK Loverdos <loverdos@gmail.com>
56  */
57
58 class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
59   @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
60   @volatile private[this] var _consumers = List[RabbitMQConsumer]()
61
62   def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
63
64   def eventBus = aquarium.eventBus
65
66   def resourceEventStore = aquarium.resourceEventStore
67
68   def imEventStore = aquarium.imEventStore
69
70   def converters = aquarium.converters
71
72   /**
73    * Configure this instance with the provided properties.
74    *
75    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
76    */
77   def configure(props: Props) = {
78     this._props = props
79   }
80
81   @Subscribe
82   override def awareOfAquarium(event: AquariumCreatedEvent) {
83     super.awareOfAquarium(event)
84
85     aquarium.eventBus.addSubscriber(this)
86
87     doSetup()
88   }
89
90   private[this] def doSetup(): Unit = {
91     val postNotifier = new PayloadHandlerPostNotifier(logger)
92
93     val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
94
95     val imHandler = new IMEventPayloadHandler(aquarium, logger)
96
97     val futureExecutor = new SynchronousPayloadHandlerExecutor
98
99     // (e)xchange:(r)outing key:(q)
100
101     // These two are to trigger an error if the property does not exist
102     locally(_props(RabbitMQConfKeys.rcevents_queues))
103     locally(_props(RabbitMQConfKeys.imevents_queues))
104
105     val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
106
107     val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
108       RabbitMQKeys.makeRabbitMQConsumerConf(Tags.ResourceEventTag, _props, oneERQ)
109     }
110     val rcConsumerConfs = rcConsumerConfs_.toSet.toList
111     if(rcConsumerConfs.size != rcConsumerConfs_.size) {
112       logger.warn(
113         "Duplicate %s consumer info in %s=%s".format(
114           RabbitMQKeys.PropertiesPrefix,
115           RabbitMQConfKeys.rcevents_queues,
116           _props(RabbitMQConfKeys.rcevents_queues)))
117     }
118
119     val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
120     val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
121       RabbitMQKeys.makeRabbitMQConsumerConf(Tags.IMEventTag, _props, oneERQ)
122     }
123     val imConsumerConfs = imConsumerConfs_.toSet.toList
124     if(imConsumerConfs.size != imConsumerConfs_.size) {
125       logger.warn(
126         "Duplicate %s consumer info in %s=%s".format(
127           RabbitMQKeys.PropertiesPrefix,
128           RabbitMQConfKeys.imevents_queues,
129           _props(RabbitMQConfKeys.imevents_queues)))
130     }
131
132     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
133       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
134         RabbitMQKeys.PropertiesPrefix,
135         rccc.exchangeName,
136         rccc.routingKey,
137         rccc.queueName
138       ))
139       new RabbitMQConsumer(
140         aquarium,
141         rccc,
142         rcHandler,
143         futureExecutor,
144         postNotifier
145       )
146     }
147
148     val imConsumers = for(imcc ← imConsumerConfs) yield {
149       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
150         RabbitMQKeys.PropertiesPrefix,
151         imcc.exchangeName,
152         imcc.routingKey,
153         imcc.queueName
154       ))
155       new RabbitMQConsumer(
156         aquarium,
157         imcc,
158         imHandler,
159         futureExecutor,
160         postNotifier
161       )
162     }
163
164     this._consumers = rcConsumers ++ imConsumers
165
166     val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
167     lg("Got %s consumers".format(this._consumers.size))
168
169     this._consumers.foreach(logger.debug("Configured {}", _))
170   }
171
172   def start() = {
173     safeStart()
174   }
175
176   def safeStart() = {
177     for(consumer ← this._consumers) {
178       logStartingF(consumer.toString) {
179         consumer.safeStart()
180       } {}
181     }
182
183     for(consumer ← this._consumers) {
184       if(!consumer.isAlive()) {
185         logger.warn("Consumer not started yet %s".format(consumer))
186       }
187     }
188   }
189
190   def stop() = {
191     safeStop()
192   }
193
194   def safeStop() = {
195     for(consumer ← this._consumers) {
196       logStoppingF(consumer.toString) {
197         consumer.safeStop()
198       } {}
199     }
200   }
201
202   @Subscribe
203   def handleUserBalance(event:BalanceEvent): Unit = {
204     aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(event.toJsonString)
205   }
206
207   @Subscribe
208   def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
209     val eventTag = event.tag
210
211     val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
212     for(consumer ← consumersForTag) {
213       if(consumer.isAlive()) {
214         // Our store is down, so we cannot accept messages anymore
215         logger.info("Shutting down %s, since store for %s is down".format(consumer, eventTag))
216         consumer.setAllowReconnects(false)
217         consumer.safeStop()
218       }
219     }
220   }
221
222   @Subscribe
223   def handleStoreRevival(event: StoreIsAliveBusEvent): Unit = {
224     val eventTag = event.tag
225
226     val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
227     for(consumer ← consumersForTag) {
228       if(!consumer.isAlive() && !aquarium.isStopping()) {
229         // out store is up, so we can again accept messages
230         logger.info("Starting up %s, since store for %s is alive".format(consumer, eventTag))
231         consumer.setAllowReconnects(true)
232         consumer.safeStart()
233       }
234     }
235   }
236 }