9986d0164f73af7df2e36556f2a29bde02fcc788
[aquarium] / src / main / scala / gr / grnet / aquarium / service / RabbitMQService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import com.ckkloverdos.props.Props
39 import com.google.common.eventbus.Subscribe
40 import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
43 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
44 import gr.grnet.aquarium.util.sameTags
45 import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
46 import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
47 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
48 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
49 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
50
51 /**
52  * The service that is responsible to handle `RabbitMQ` connecrivity.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  */
56
57 class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
58   @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
59   @volatile private[this] var _consumers = List[RabbitMQConsumer]()
60
61   def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
62
63   def eventBus = aquarium.eventBus
64
65   def resourceEventStore = aquarium.resourceEventStore
66
67   def imEventStore = aquarium.imEventStore
68
69   def converters = aquarium.converters
70
71   /**
72    * Configure this instance with the provided properties.
73    *
74    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
75    */
76   def configure(props: Props) = {
77     this._props = props
78   }
79
80   @Subscribe
81   override def awareOfAquariumEx(event: AquariumCreatedEvent) {
82     super.awareOfAquariumEx(event)
83
84     aquarium.eventBus.addSubscriber(this)
85
86     doSetup()
87   }
88
89   private[this] def doSetup(): Unit = {
90     val postNotifier = new PayloadHandlerPostNotifier(logger)
91
92     val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
93
94     val imHandler = new IMEventPayloadHandler(aquarium, logger)
95
96     val futureExecutor = new SynchronousPayloadHandlerExecutor
97
98     // (e)xchange:(r)outing key:(q)
99
100     // These two are to trigger an error if the property does not exist
101     locally(_props(RabbitMQConfKeys.rcevents_queues))
102     locally(_props(RabbitMQConfKeys.imevents_queues))
103
104     val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
105
106     val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
107       RabbitMQKeys.makeRabbitMQConsumerConf(Tags.ResourceEventTag, _props, oneERQ)
108     }
109     val rcConsumerConfs = rcConsumerConfs_.toSet.toList
110     if(rcConsumerConfs.size != rcConsumerConfs_.size) {
111       logger.warn(
112         "Duplicate %s consumer info in %s=%s".format(
113           RabbitMQKeys.PropertiesPrefix,
114           RabbitMQConfKeys.rcevents_queues,
115           _props(RabbitMQConfKeys.rcevents_queues)))
116     }
117
118     val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
119     val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
120       RabbitMQKeys.makeRabbitMQConsumerConf(Tags.IMEventTag, _props, oneERQ)
121     }
122     val imConsumerConfs = imConsumerConfs_.toSet.toList
123     if(imConsumerConfs.size != imConsumerConfs_.size) {
124       logger.warn(
125         "Duplicate %s consumer info in %s=%s".format(
126           RabbitMQKeys.PropertiesPrefix,
127           RabbitMQConfKeys.imevents_queues,
128           _props(RabbitMQConfKeys.imevents_queues)))
129     }
130
131     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
132       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
133         RabbitMQKeys.PropertiesPrefix,
134         rccc.exchangeName,
135         rccc.routingKey,
136         rccc.queueName
137       ))
138       new RabbitMQConsumer(
139         aquarium,
140         rccc,
141         rcHandler,
142         futureExecutor,
143         postNotifier
144       )
145     }
146
147     val imConsumers = for(imcc ← imConsumerConfs) yield {
148       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
149         RabbitMQKeys.PropertiesPrefix,
150         imcc.exchangeName,
151         imcc.routingKey,
152         imcc.queueName
153       ))
154       new RabbitMQConsumer(
155         aquarium,
156         imcc,
157         imHandler,
158         futureExecutor,
159         postNotifier
160       )
161     }
162
163     this._consumers = rcConsumers ++ imConsumers
164
165     val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
166     lg("Got %s consumers".format(this._consumers.size))
167
168     this._consumers.foreach(logger.debug("Configured {}", _))
169   }
170
171   def start() = {
172     safeStart()
173   }
174
175   def safeStart() = {
176     for(consumer ← this._consumers) {
177       logStartingF(consumer.toString) {
178         consumer.safeStart()
179       } {}
180     }
181
182     for(consumer ← this._consumers) {
183       if(!consumer.isAlive()) {
184         logger.warn("Consumer not started yet %s".format(consumer))
185       }
186     }
187   }
188
189   def stop() = {
190     safeStop()
191   }
192
193   def safeStop() = {
194     for(consumer ← this._consumers) {
195       logStoppingF(consumer.toString) {
196         consumer.safeStop()
197       } {}
198     }
199   }
200
201   @Subscribe
202   def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
203     val eventTag = event.tag
204
205     val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
206     for(consumer ← consumersForTag) {
207       if(consumer.isAlive()) {
208         // Our store is down, so we cannot accept messages anymore
209         logger.info("Shutting down %s, since store for %s is down".format(consumer, eventTag))
210         consumer.setAllowReconnects(false)
211         consumer.safeStop()
212       }
213     }
214   }
215
216   @Subscribe
217   def handleStoreRevival(event: StoreIsAliveBusEvent): Unit = {
218     val eventTag = event.tag
219
220     val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
221     for(consumer ← consumersForTag) {
222       if(!consumer.isAlive() && !aquarium.isStopping()) {
223         // out store is up, so we can again accept messages
224         logger.info("Starting up %s, since store for %s is alive".format(consumer, eventTag))
225         consumer.setAllowReconnects(true)
226         consumer.safeStart()
227       }
228     }
229   }
230 }