2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.rabbitmq.service
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.util.date.TimeHelpers
40 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
41 import gr.grnet.aquarium.util.safeUnit
42 import com.rabbitmq.client.Address
43 import gr.grnet.aquarium.{Configurator, Configurable}
44 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
45 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
46 import com.ckkloverdos.env.{EnvKey, Env}
47 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
48 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
49 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
50 import com.ckkloverdos.maybe.MaybeEither
51 import gr.grnet.aquarium.actor.RouterRole
52 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
53 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
54 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
55 import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
59 * @author Christos KK Loverdos <loverdos@gmail.com>
// Service that holds the RabbitMQ consumers of this process. It is configured from
// properties and exposes shortcuts to collaborators owned by the master configurator.
62 class RabbitMQService extends Loggable with Lifecycle with Configurable {
// Current configuration; starts as an empty Props built with StdConverters.AllConverters.
63 @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
// Consumers managed by this service; empty until doConfigure() assigns the list.
64 @volatile private[this] var _consumers = List[RabbitMQConsumer]()
// Prefix of the properties this component is interested in: Some("rabbitmq").
66 def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
// The accessors below are `def`s, so each call goes through Configurator.MasterConfigurator again.
68 def configurator = Configurator.MasterConfigurator
70 def eventBus = configurator.eventBus
72 def resourceEventStore = configurator.resourceEventStore
74 def imEventStore = configurator.imEventStore
76 def converters = configurator.converters
// Actor registered under RouterRole; the forward actions in doConfigure() send their messages to it.
78 def router = configurator.actorProvider.actorForRole(RouterRole)
81 * Configure this instance with the provided properties.
83 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
85 def configure(props: Props) = {
// Builds the consumers for resource events ("rc") and IM events ("im") from `_props`
// and stores them in `_consumers`.
91 private[this] def doConfigure(): Unit = {
// Raw payload bytes -> JsonTextFormat, using the configurator's converters.
92 val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
93 converters.convert[JsonTextFormat](payload)
// JsonTextFormat -> resource event model.
96 val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
97 StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
// JsonTextFormat -> IM event model.
100 val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
101 StdIMEvent.fromJsonTextFormat(jsonTextFormat)
// Sends a ProcessResourceEvent message for the event to the router actor.
104 val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
105 router ! ProcessResourceEvent(rcEvent)
// Variant for resource events that logs "Forwarding ..." at info level.
108 val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
109 logger.info("Forwarding {}", rcEvent)
// Sends a ProcessIMEvent message for the event to the router actor.
112 val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
113 router ! ProcessIMEvent(imEvent)
// Variant for IM events that logs "Forwarding ..." at info level.
116 val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
117 logger.info("Forwarding {}", imEvent)
// Payload handler for resource events: the (payload, error) callback delegates to
// LocalFSEventStore.storeUnparsedResourceEvent, the event callback inserts into resourceEventStore.
120 val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
122 (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
124 rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
// Payload handler for IM events: the (payload, error) callback delegates to
// LocalFSEventStore.storeUnparsedIMEvent, the event callback inserts into imEventStore.
128 val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
130 (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
132 imEvent ⇒ imEventStore.insertIMEvent(imEvent),
// One executor instance, passed to every consumer created below.
136 val futureExecutor = new PayloadHandlerFutureExecutor
138 // (e)xchange:(r)outing key:(q)
140 // These two are to trigger an error if the property does not exist
141 locally(_props(RabbitMQConfKeys.rcevents_queues))
142 locally(_props(RabbitMQConfKeys.imevents_queues))
// One consumer conf per entry listed under the resource-event queues property.
144 val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
146 val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
147 RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
// De-duplicate through a Set; a size difference means some conf was listed more than once.
149 val rcConsumerConfs = rcConsumerConfs_.toSet.toList
150 if(rcConsumerConfs.size != rcConsumerConfs_.size) {
152 "Duplicate %s consumer info in %s=%s".format(
153 RabbitMQService.PropertiesPrefix,
154 RabbitMQConfKeys.rcevents_queues,
155 _props(RabbitMQConfKeys.rcevents_queues)))
// Same steps for the IM-event queues property.
158 val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
159 val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
160 RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
162 val imConsumerConfs = imConsumerConfs_.toSet.toList
163 if(imConsumerConfs.size != imConsumerConfs_.size) {
165 "Duplicate %s consumer info in %s=%s".format(
166 RabbitMQService.PropertiesPrefix,
167 RabbitMQConfKeys.imevents_queues,
168 _props(RabbitMQConfKeys.imevents_queues)))
// One RabbitMQConsumer per resource-event conf, all sharing rcHandler.
171 val rcConsumers = for(rccc ← rcConsumerConfs) yield {
172 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
173 RabbitMQService.PropertiesPrefix,
178 new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
// One RabbitMQConsumer per IM-event conf, all sharing imHandler.
181 val imConsumers = for(imcc ← imConsumerConfs) yield {
182 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
183 RabbitMQService.PropertiesPrefix,
188 new RabbitMQConsumer(imcc, imHandler, futureExecutor)
// Resource-event consumers come first in the list, IM-event consumers after them.
191 this._consumers = rcConsumers ++ imConsumers
// Having no consumers at all is reported at warn level; otherwise the count is logged at debug level.
193 val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
194 lg("Got %s consumers".format(this._consumers.size))
196 this._consumers.foreach(logger.debug("Configured {}", _))
// Start every configured consumer.
201 this._consumers.foreach(_.start())
// Warn about each consumer that does not report itself alive at this point.
// NOTE(review): the message suggests start() may complete asynchronously — confirm in RabbitMQConsumer.
203 for(consumer ← this._consumers) {
204 if(!consumer.isAlive()) {
205 logger.warn("Consumer not started yet {}", consumer.toDebugString)
// Stop every consumer; each stop() call is passed through safeUnit
// (presumably so that a failure in one consumer does not abort the loop — verify against gr.grnet.aquarium.util.safeUnit).
213 for(consumer ← this._consumers) {
214 safeUnit(consumer.stop())
// Companion object: property-name prefix, default settings and factories for connection/consumer confs.
220 object RabbitMQService {
// Prefix shared by every property name and key name built with p().
221 final val PropertiesPrefix = "rabbitmq"
222 final val PropertiesPrefixAndDot = PropertiesPrefix + "."
// Qualifies a short name with the prefix, e.g. p("vhost") == "rabbitmq.vhost".
224 @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
// Default exchange arguments: an empty Env.
226 final val DefaultExchangeConfArguments = Env()
// Default exchange settings: topic exchange, durable, not auto-deleted, default arguments.
228 final val DefaultExchangeConf = Env() +
229 (RabbitMQExchangeKeys.`type`, TopicExchange) +
230 (RabbitMQExchangeKeys.autoDelete, false) +
231 (RabbitMQExchangeKeys.durable, true) +
232 (RabbitMQExchangeKeys.arguments, DefaultExchangeConfArguments)
// Default queue arguments: the "x-ha-policy" key is set to "all".
235 final val DefaultQueueConfArguments = Env() +
236 (RabbitMQQueueKeys.Args.xHAPolixy, "all")
// Default queue settings: durable, not exclusive, not auto-deleted, default arguments.
238 final val DefaultQueueConf = Env() +
239 (RabbitMQQueueKeys.autoDelete, false) +
240 (RabbitMQQueueKeys.durable, true) +
241 (RabbitMQQueueKeys.exclusive, false) +
242 (RabbitMQQueueKeys.arguments, DefaultQueueConfArguments)
// Default channel settings: prefetch count 1, prefetch size 0, qosGlobal false.
244 final val DefaultChannelConf = Env() +
245 (RabbitMQChannelKeys.qosPrefetchCount, 1) +
246 (RabbitMQChannelKeys.qosPrefetchSize, 0) +
247 (RabbitMQChannelKeys.qosGlobal, false)
// Builds the connection settings from `props`.
// Reads the server list and one int port, pairs every server with that same port as an
// Address, and stores username, password, vhost, the address array and the reconnect
// period (read as a long) under the RabbitMQConKeys keys.
// NOTE(review): getIntEx/getLongEx presumably throw when the property is missing or not numeric — confirm in Props.
249 def makeConnectionConf(props: Props) = {
250 val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
251 val port = props.getIntEx(RabbitMQConfKeys.port)
// All servers share the single configured port.
252 val addresses = servers.map(new Address(_, port)).toArray
254 // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
255 // TODO: Normally this means to get rid of Props and use Env everywhere.
256 // TODO: [Will be more type-safe anyway.]
258 (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
259 (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
260 (RabbitMQConKeys.vhost, props(RabbitMQConfKeys.vhost)) +
261 (RabbitMQConKeys.servers, addresses) +
262 (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
// Creates the consumer configuration for one queue entry.
// `oneERQ` must split on ':' into exactly three parts (exchange, routing key, queue);
// any other shape makes the pattern definition below fail with a scala.MatchError.
// Connection settings are built from `props` on each call; exchange, channel and queue
// settings are always the defaults defined in this object.
265 def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
266 val Array(exchange, routing, queue) = oneERQ.split(':')
268 RabbitMQConsumerConf(
269 exchangeName = exchange,
270 routingKey = routing,
272 connectionConf = makeConnectionConf(props),
273 exchangeConf = DefaultExchangeConf,
274 channelConf = DefaultChannelConf,
275 queueConf = DefaultQueueConf
// Typed keys of the connection Env assembled by makeConnectionConf.
// Every key name is built with p(), i.e. it starts with "rabbitmq.".
279 object RabbitMQConKeys {
280 final val username = StringKey(p("username"))
281 final val password = StringKey(p("password"))
282 final val vhost = StringKey(p("vhost"))
// Array of server addresses (each one a host paired with the configured port).
283 final val servers = ArrayKey[Address](p("servers"))
// Reconnect period in milliseconds, stored as a Long.
285 final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
289 * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
// Constructed with the bare literal "type"; unlike the other keys in this file it does not go through p().
291 final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
294 * Configuration keys for a `RabbitMQ` exchange.
296 * @author Christos KK Loverdos <loverdos@gmail.com>
// Keys used by DefaultExchangeConf.
// NOTE(review): `durable`, `autoDelete` and `arguments` are built from the same names as the
// keys in RabbitMQQueueKeys ("rabbitmq.durable", ...); confirm the two key sets are never
// stored in the same Env.
298 object RabbitMQExchangeKeys {
// Alias of RabbitMQExchangeTypedKey; needs backticks because `type` is a Scala keyword.
299 final val `type` = RabbitMQExchangeTypedKey
300 final val durable = BooleanKey(p("durable"))
301 final val autoDelete = BooleanKey(p("autoDelete"))
// Nested Env holding additional exchange arguments.
302 final val arguments = EnvKey(p("arguments"))
306 * Configuration keys for a `RabbitMQ` queue.
308 * @author Christos KK Loverdos <loverdos@gmail.com>
// Keys used by DefaultQueueConf and DefaultQueueConfArguments.
310 object RabbitMQQueueKeys {
311 final val durable = BooleanKey(p("durable"))
312 final val autoDelete = BooleanKey(p("autoDelete"))
313 final val exclusive = BooleanKey(p("exclusive"))
// Nested Env holding additional queue arguments.
314 final val arguments = EnvKey(p("arguments"))
317 // http://www.rabbitmq.com/ha.html
318 // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
// NOTE(review): the identifier `xHAPolixy` looks like a misspelling of "xHAPolicy". It is
// referenced as RabbitMQQueueKeys.Args.xHAPolixy by DefaultQueueConfArguments, so a rename
// has to change both places. The key string "x-ha-policy" itself is spelled correctly.
319 final val xHAPolixy = StringKey("x-ha-policy")
324 * Configuration keys for a `RabbitMQ` channel.
326 * @author Christos KK Loverdos <loverdos@gmail.com>
// Keys used by DefaultChannelConf: two Int-valued prefetch settings and one Boolean flag.
328 object RabbitMQChannelKeys {
329 final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
330 final val qosPrefetchSize = IntKey(p("qosPrefetchSize"))
331 final val qosGlobal = BooleanKey(p("qosGlobal"))
// Property names (plain strings, each starting with "rabbitmq.") that are looked up in Props.
334 object RabbitMQConfKeys {
336 * How often do we attempt a reconnection?
338 final val reconnect_period_millis = p("reconnect.period.millis")
341 * Comma separated list of AMQP servers running in active-active
344 final val servers = p("servers")
347 * Port number; makeConnectionConf applies it to every server in `servers`
350 final val port = p("port")
353 * User name for connecting with the AMQP server
355 final val username = p("username")
358 * Password for connecting with the AMQP server
// The property name is "rabbitmq.passwd" (not "password").
360 final val password = p("passwd")
363 * Virtual host on the AMQP server
365 final val vhost = p("vhost")
368 * Comma separated list of exchanges known to aquarium
369 * FIXME: What is this??
371 final val exchange = p("exchange")
374 * Queues for retrieving resource events from. Multiple queues can be
375 * declared, separated by comma.
377 * Format is `exchange:routing.key:queue-name,...`
379 final val rcevents_queues = p("rcevents.queues")
382 * Queues for retrieving user events from. Multiple queues can be
383 * declared, separated by comma.
385 * Format is `exchange:routing.key:queue-name,...`
387 final val imevents_queues = p("imevents.queues")