2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.rabbitmq.service
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.util.date.TimeHelpers
40 import gr.grnet.aquarium.util.{ReflectHelpers, Loggable, Lifecycle}
41 import com.rabbitmq.client.{ConnectionFactory, Address}
42 import gr.grnet.aquarium.{Configurator, Configurable}
43 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
44 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
45 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
46 import com.ckkloverdos.key.{ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
47 import com.ckkloverdos.env.{EnvKey, Env}
48 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
49 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
50 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
51 import com.ckkloverdos.maybe.{MaybeEither, Failed, Just, Maybe}
52 import gr.grnet.aquarium.connector.handler.{HandlerResultSuccess, HandlerResultPanic, HandlerResultReject, HandlerResult, PayloadHandler}
53 import gr.grnet.aquarium.actor.RouterRole
54 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
55 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
56 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
60 * @author Christos KK Loverdos <loverdos@gmail.com>
63 class RabbitMQService extends Loggable with Lifecycle with Configurable {
64 private[this] val props: Props = Props()(StdConverters.AllConverters)
66 def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
68 def configurator = Configurator.MasterConfigurator
70 def eventBus = configurator.eventBus
72 def resourceEventStore = configurator.resourceEventStore
74 def imEventStore = configurator.imEventStore
76 def converters = configurator.converters
78 def router = configurator.actorProvider.actorForRole(RouterRole)
81 * Configure this instance with the provided properties.
83 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
85 def configure(props: Props) = {
86 ReflectHelpers.setField(this, "props", props)
90 logger.info("Configured with {}", this.props)
93 // If we have no internal error, then something is bad with RabbitMQ
94 eventBus ! RabbitMQError(e)
99 private[this] def doConfigure(): Unit = {
100 val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
101 converters.convert[JsonTextFormat](payload)
104 val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
105 StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
108 val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
109 StdIMEvent.fromJsonTextFormat(jsonTextFormat)
112 val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
114 (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
116 rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
117 rcEvent ⇒ router ! ProcessResourceEvent(rcEvent)
120 val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
122 (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
124 imEvent ⇒ imEventStore.insertIMEvent(imEvent),
125 imEvent ⇒ router ! ProcessIMEvent(imEvent)
128 // (e)xchange:(r)outing key:(q)
129 val all_rc_ERQs = props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
130 val rcConsumerConfs = for(oneERQ ← all_rc_ERQs) yield {
131 RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
134 val all_im_ERQs = props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
135 val imConsumerConfs = for(oneERQ ← all_im_ERQs) yield {
136 RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
139 val rcConsumers = for(rccc ← rcConsumerConfs) yield {
140 new RabbitMQConsumer(rccc, rcHandler)
143 val imConsumers = for(imcc ← imConsumerConfs) yield {
144 new RabbitMQConsumer(imcc, imHandler)
149 logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
154 logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
159 object RabbitMQService {
160 final val PropertiesPrefix = "rabbitmq"
161 final val PropertiesPrefixAndDot = PropertiesPrefix + "."
163 @inline private[this] def p(name: String) = PropertiesPrefix + name
165 final val DefaultExchangeConfArguments = Env()
167 final val DefaultExchangeConf = Env() +
168 (RabbitMQExchangeKeys.`type`, TopicExchange) +
169 (RabbitMQExchangeKeys.autoDelete, false) +
170 (RabbitMQExchangeKeys.durable, true) +
171 (RabbitMQExchangeKeys.arguments, DefaultExchangeConfArguments)
174 final val DefaultQueueConfArguments = Env() +
175 (RabbitMQQueueKeys.Args.xHAPolixy, "all")
177 final val DefaultQueueConf = Env() +
178 (RabbitMQQueueKeys.autoDelete, false) +
179 (RabbitMQQueueKeys.durable, true) +
180 (RabbitMQQueueKeys.exclusive, false) +
181 (RabbitMQQueueKeys.arguments, DefaultQueueConfArguments)
183 final val DefaultChannelConf = Env() +
184 (RabbitMQChannelKeys.qosPrefetchCount, 1) +
185 (RabbitMQChannelKeys.qosPrefetchSize, 0) +
186 (RabbitMQChannelKeys.qosGlobal, false)
188 def makeConnectionConf(props: Props) = {
189 val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
190 val port = props.getIntEx(RabbitMQConfKeys.port)
191 val addresses = servers.map(new Address(_, port)).toArray
193 // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
194 // TODO: Normally this means to get rid of Props and use Env everywhere.
195 // TODO: [Will be more type-safe anyway.]
197 (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
198 (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
199 (RabbitMQConKeys.vhost, props(RabbitMQConfKeys.vhost)) +
200 (RabbitMQConKeys.servers, addresses)
203 def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
204 val Array(exchange, routing, queue) = oneERQ.split(':')
206 RabbitMQConsumerConf(
207 exchangeName = exchange,
208 routingKey = routing,
210 connectionConf = makeConnectionConf(props),
211 exchangeConf = DefaultExchangeConf,
212 channelConf = DefaultChannelConf,
213 queueConf = DefaultQueueConf
217 object RabbitMQConKeys {
218 final val username = StringKey(p("username"))
219 final val password = StringKey(p("password"))
220 final val vhost = StringKey(p("vhost"))
221 final val servers = ArrayKey[Address](p("servers"))
225 * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
227 final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
230 * Configuration keys for a `RabbitMQ` exchange.
232 * @author Christos KK Loverdos <loverdos@gmail.com>
234 object RabbitMQExchangeKeys {
235 final val `type` = RabbitMQExchangeTypedKey
236 final val durable = BooleanKey(p("durable"))
237 final val autoDelete = BooleanKey(p("autoDelete"))
238 final val arguments = EnvKey(p("arguments"))
242 * Configuration keys for a `RabbitMQ` exchange.
244 * @author Christos KK Loverdos <loverdos@gmail.com>
246 object RabbitMQQueueKeys {
247 final val durable = BooleanKey(p("durable"))
248 final val autoDelete = BooleanKey(p("autoDelete"))
249 final val exclusive = BooleanKey(p("exclusive"))
250 final val arguments = EnvKey(p("arguments"))
253 // http://www.rabbitmq.com/ha.html
254 // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
255 final val xHAPolixy = StringKey("x-ha-policy")
260 * Configuration keys for a `RabbitMQ` channel.
262 * @author Christos KK Loverdos <loverdos@gmail.com>
264 object RabbitMQChannelKeys {
265 final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
266 final val qosPrefetchSize = IntKey(p("qosPrefetchSize"))
267 final val qosGlobal = BooleanKey(p("qosGlobal"))
270 object RabbitMQConfKeys {
272 * Comma separated list of AMQP servers running in active-active
275 final val servers = p("servers")
276 final val amqp_servers = servers
279 * Comma separated list of AMQP servers running in active-active
282 final val port = p("port")
283 final val amqp_port = port
286 * User name for connecting with the AMQP server
288 final val username = p("username")
289 final val amqp_username = username
292 * Password for connecting with the AMQP server
294 final val password = p("passwd")
295 final val amqp_password = password
298 * Virtual host on the AMQP server
300 final val vhost = p("vhost")
301 final val amqp_vhost = vhost
304 * Comma separated list of exchanges known to aquarium
305 * FIXME: What is this??
307 final val exchange = p("exchange")
308 final val amqp_exchange = exchange
311 * Queues for retrieving resource events from. Multiple queues can be
312 * declared, separated by comma.
314 * Format is `exchange:routing.key:queue-name,...`
316 final val rcevents_queues = p("rcevents.queues")
317 final val amqp_rcevents_queues = rcevents_queues
320 * Queues for retrieving user events from. Multiple queues can be
321 * declared, separated by comma.
323 * Format is `exchange:routing.key:queue-name,...`
325 final val imevents_queues = p("imevents.queues")
326 final val amqp_imevents_queues = imevents_queues