Rename Configurator to Aquarium
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / service / RabbitMQService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.rabbitmq.service
37
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
40 import gr.grnet.aquarium.util.safeUnit
41 import com.rabbitmq.client.Address
42 import gr.grnet.aquarium.{Aquarium, Configurable}
43 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
44 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
45 import com.ckkloverdos.env.{EnvKey, Env}
46 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
47 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
48 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
49 import com.ckkloverdos.maybe.MaybeEither
50 import gr.grnet.aquarium.actor.RouterRole
51 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
52 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
53 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
54 import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
55
56 /**
57  *
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  */
60
61 class RabbitMQService extends Loggable with Lifecycle with Configurable {
62   @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
63   @volatile private[this] var _consumers = List[RabbitMQConsumer]()
64
65   def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
66
67   def aquarium = Aquarium.Instance
68
69   def eventBus = aquarium.eventBus
70
71   def resourceEventStore = aquarium.resourceEventStore
72
73   def imEventStore = aquarium.imEventStore
74
75   def converters = aquarium.converters
76
77   def router = aquarium.actorProvider.actorForRole(RouterRole)
78
79   /**
80    * Configure this instance with the provided properties.
81    *
82    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
83    */
84   def configure(props: Props) = {
85     this._props = props
86
87     doConfigure()
88   }
89
90   private[this] def doConfigure(): Unit = {
91     val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
92       converters.convert[JsonTextFormat](payload)
93     }
94
95     val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
96       StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
97     }
98
99     val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
100       StdIMEvent.fromJsonTextFormat(jsonTextFormat)
101     }
102
103     val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
104       router ! ProcessResourceEvent(rcEvent)
105     }
106
107     val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
108       logger.info("Forwarding {}", rcEvent)
109     }
110
111     val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
112       router ! ProcessIMEvent(imEvent)
113     }
114
115     val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
116       logger.info("Forwarding {}", imEvent)
117     }
118
119     val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
120       jsonParser,
121       (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error),
122       rcEventParser,
123       rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
124       rcDebugForwardAction
125     )
126
127     val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
128       jsonParser,
129       (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error),
130       imEventParser,
131       imEvent ⇒ imEventStore.insertIMEvent(imEvent),
132       imDebugForwardAction
133     )
134
135     val futureExecutor = new PayloadHandlerFutureExecutor
136
137     // (e)xchange:(r)outing key:(q)
138
139     // These two are to trigger an error if the property does not exist
140     locally(_props(RabbitMQConfKeys.rcevents_queues))
141     locally(_props(RabbitMQConfKeys.imevents_queues))
142
143     val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
144
145     val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
146       RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
147     }
148     val rcConsumerConfs = rcConsumerConfs_.toSet.toList
149     if(rcConsumerConfs.size != rcConsumerConfs_.size) {
150       logger.warn(
151         "Duplicate %s consumer info in %s=%s".format(
152         RabbitMQService.PropertiesPrefix,
153         RabbitMQConfKeys.rcevents_queues,
154         _props(RabbitMQConfKeys.rcevents_queues)))
155     }
156
157     val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
158     val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
159       RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
160     }
161     val imConsumerConfs = imConsumerConfs_.toSet.toList
162     if(imConsumerConfs.size != imConsumerConfs_.size) {
163       logger.warn(
164         "Duplicate %s consumer info in %s=%s".format(
165         RabbitMQService.PropertiesPrefix,
166         RabbitMQConfKeys.imevents_queues,
167         _props(RabbitMQConfKeys.imevents_queues)))
168     }
169
170     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
171       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
172         RabbitMQService.PropertiesPrefix,
173         rccc.exchangeName,
174         rccc.routingKey,
175         rccc.queueName
176       ))
177       new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
178     }
179
180     val imConsumers = for(imcc ← imConsumerConfs) yield {
181       logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
182         RabbitMQService.PropertiesPrefix,
183         imcc.exchangeName,
184         imcc.routingKey,
185         imcc.queueName
186       ))
187       new RabbitMQConsumer(imcc, imHandler, futureExecutor)
188     }
189
190     this._consumers = rcConsumers ++ imConsumers
191
192     val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
193     lg("Got %s consumers".format(this._consumers.size))
194
195     this._consumers.foreach(logger.debug("Configured {}", _))
196   }
197
198   def start() = {
199     logStartingF("") {
200       this._consumers.foreach(_.start())
201
202       for(consumer ← this._consumers) {
203         if(!consumer.isAlive()) {
204           logger.warn("Consumer not started yet {}", consumer.toDebugString)
205         }
206       }
207     } {}
208   }
209
210   def stop() = {
211     logStoppingF("") {
212       for(consumer ← this._consumers) {
213         safeUnit(consumer.stop())
214       }
215     } {}
216   }
217 }
218
219 object RabbitMQService {
220   final val PropertiesPrefix       = "rabbitmq"
221   final val PropertiesPrefixAndDot = PropertiesPrefix + "."
222
223   @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
224
225   final val DefaultExchangeConfArguments = Env()
226
227   final val DefaultExchangeConf = Env() +
228     (RabbitMQExchangeKeys.`type`,     TopicExchange) +
229     (RabbitMQExchangeKeys.autoDelete, false)         +
230     (RabbitMQExchangeKeys.durable,    true)          +
231     (RabbitMQExchangeKeys.arguments,  DefaultExchangeConfArguments)
232
233
234   final val DefaultQueueConfArguments = Env() +
235     (RabbitMQQueueKeys.Args.xHAPolixy, "all")
236
237   final val DefaultQueueConf = Env() +
238     (RabbitMQQueueKeys.autoDelete, false)         +
239     (RabbitMQQueueKeys.durable,    true)          +
240     (RabbitMQQueueKeys.exclusive,  false)         +
241     (RabbitMQQueueKeys.arguments,  DefaultQueueConfArguments)
242
243   final val DefaultChannelConf = Env() +
244     (RabbitMQChannelKeys.qosPrefetchCount, 1) +
245     (RabbitMQChannelKeys.qosPrefetchSize,  0) +
246     (RabbitMQChannelKeys.qosGlobal,        false)
247
248   def makeConnectionConf(props: Props) = {
249     val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
250     val port = props.getIntEx(RabbitMQConfKeys.port)
251     val addresses = servers.map(new Address(_, port)).toArray
252
253     // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
254     // TODO:  Normally this means to get rid of Props and use Env everywhere.
255     // TODO:  [Will be more type-safe anyway.]
256     Env() +
257       (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
258       (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
259       (RabbitMQConKeys.vhost,    props(RabbitMQConfKeys.vhost))    +
260       (RabbitMQConKeys.servers,  addresses) +
261       (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
262   }
263
264   def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
265     val Array(exchange, routing, queue) = oneERQ.split(':')
266
267     RabbitMQConsumerConf(
268       exchangeName   = exchange,
269       routingKey     = routing,
270       queueName      = queue,
271       connectionConf = makeConnectionConf(props),
272       exchangeConf   = DefaultExchangeConf,
273       channelConf    = DefaultChannelConf,
274       queueConf      = DefaultQueueConf
275     )
276   }
277
278   object RabbitMQConKeys {
279     final val username  = StringKey(p("username"))
280     final val password  = StringKey(p("password"))
281     final val vhost     = StringKey(p("vhost"))
282     final val servers   = ArrayKey[Address](p("servers"))
283
284     final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
285   }
286
287   /**
288    * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
289    */
290   final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
291
292   /**
293    * Configuration keys for a `RabbitMQ` exchange.
294    *
295    * @author Christos KK Loverdos <loverdos@gmail.com>
296    */
297   object RabbitMQExchangeKeys {
298     final val `type`     = RabbitMQExchangeTypedKey
299     final val durable    = BooleanKey(p("durable"))
300     final val autoDelete = BooleanKey(p("autoDelete"))
301     final val arguments  = EnvKey(p("arguments"))
302   }
303
304   /**
305    * Configuration keys for a `RabbitMQ` exchange.
306    *
307    * @author Christos KK Loverdos <loverdos@gmail.com>
308    */
309   object RabbitMQQueueKeys {
310     final val durable    = BooleanKey(p("durable"))
311     final val autoDelete = BooleanKey(p("autoDelete"))
312     final val exclusive  = BooleanKey(p("exclusive"))
313     final val arguments  = EnvKey(p("arguments"))
314
315     object Args {
316       // http://www.rabbitmq.com/ha.html
317       // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
318       final val xHAPolixy = StringKey("x-ha-policy")
319     }
320   }
321
322   /**
323    * Configuration keys for a `RabbitMQ` channel.
324    *
325    * @author Christos KK Loverdos <loverdos@gmail.com>
326    */
327   object RabbitMQChannelKeys {
328     final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
329     final val qosPrefetchSize  = IntKey(p("qosPrefetchSize"))
330     final val qosGlobal        = BooleanKey(p("qosGlobal"))
331   }
332
333   object RabbitMQConfKeys {
334     /**
335      * How often do we attemot a reconnection?
336      */
337     final val reconnect_period_millis = p("reconnect.period.millis")
338
339     /**
340      * Comma separated list of AMQP servers running in active-active
341      * configuration.
342      */
343     final val servers = p("servers")
344
345     /**
346      * Comma separated list of AMQP servers running in active-active
347      * configuration.
348      */
349     final val port = p("port")
350
351     /**
352      * User name for connecting with the AMQP server
353      */
354     final val username = p("username")
355
356     /**
357      * Password for connecting with the AMQP server
358      */
359     final val password = p("passwd")
360
361     /**
362      * Virtual host on the AMQP server
363      */
364     final val vhost = p("vhost")
365
366     /**
367      * Comma separated list of exchanges known to aquarium
368      * FIXME: What is this??
369      */
370     final val exchange = p("exchange")
371
372     /**
373      * Queues for retrieving resource events from. Multiple queues can be
374      * declared, separated by comma.
375      *
376      * Format is `exchange:routing.key:queue-name,...`
377      */
378     final val rcevents_queues = p("rcevents.queues")
379
380     /**
381      * Queues for retrieving user events from. Multiple queues can be
382      * declared, separated by comma.
383      *
384      * Format is `exchange:routing.key:queue-name,...`
385      */
386     final val imevents_queues = p("imevents.queues")
387   }
388 }