2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import com.ckkloverdos.props.Props
39 import com.google.common.eventbus.Subscribe
40 import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable}
41 import gr.grnet.aquarium.converter.StdConverters
42 import gr.grnet.aquarium.actor.RouterRole
43 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
44 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
45 import gr.grnet.aquarium.util.sameTags
46 import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
47 import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier}
48 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
49 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
50 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
54 * @author Christos KK Loverdos <loverdos@gmail.com>
57 class RabbitMQService extends Loggable with Lifecycle with Configurable with AquariumAwareSkeleton {
58 @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
59 @volatile private[this] var _consumers = List[RabbitMQConsumer]()
61 def propertyPrefix = Some(RabbitMQKeys.PropertiesPrefix)
63 def eventBus = aquarium.eventBus
65 def resourceEventStore = aquarium.resourceEventStore
67 def imEventStore = aquarium.imEventStore
69 def converters = aquarium.converters
71 def router = aquarium.actorProvider.actorForRole(RouterRole)
74 * Configure this instance with the provided properties.
76 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
78 def configure(props: Props) = {
83 override def awareOfAquariumEx(event: AquariumCreatedEvent) {
84 super.awareOfAquariumEx(event)
86 aquarium.eventBus.addSubscriber(this)
91 private[this] def doSetup(): Unit = {
92 val postNotifier = new PayloadHandlerPostNotifier(logger)
94 val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
96 val imHandler = new IMEventPayloadHandler(aquarium, logger)
98 val futureExecutor = new SynchronousPayloadHandlerExecutor
100 // (e)xchange:(r)outing key:(q)
102 // These two are to trigger an error if the property does not exist
103 locally(_props(RabbitMQConfKeys.rcevents_queues))
104 locally(_props(RabbitMQConfKeys.imevents_queues))
106 val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
108 val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
109 RabbitMQKeys.makeRabbitMQConsumerConf(Tags.ResourceEventTag, _props, oneERQ)
111 val rcConsumerConfs = rcConsumerConfs_.toSet.toList
112 if(rcConsumerConfs.size != rcConsumerConfs_.size) {
114 "Duplicate %s consumer info in %s=%s".format(
115 RabbitMQKeys.PropertiesPrefix,
116 RabbitMQConfKeys.rcevents_queues,
117 _props(RabbitMQConfKeys.rcevents_queues)))
120 val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
121 val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
122 RabbitMQKeys.makeRabbitMQConsumerConf(Tags.IMEventTag, _props, oneERQ)
124 val imConsumerConfs = imConsumerConfs_.toSet.toList
125 if(imConsumerConfs.size != imConsumerConfs_.size) {
127 "Duplicate %s consumer info in %s=%s".format(
128 RabbitMQKeys.PropertiesPrefix,
129 RabbitMQConfKeys.imevents_queues,
130 _props(RabbitMQConfKeys.imevents_queues)))
133 val rcConsumers = for(rccc ← rcConsumerConfs) yield {
134 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
135 RabbitMQKeys.PropertiesPrefix,
140 new RabbitMQConsumer(
149 val imConsumers = for(imcc ← imConsumerConfs) yield {
150 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
151 RabbitMQKeys.PropertiesPrefix,
156 new RabbitMQConsumer(
165 this._consumers = rcConsumers ++ imConsumers
167 val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
168 lg("Got %s consumers".format(this._consumers.size))
170 this._consumers.foreach(logger.debug("Configured {}", _))
178 for(consumer ← this._consumers) {
179 logStartingF(consumer.toString) {
184 for(consumer ← this._consumers) {
185 if(!consumer.isAlive()) {
186 logger.warn("Consumer not started yet %s".format(consumer))
196 for(consumer ← this._consumers) {
197 logStoppingF(consumer.toString) {
204 def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
205 val eventTag = event.tag
207 val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
208 for(consumer ← consumersForTag) {
209 if(consumer.isAlive()) {
210 // Our store is down, so we cannot accept messages anymore
211 logger.info("Shutting down %s, since store for %s is down".format(consumer, eventTag))
212 consumer.setAllowReconnects(false)
219 def handleStoreRevival(event: StoreIsAliveBusEvent): Unit = {
220 val eventTag = event.tag
222 val consumersForTag = this._consumers.filter(consumer ⇒ sameTags(consumer.conf.tag, eventTag))
223 for(consumer ← consumersForTag) {
224 if(!consumer.isAlive() && !aquarium.isStopping()) {
225 // out store is up, so we can again accept messages
226 logger.info("Starting up %s, since store for %s is alive".format(consumer, eventTag))
227 consumer.setAllowReconnects(true)