901d990c3bf9862aaccf69cd09253eae181a9b0e
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / service / RabbitMQService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.rabbitmq.service
37
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.util.date.TimeHelpers
40 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
41 import gr.grnet.aquarium.util.safeUnit
42 import com.rabbitmq.client.Address
43 import gr.grnet.aquarium.{Configurator, Configurable}
44 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
45 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
46 import com.ckkloverdos.env.{EnvKey, Env}
47 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
48 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
49 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
50 import com.ckkloverdos.maybe.MaybeEither
51 import gr.grnet.aquarium.actor.RouterRole
52 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
53 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
54 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
55 import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
56
57 /**
58  *
59  * @author Christos KK Loverdos <loverdos@gmail.com>
60  */
61
62 class RabbitMQService extends Loggable with Lifecycle with Configurable {
63   @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
64   @volatile private[this] var _consumers = List[RabbitMQConsumer]()
65
66   def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
67
68   def configurator = Configurator.MasterConfigurator
69
70   def eventBus = configurator.eventBus
71
72   def resourceEventStore = configurator.resourceEventStore
73
74   def imEventStore = configurator.imEventStore
75
76   def converters = configurator.converters
77
78   def router = configurator.actorProvider.actorForRole(RouterRole)
79
80   /**
81    * Configure this instance with the provided properties.
82    *
83    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
84    */
85   def configure(props: Props) = {
86     this._props = props
87
88     doConfigure()
89   }
90
91   private[this] def doConfigure(): Unit = {
92     val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
93       converters.convert[JsonTextFormat](payload)
94     }
95
96     val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
97       StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
98     }
99
100     val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
101       StdIMEvent.fromJsonTextFormat(jsonTextFormat)
102     }
103
104     val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
105       router ! ProcessResourceEvent(rcEvent)
106     }
107
108     val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
109       logger.info("Forwarding {}", rcEvent)
110     }
111
112     val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
113       router ! ProcessIMEvent(imEvent)
114     }
115
116     val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
117       logger.info("Forwarding {}", imEvent)
118     }
119
120     val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
121       jsonParser,
122       (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
123       rcEventParser,
124       rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
125       rcDebugForwardAction
126     )
127
128     val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
129       jsonParser,
130       (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
131       imEventParser,
132       imEvent ⇒ imEventStore.insertIMEvent(imEvent),
133       imDebugForwardAction
134     )
135
136     val futureExecutor = new PayloadHandlerFutureExecutor
137
138     // (e)xchange:(r)outing key:(q)
139
140     // These two are to trigger an error if the property does not exist
141     locally(_props(RabbitMQConfKeys.rcevents_queues))
142     locally(_props(RabbitMQConfKeys.imevents_queues))
143
144     val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
145
146     val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
147       RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
148     }
149     val rcConsumerConfs = rcConsumerConfs_.toSet.toList
150     if(rcConsumerConfs.size != rcConsumerConfs_.size) {
151       logger.warn(
152         "Duplicate %s consumer info in %s=%s".format(
153         RabbitMQService.PropertiesPrefix,
154         RabbitMQConfKeys.rcevents_queues,
155         _props(RabbitMQConfKeys.rcevents_queues)))
156     }
157
158     val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
159     val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
160       RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
161     }
162     val imConsumerConfs = imConsumerConfs_.toSet.toList
163     if(imConsumerConfs.size != imConsumerConfs_.size) {
164       logger.warn(
165         "Duplicate %s consumer info in %s=%s".format(
166         RabbitMQService.PropertiesPrefix,
167         RabbitMQConfKeys.imevents_queues,
168         _props(RabbitMQConfKeys.imevents_queues)))
169     }
170
171     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
172       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
173         RabbitMQService.PropertiesPrefix,
174         rccc.exchangeName,
175         rccc.routingKey,
176         rccc.queueName
177       ))
178       new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
179     }
180
181     val imConsumers = for(imcc ← imConsumerConfs) yield {
182       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
183         RabbitMQService.PropertiesPrefix,
184         imcc.exchangeName,
185         imcc.routingKey,
186         imcc.queueName
187       ))
188       new RabbitMQConsumer(imcc, imHandler, futureExecutor)
189     }
190
191     this._consumers = rcConsumers ++ imConsumers
192
193     val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
194     lg("Got %s consumers".format(this._consumers.size))
195
196     this._consumers.foreach(logger.debug("Configured {}", _))
197   }
198
199   def start() = {
200     logStartingF("") {
201       this._consumers.foreach(_.start())
202
203       for(consumer ← this._consumers) {
204         if(!consumer.isAlive()) {
205           logger.warn("Consumer not started yet {}", consumer.toDebugString)
206         }
207       }
208     } {}
209   }
210
211   def stop() = {
212     logStoppingF("") {
213       for(consumer ← this._consumers) {
214         safeUnit(consumer.stop())
215       }
216     } {}
217   }
218 }
219
220 object RabbitMQService {
221   final val PropertiesPrefix       = "rabbitmq"
222   final val PropertiesPrefixAndDot = PropertiesPrefix + "."
223
224   @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
225
226   final val DefaultExchangeConfArguments = Env()
227
228   final val DefaultExchangeConf = Env() +
229     (RabbitMQExchangeKeys.`type`,     TopicExchange) +
230     (RabbitMQExchangeKeys.autoDelete, false)         +
231     (RabbitMQExchangeKeys.durable,    true)          +
232     (RabbitMQExchangeKeys.arguments,  DefaultExchangeConfArguments)
233
234
235   final val DefaultQueueConfArguments = Env() +
236     (RabbitMQQueueKeys.Args.xHAPolixy, "all")
237
238   final val DefaultQueueConf = Env() +
239     (RabbitMQQueueKeys.autoDelete, false)         +
240     (RabbitMQQueueKeys.durable,    true)          +
241     (RabbitMQQueueKeys.exclusive,  false)         +
242     (RabbitMQQueueKeys.arguments,  DefaultQueueConfArguments)
243
244   final val DefaultChannelConf = Env() +
245     (RabbitMQChannelKeys.qosPrefetchCount, 1) +
246     (RabbitMQChannelKeys.qosPrefetchSize,  0) +
247     (RabbitMQChannelKeys.qosGlobal,        false)
248
249   def makeConnectionConf(props: Props) = {
250     val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
251     val port = props.getIntEx(RabbitMQConfKeys.port)
252     val addresses = servers.map(new Address(_, port)).toArray
253
254     // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
255     // TODO:  Normally this means to get rid of Props and use Env everywhere.
256     // TODO:  [Will be more type-safe anyway.]
257     Env() +
258       (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
259       (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
260       (RabbitMQConKeys.vhost,    props(RabbitMQConfKeys.vhost))    +
261       (RabbitMQConKeys.servers,  addresses) +
262       (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
263   }
264
265   def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
266     val Array(exchange, routing, queue) = oneERQ.split(':')
267
268     RabbitMQConsumerConf(
269       exchangeName   = exchange,
270       routingKey     = routing,
271       queueName      = queue,
272       connectionConf = makeConnectionConf(props),
273       exchangeConf   = DefaultExchangeConf,
274       channelConf    = DefaultChannelConf,
275       queueConf      = DefaultQueueConf
276     )
277   }
278
279   object RabbitMQConKeys {
280     final val username  = StringKey(p("username"))
281     final val password  = StringKey(p("password"))
282     final val vhost     = StringKey(p("vhost"))
283     final val servers   = ArrayKey[Address](p("servers"))
284
285     final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
286   }
287
288   /**
289    * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
290    */
291   final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
292
293   /**
294    * Configuration keys for a `RabbitMQ` exchange.
295    *
296    * @author Christos KK Loverdos <loverdos@gmail.com>
297    */
298   object RabbitMQExchangeKeys {
299     final val `type`     = RabbitMQExchangeTypedKey
300     final val durable    = BooleanKey(p("durable"))
301     final val autoDelete = BooleanKey(p("autoDelete"))
302     final val arguments  = EnvKey(p("arguments"))
303   }
304
305   /**
306    * Configuration keys for a `RabbitMQ` exchange.
307    *
308    * @author Christos KK Loverdos <loverdos@gmail.com>
309    */
310   object RabbitMQQueueKeys {
311     final val durable    = BooleanKey(p("durable"))
312     final val autoDelete = BooleanKey(p("autoDelete"))
313     final val exclusive  = BooleanKey(p("exclusive"))
314     final val arguments  = EnvKey(p("arguments"))
315
316     object Args {
317       // http://www.rabbitmq.com/ha.html
318       // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
319       final val xHAPolixy = StringKey("x-ha-policy")
320     }
321   }
322
323   /**
324    * Configuration keys for a `RabbitMQ` channel.
325    *
326    * @author Christos KK Loverdos <loverdos@gmail.com>
327    */
328   object RabbitMQChannelKeys {
329     final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
330     final val qosPrefetchSize  = IntKey(p("qosPrefetchSize"))
331     final val qosGlobal        = BooleanKey(p("qosGlobal"))
332   }
333
334   object RabbitMQConfKeys {
335     /**
336      * How often do we attemot a reconnection?
337      */
338     final val reconnect_period_millis = p("reconnect.period.millis")
339
340     /**
341      * Comma separated list of AMQP servers running in active-active
342      * configuration.
343      */
344     final val servers = p("servers")
345
346     /**
347      * Comma separated list of AMQP servers running in active-active
348      * configuration.
349      */
350     final val port = p("port")
351
352     /**
353      * User name for connecting with the AMQP server
354      */
355     final val username = p("username")
356
357     /**
358      * Password for connecting with the AMQP server
359      */
360     final val password = p("passwd")
361
362     /**
363      * Virtual host on the AMQP server
364      */
365     final val vhost = p("vhost")
366
367     /**
368      * Comma separated list of exchanges known to aquarium
369      * FIXME: What is this??
370      */
371     final val exchange = p("exchange")
372
373     /**
374      * Queues for retrieving resource events from. Multiple queues can be
375      * declared, separated by comma.
376      *
377      * Format is `exchange:routing.key:queue-name,...`
378      */
379     final val rcevents_queues = p("rcevents.queues")
380
381     /**
382      * Queues for retrieving user events from. Multiple queues can be
383      * declared, separated by comma.
384      *
385      * Format is `exchange:routing.key:queue-name,...`
386      */
387     final val imevents_queues = p("imevents.queues")
388   }
389 }