2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import com.ckkloverdos.props.Props
39 import com.google.common.eventbus.Subscribe
40 import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable}
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
43 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
44 import gr.grnet.aquarium.util.sameTags
45 import event.{BalanceEvent, AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
46 import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
47 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
48 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
49 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
50 import gr.grnet.aquarium.util.json.JsonSupport
53 * The service that is responsible for handling `RabbitMQ` connectivity.
55 * @author Christos KK Loverdos <loverdos@gmail.com>
58 class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
// Configuration properties of this service. Starts out as a `Props()` built with
// `StdConverters.AllConverters`; `doSetup()` reads the resource-event and IM-event
// queue definitions from it.
// NOTE(review): presumably reassigned by `configure(props)` - confirm against its body.
59 @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
// The RabbitMQ consumers owned by this service. Initially an empty list; it is assigned
// the resource-event consumers followed by the IM-event consumers
// (`rcConsumers ++ imConsumers`) once they have been created from the configuration.
60 @volatile private[this] var _consumers = List[RabbitMQConsumer]()
/** The configuration key prefix of this service: always `Some(RabbitMQKeys.PropertiesPrefix)`. */
62 def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
/** Shortcut for `aquarium.eventBus`. */
64 def eventBus = aquarium.eventBus
/** Shortcut for `aquarium.resourceEventStore`. */
66 def resourceEventStore = aquarium.resourceEventStore
/** Shortcut for `aquarium.imEventStore`. */
68 def imEventStore = aquarium.imEventStore
/** Shortcut for `aquarium.converters`. */
70 def converters = aquarium.converters
73 * Configure this instance with the provided properties.
75 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
77 def configure(props: Props) = {
82 override def awareOfAquarium(event: AquariumCreatedEvent) {
83 super.awareOfAquarium(event)
85 aquarium.eventBus.addSubscriber(this)
90 private[this] def doSetup(): Unit = {
91 val postNotifier = new PayloadHandlerPostNotifier(logger)
93 val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
95 val imHandler = new IMEventPayloadHandler(aquarium, logger)
97 val futureExecutor = new SynchronousPayloadHandlerExecutor
99 // (e)xchange:(r)outing key:(q)ueue
101 // These two are to trigger an error if the property does not exist
102 locally(_props(RabbitMQConfKeys.rcevents_queues))
103 locally(_props(RabbitMQConfKeys.imevents_queues))
105 val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
107 val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
108 RabbitMQKeys.makeRabbitMQConsumerConf(Tags.ResourceEventTag, _props, oneERQ)
110 val rcConsumerConfs = rcConsumerConfs_.toSet.toList
111 if(rcConsumerConfs.size != rcConsumerConfs_.size) {
113 "Duplicate %s consumer info in %s=%s".format(
114 RabbitMQKeys.PropertiesPrefix,
115 RabbitMQConfKeys.rcevents_queues,
116 _props(RabbitMQConfKeys.rcevents_queues)))
119 val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
120 val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
121 RabbitMQKeys.makeRabbitMQConsumerConf(Tags.IMEventTag, _props, oneERQ)
123 val imConsumerConfs = imConsumerConfs_.toSet.toList
124 if(imConsumerConfs.size != imConsumerConfs_.size) {
126 "Duplicate %s consumer info in %s=%s".format(
127 RabbitMQKeys.PropertiesPrefix,
128 RabbitMQConfKeys.imevents_queues,
129 _props(RabbitMQConfKeys.imevents_queues)))
132 val rcConsumers = for(rccc ← rcConsumerConfs) yield {
133 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
134 RabbitMQKeys.PropertiesPrefix,
139 new RabbitMQConsumer(
148 val imConsumers = for(imcc ← imConsumerConfs) yield {
149 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
150 RabbitMQKeys.PropertiesPrefix,
155 new RabbitMQConsumer(
164 this._consumers = rcConsumers ++ imConsumers
166 val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
167 lg("Got %s consumers".format(this._consumers.size))
169 this._consumers.foreach(logger.debug("Configured {}", _))
177 for(consumer ← this._consumers) {
178 logStartingF(consumer.toString) {
183 for(consumer ← this._consumers) {
184 if(!consumer.isAlive()) {
185 logger.warn("Consumer not started yet %s".format(consumer))
195 for(consumer ← this._consumers) {
196 logStoppingF(consumer.toString) {
203 def handleUserBalance(event:BalanceEvent): Unit = {
204 aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(event.toJsonString)
208 def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
209 val eventTag = event.tag
211 val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
212 for(consumer ← consumersForTag) {
213 if(consumer.isAlive()) {
214 // Our store is down, so we cannot accept messages anymore
215 logger.info("Shutting down %s, since store for %s is down".format(consumer, eventTag))
216 consumer.setAllowReconnects(false)
223 def handleStoreRevival(event: StoreIsAliveBusEvent): Unit = {
224 val eventTag = event.tag
226 val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
227 for(consumer ← consumersForTag) {
228 if(!consumer.isAlive() && !aquarium.isStopping()) {
229 // our store is up, so we can again accept messages
230 logger.info("Starting up %s, since store for %s is alive".format(consumer, eventTag))
231 consumer.setAllowReconnects(true)