7a77700aa9143721456b740fa3052526b91134e7
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / service / RabbitMQService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.rabbitmq.service
37
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.util.date.TimeHelpers
40 import gr.grnet.aquarium.util.{ReflectHelpers, Loggable, Lifecycle}
41 import com.rabbitmq.client.{ConnectionFactory, Address}
42 import gr.grnet.aquarium.{Configurator, Configurable}
43 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
44 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
45 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
46 import com.ckkloverdos.key.{ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
47 import com.ckkloverdos.env.{EnvKey, Env}
48 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
49 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
50 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
51 import com.ckkloverdos.maybe.{MaybeEither, Failed, Just, Maybe}
52 import gr.grnet.aquarium.connector.handler.{HandlerResultSuccess, HandlerResultPanic, HandlerResultReject, HandlerResult, PayloadHandler}
53 import gr.grnet.aquarium.actor.RouterRole
54 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
55 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
56 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
57
58 /**
59  *
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  */
62
63 class RabbitMQService extends Loggable with Lifecycle with Configurable {
64   private[this] val props: Props = Props()(StdConverters.AllConverters)
65
66   def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
67
68   def configurator = Configurator.MasterConfigurator
69
70   def eventBus = configurator.eventBus
71
72   def resourceEventStore = configurator.resourceEventStore
73
74   def imEventStore = configurator.imEventStore
75
76   def converters = configurator.converters
77
78   def router = configurator.actorProvider.actorForRole(RouterRole)
79
80   /**
81    * Configure this instance with the provided properties.
82    *
83    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
84    */
85   def configure(props: Props) = {
86     ReflectHelpers.setField(this, "props", props)
87
88     try {
89       doConfigure()
90       logger.info("Configured with {}", this.props)
91     } catch {
92       case e: Exception ⇒
93       // If we have no internal error, then something is bad with RabbitMQ
94       eventBus ! RabbitMQError(e)
95       throw e
96     }
97   }
98
99   private[this] def doConfigure(): Unit = {
100     val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
101       converters.convert[JsonTextFormat](payload)
102     }
103
104     val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
105       StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
106     }
107
108     val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
109       StdIMEvent.fromJsonTextFormat(jsonTextFormat)
110     }
111
112     val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
113       jsonParser,
114       (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
115       rcEventParser,
116       rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
117       rcEvent ⇒ router ! ProcessResourceEvent(rcEvent)
118     )
119
120     val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
121       jsonParser,
122       (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
123       imEventParser,
124       imEvent ⇒ imEventStore.insertIMEvent(imEvent),
125       imEvent ⇒ router ! ProcessIMEvent(imEvent)
126     )
127
128     // (e)xchange:(r)outing key:(q)
129     val all_rc_ERQs = props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
130     val rcConsumerConfs = for(oneERQ ← all_rc_ERQs) yield {
131       RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
132     }
133
134     val all_im_ERQs = props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
135     val imConsumerConfs = for(oneERQ ← all_im_ERQs) yield {
136       RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
137     }
138
139     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
140       new RabbitMQConsumer(rccc, rcHandler)
141     }
142
143     val imConsumers = for(imcc ← imConsumerConfs) yield {
144       new RabbitMQConsumer(imcc, imHandler)
145     }
146   }
147
148   def start() = {
149     logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
150     System.exit(1)
151   }
152
153   def stop() = {
154     logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
155   }
156
157 }
158
159 object RabbitMQService {
160   final val PropertiesPrefix       = "rabbitmq"
161   final val PropertiesPrefixAndDot = PropertiesPrefix + "."
162
163   @inline private[this] def p(name: String) = PropertiesPrefix + name
164
165   final val DefaultExchangeConfArguments = Env()
166
167   final val DefaultExchangeConf = Env() +
168     (RabbitMQExchangeKeys.`type`,     TopicExchange) +
169     (RabbitMQExchangeKeys.autoDelete, false)         +
170     (RabbitMQExchangeKeys.durable,    true)          +
171     (RabbitMQExchangeKeys.arguments,  DefaultExchangeConfArguments)
172
173
174   final val DefaultQueueConfArguments = Env() +
175     (RabbitMQQueueKeys.Args.xHAPolixy, "all")
176
177   final val DefaultQueueConf = Env() +
178     (RabbitMQQueueKeys.autoDelete, false)         +
179     (RabbitMQQueueKeys.durable,    true)          +
180     (RabbitMQQueueKeys.exclusive,  false)         +
181     (RabbitMQQueueKeys.arguments,  DefaultQueueConfArguments)
182
183   final val DefaultChannelConf = Env() +
184     (RabbitMQChannelKeys.qosPrefetchCount, 1) +
185     (RabbitMQChannelKeys.qosPrefetchSize,  0) +
186     (RabbitMQChannelKeys.qosGlobal,        false)
187
188   def makeConnectionConf(props: Props) = {
189     val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
190     val port = props.getIntEx(RabbitMQConfKeys.port)
191     val addresses = servers.map(new Address(_, port)).toArray
192
193     // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
194     // TODO:  Normally this means to get rid of Props and use Env everywhere.
195     // TODO:  [Will be more type-safe anyway.]
196     Env() +
197       (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
198       (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
199       (RabbitMQConKeys.vhost,    props(RabbitMQConfKeys.vhost))    +
200       (RabbitMQConKeys.servers,  addresses)
201   }
202
203   def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
204     val Array(exchange, routing, queue) = oneERQ.split(':')
205
206     RabbitMQConsumerConf(
207       exchangeName   = exchange,
208       routingKey     = routing,
209       queueName      = queue,
210       connectionConf = makeConnectionConf(props),
211       exchangeConf   = DefaultExchangeConf,
212       channelConf    = DefaultChannelConf,
213       queueConf      = DefaultQueueConf
214     )
215   }
216
217   object RabbitMQConKeys {
218     final val username  = StringKey(p("username"))
219     final val password  = StringKey(p("password"))
220     final val vhost     = StringKey(p("vhost"))
221     final val servers   = ArrayKey[Address](p("servers"))
222   }
223
224   /**
225    * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
226    */
227   final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
228
229   /**
230    * Configuration keys for a `RabbitMQ` exchange.
231    *
232    * @author Christos KK Loverdos <loverdos@gmail.com>
233    */
234   object RabbitMQExchangeKeys {
235     final val `type`     = RabbitMQExchangeTypedKey
236     final val durable    = BooleanKey(p("durable"))
237     final val autoDelete = BooleanKey(p("autoDelete"))
238     final val arguments  = EnvKey(p("arguments"))
239   }
240
241   /**
242    * Configuration keys for a `RabbitMQ` exchange.
243    *
244    * @author Christos KK Loverdos <loverdos@gmail.com>
245    */
246   object RabbitMQQueueKeys {
247     final val durable    = BooleanKey(p("durable"))
248     final val autoDelete = BooleanKey(p("autoDelete"))
249     final val exclusive  = BooleanKey(p("exclusive"))
250     final val arguments  = EnvKey(p("arguments"))
251
252     object Args {
253       // http://www.rabbitmq.com/ha.html
254       // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
255       final val xHAPolixy = StringKey("x-ha-policy")
256     }
257   }
258
259   /**
260    * Configuration keys for a `RabbitMQ` channel.
261    *
262    * @author Christos KK Loverdos <loverdos@gmail.com>
263    */
264   object RabbitMQChannelKeys {
265     final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
266     final val qosPrefetchSize  = IntKey(p("qosPrefetchSize"))
267     final val qosGlobal        = BooleanKey(p("qosGlobal"))
268   }
269
270   object RabbitMQConfKeys {
271     /**
272      * Comma separated list of AMQP servers running in active-active
273      * configuration.
274      */
275     final val servers = p("servers")
276     final val amqp_servers = servers
277
278     /**
279      * Comma separated list of AMQP servers running in active-active
280      * configuration.
281      */
282     final val port = p("port")
283     final val amqp_port = port
284
285     /**
286      * User name for connecting with the AMQP server
287      */
288     final val username = p("username")
289     final val amqp_username = username
290
291     /**
292      * Password for connecting with the AMQP server
293      */
294     final val password = p("passwd")
295     final val amqp_password = password
296
297     /**
298      * Virtual host on the AMQP server
299      */
300     final val vhost = p("vhost")
301     final val amqp_vhost = vhost
302
303     /**
304      * Comma separated list of exchanges known to aquarium
305      * FIXME: What is this??
306      */
307     final val exchange = p("exchange")
308     final val amqp_exchange = exchange
309
310     /**
311      * Queues for retrieving resource events from. Multiple queues can be
312      * declared, separated by comma.
313      *
314      * Format is `exchange:routing.key:queue-name,...`
315      */
316     final val rcevents_queues = p("rcevents.queues")
317     final val amqp_rcevents_queues = rcevents_queues
318
319     /**
320      * Queues for retrieving user events from. Multiple queues can be
321      * declared, separated by comma.
322      *
323      * Format is `exchange:routing.key:queue-name,...`
324      */
325     final val imevents_queues = p("imevents.queues")
326     final val amqp_imevents_queues = imevents_queues
327   }
328
329 }