2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.rabbitmq.service
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
40 import gr.grnet.aquarium.util.safeUnit
41 import com.rabbitmq.client.Address
42 import gr.grnet.aquarium.{Aquarium, Configurable}
43 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
44 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
45 import com.ckkloverdos.env.{EnvKey, Env}
46 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
47 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
48 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
49 import com.ckkloverdos.maybe.MaybeEither
50 import gr.grnet.aquarium.actor.RouterRole
51 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
52 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
53 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
54 import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
58 * @author Christos KK Loverdos <loverdos@gmail.com>
61 class RabbitMQService extends Loggable with Lifecycle with Configurable {
62 @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
63 @volatile private[this] var _consumers = List[RabbitMQConsumer]()
65 def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
67 def aquarium = Aquarium.Instance
69 def eventBus = aquarium.eventBus
71 def resourceEventStore = aquarium.resourceEventStore
73 def imEventStore = aquarium.imEventStore
75 def converters = aquarium.converters
77 def router = aquarium.actorProvider.actorForRole(RouterRole)
80 * Configure this instance with the provided properties.
82 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
84 def configure(props: Props) = {
90 private[this] def doConfigure(): Unit = {
91 val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
92 converters.convert[JsonTextFormat](payload)
95 val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
96 StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
99 val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
100 StdIMEvent.fromJsonTextFormat(jsonTextFormat)
103 val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
104 router ! ProcessResourceEvent(rcEvent)
107 val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
108 logger.info("Forwarding {}", rcEvent)
111 val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
112 router ! ProcessIMEvent(imEvent)
115 val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
116 logger.info("Forwarding {}", imEvent)
119 val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
121 (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error),
123 rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
127 val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
129 (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error),
131 imEvent ⇒ imEventStore.insertIMEvent(imEvent),
135 val futureExecutor = new PayloadHandlerFutureExecutor
137 // (e)xchange:(r)outing key:(q)ueue
139 // These two are to trigger an error if the property does not exist
140 locally(_props(RabbitMQConfKeys.rcevents_queues))
141 locally(_props(RabbitMQConfKeys.imevents_queues))
143 val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
145 val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
146 RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
148 val rcConsumerConfs = rcConsumerConfs_.toSet.toList
149 if(rcConsumerConfs.size != rcConsumerConfs_.size) {
151 "Duplicate %s consumer info in %s=%s".format(
152 RabbitMQService.PropertiesPrefix,
153 RabbitMQConfKeys.rcevents_queues,
154 _props(RabbitMQConfKeys.rcevents_queues)))
157 val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
158 val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
159 RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
161 val imConsumerConfs = imConsumerConfs_.toSet.toList
162 if(imConsumerConfs.size != imConsumerConfs_.size) {
164 "Duplicate %s consumer info in %s=%s".format(
165 RabbitMQService.PropertiesPrefix,
166 RabbitMQConfKeys.imevents_queues,
167 _props(RabbitMQConfKeys.imevents_queues)))
170 val rcConsumers = for(rccc ← rcConsumerConfs) yield {
171 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
172 RabbitMQService.PropertiesPrefix,
177 new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
180 val imConsumers = for(imcc ← imConsumerConfs) yield {
181 logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
182 RabbitMQService.PropertiesPrefix,
187 new RabbitMQConsumer(imcc, imHandler, futureExecutor)
190 this._consumers = rcConsumers ++ imConsumers
192 val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
193 lg("Got %s consumers".format(this._consumers.size))
195 this._consumers.foreach(logger.debug("Configured {}", _))
200 this._consumers.foreach(_.start())
202 for(consumer ← this._consumers) {
203 if(!consumer.isAlive()) {
204 logger.warn("Consumer not started yet {}", consumer.toDebugString)
212 for(consumer ← this._consumers) {
213 safeUnit(consumer.stop())
219 object RabbitMQService {
220 final val PropertiesPrefix = "rabbitmq"
221 final val PropertiesPrefixAndDot = PropertiesPrefix + "."
223 @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
225 final val DefaultExchangeConfArguments = Env()
227 final val DefaultExchangeConf = Env() +
228 (RabbitMQExchangeKeys.`type`, TopicExchange) +
229 (RabbitMQExchangeKeys.autoDelete, false) +
230 (RabbitMQExchangeKeys.durable, true) +
231 (RabbitMQExchangeKeys.arguments, DefaultExchangeConfArguments)
234 final val DefaultQueueConfArguments = Env() +
235 (RabbitMQQueueKeys.Args.xHAPolixy, "all")
237 final val DefaultQueueConf = Env() +
238 (RabbitMQQueueKeys.autoDelete, false) +
239 (RabbitMQQueueKeys.durable, true) +
240 (RabbitMQQueueKeys.exclusive, false) +
241 (RabbitMQQueueKeys.arguments, DefaultQueueConfArguments)
243 final val DefaultChannelConf = Env() +
244 (RabbitMQChannelKeys.qosPrefetchCount, 1) +
245 (RabbitMQChannelKeys.qosPrefetchSize, 0) +
246 (RabbitMQChannelKeys.qosGlobal, false)
248 def makeConnectionConf(props: Props) = {
249 val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
250 val port = props.getIntEx(RabbitMQConfKeys.port)
251 val addresses = servers.map(new Address(_, port)).toArray
253 // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
254 // TODO: Normally this means to get rid of Props and use Env everywhere.
255 // TODO: [Will be more type-safe anyway.]
257 (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
258 (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
259 (RabbitMQConKeys.vhost, props(RabbitMQConfKeys.vhost)) +
260 (RabbitMQConKeys.servers, addresses) +
261 (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
264 def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
265 val Array(exchange, routing, queue) = oneERQ.split(':')
267 RabbitMQConsumerConf(
268 exchangeName = exchange,
269 routingKey = routing,
271 connectionConf = makeConnectionConf(props),
272 exchangeConf = DefaultExchangeConf,
273 channelConf = DefaultChannelConf,
274 queueConf = DefaultQueueConf
278 object RabbitMQConKeys {
279 final val username = StringKey(p("username"))
280 final val password = StringKey(p("password"))
281 final val vhost = StringKey(p("vhost"))
282 final val servers = ArrayKey[Address](p("servers"))
284 final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
288 * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
290 final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
293 * Configuration keys for a `RabbitMQ` exchange.
295 * @author Christos KK Loverdos <loverdos@gmail.com>
297 object RabbitMQExchangeKeys {
298 final val `type` = RabbitMQExchangeTypedKey
299 final val durable = BooleanKey(p("durable"))
300 final val autoDelete = BooleanKey(p("autoDelete"))
301 final val arguments = EnvKey(p("arguments"))
305 * Configuration keys for a `RabbitMQ` queue.
307 * @author Christos KK Loverdos <loverdos@gmail.com>
309 object RabbitMQQueueKeys {
310 final val durable = BooleanKey(p("durable"))
311 final val autoDelete = BooleanKey(p("autoDelete"))
312 final val exclusive = BooleanKey(p("exclusive"))
313 final val arguments = EnvKey(p("arguments"))
316 // http://www.rabbitmq.com/ha.html
317 // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
318 final val xHAPolixy = StringKey("x-ha-policy")
323 * Configuration keys for a `RabbitMQ` channel.
325 * @author Christos KK Loverdos <loverdos@gmail.com>
327 object RabbitMQChannelKeys {
328 final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
329 final val qosPrefetchSize = IntKey(p("qosPrefetchSize"))
330 final val qosGlobal = BooleanKey(p("qosGlobal"))
333 object RabbitMQConfKeys {
335 * How often do we attempt a reconnection?
337 final val reconnect_period_millis = p("reconnect.period.millis")
340 * Comma separated list of AMQP servers running in active-active
343 final val servers = p("servers")
346 * Port used to connect to each of the AMQP servers running in active-active
349 final val port = p("port")
352 * User name for connecting with the AMQP server
354 final val username = p("username")
357 * Password for connecting with the AMQP server
359 final val password = p("passwd")
362 * Virtual host on the AMQP server
364 final val vhost = p("vhost")
367 * Comma separated list of exchanges known to aquarium
368 * FIXME: What is this??
370 final val exchange = p("exchange")
373 * Queues for retrieving resource events from. Multiple queues can be
374 * declared, separated by comma.
376 * Format is `exchange:routing.key:queue-name,...`
378 final val rcevents_queues = p("rcevents.queues")
381 * Queues for retrieving user events from. Multiple queues can be
382 * declared, separated by comma.
384 * Format is `exchange:routing.key:queue-name,...`
386 final val imevents_queues = p("imevents.queues")