Refactor Aquarium to make it more configurable
[aquarium] / src / main / scala / gr / grnet / aquarium / service / RabbitMQService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import com.ckkloverdos.props.Props
39 import com.google.common.eventbus.Subscribe
40 import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable}
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.actor.RouterRole
43 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
44 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
45 import gr.grnet.aquarium.util.sameTags
46 import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
47 import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier}
48 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
49 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
50 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
51
52 /**
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  */
56
57 class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
58   @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
59   @volatile private[this] var _consumers = List[RabbitMQConsumer]()
60
61   def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
62
63   def eventBus = aquarium.eventBus
64
65   def resourceEventStore = aquarium.resourceEventStore
66
67   def imEventStore = aquarium.imEventStore
68
69   def converters = aquarium.converters
70
71   def router = aquarium.actorProvider.actorForRole(RouterRole)
72
73   /**
74    * Configure this instance with the provided properties.
75    *
76    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
77    */
78   def configure(props: Props) = {
79     this._props = props
80   }
81
82   @Subscribe
83   override def awareOfAquariumEx(event: AquariumCreatedEvent) {
84     super.awareOfAquariumEx(event)
85
86     aquarium.eventBus.addSubscriber(this)
87
88     doSetup()
89   }
90
91   private[this] def doSetup(): Unit = {
92     val postNotifier = new PayloadHandlerPostNotifier(logger)
93
94     val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
95
96     val imHandler = new IMEventPayloadHandler(aquarium, logger)
97
98     val futureExecutor = new SynchronousPayloadHandlerExecutor
99
100     // (e)xchange:(r)outing key:(q)
101
102     // These two are to trigger an error if the property does not exist
103     locally(_props(RabbitMQConfKeys.rcevents_queues))
104     locally(_props(RabbitMQConfKeys.imevents_queues))
105
106     val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
107
108     val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
109       RabbitMQKeys.makeRabbitMQConsumerConf(Tags.ResourceEventTag, _props, oneERQ)
110     }
111     val rcConsumerConfs = rcConsumerConfs_.toSet.toList
112     if(rcConsumerConfs.size != rcConsumerConfs_.size) {
113       logger.warn(
114         "Duplicate %s consumer info in %s=%s".format(
115           RabbitMQKeys.PropertiesPrefix,
116           RabbitMQConfKeys.rcevents_queues,
117           _props(RabbitMQConfKeys.rcevents_queues)))
118     }
119
120     val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
121     val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
122       RabbitMQKeys.makeRabbitMQConsumerConf(Tags.IMEventTag, _props, oneERQ)
123     }
124     val imConsumerConfs = imConsumerConfs_.toSet.toList
125     if(imConsumerConfs.size != imConsumerConfs_.size) {
126       logger.warn(
127         "Duplicate %s consumer info in %s=%s".format(
128           RabbitMQKeys.PropertiesPrefix,
129           RabbitMQConfKeys.imevents_queues,
130           _props(RabbitMQConfKeys.imevents_queues)))
131     }
132
133     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
134       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
135         RabbitMQKeys.PropertiesPrefix,
136         rccc.exchangeName,
137         rccc.routingKey,
138         rccc.queueName
139       ))
140       new RabbitMQConsumer(
141         aquarium,
142         rccc,
143         rcHandler,
144         futureExecutor,
145         postNotifier
146       )
147     }
148
149     val imConsumers = for(imcc ← imConsumerConfs) yield {
150       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
151         RabbitMQKeys.PropertiesPrefix,
152         imcc.exchangeName,
153         imcc.routingKey,
154         imcc.queueName
155       ))
156       new RabbitMQConsumer(
157         aquarium,
158         imcc,
159         imHandler,
160         futureExecutor,
161         postNotifier
162       )
163     }
164
165     this._consumers = rcConsumers ++ imConsumers
166
167     val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
168     lg("Got %s consumers".format(this._consumers.size))
169
170     this._consumers.foreach(logger.debug("Configured {}", _))
171   }
172
173   def start() = {
174     safeStart()
175   }
176
177   def safeStart() = {
178     for(consumer ← this._consumers) {
179       logStartingF(consumer.toString) {
180         consumer.safeStart()
181       } {}
182     }
183
184     for(consumer ← this._consumers) {
185       if(!consumer.isAlive()) {
186         logger.warn("Consumer not started yet %s".format(consumer))
187       }
188     }
189   }
190
191   def stop() = {
192     safeStop()
193   }
194
195   def safeStop() = {
196     for(consumer ← this._consumers) {
197       logStoppingF(consumer.toString) {
198         consumer.safeStop()
199       } {}
200     }
201   }
202
203   @Subscribe
204   def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
205     val eventTag = event.tag
206
207     val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
208     for(consumer ← consumersForTag) {
209       if(consumer.isAlive()) {
210         // Our store is down, so we cannot accept messages anymore
211         logger.info("Shutting down %s, since store for %s is down".format(consumer, eventTag))
212         consumer.setAllowReconnects(false)
213         consumer.safeStop()
214       }
215     }
216   }
217
218   @Subscribe
219   def handleStoreRevival(event: StoreIsAliveBusEvent): Unit = {
220     val eventTag = event.tag
221
222     val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
223     for(consumer ← consumersForTag) {
224       if(!consumer.isAlive() && !aquarium.isStopping()) {
225         // out store is up, so we can again accept messages
226         logger.info("Starting up %s, since store for %s is alive".format(consumer, eventTag))
227         consumer.setAllowReconnects(true)
228         consumer.safeStart()
229       }
230     }
231   }
232 }