Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / service / RabbitMQService.scala @ 61bfaf2e

History | View | Annotate | Download (13.2 kB)

1
/*
2
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.connector.rabbitmq.service
37

    
38
import com.ckkloverdos.props.Props
39
import gr.grnet.aquarium.util.date.TimeHelpers
40
import gr.grnet.aquarium.util.{Loggable, Lifecycle}
41
import gr.grnet.aquarium.util.safeUnit
42
import com.rabbitmq.client.Address
43
import gr.grnet.aquarium.{Configurator, Configurable}
44
import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
45
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
46
import com.ckkloverdos.env.{EnvKey, Env}
47
import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
48
import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
49
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
50
import com.ckkloverdos.maybe.MaybeEither
51
import gr.grnet.aquarium.actor.RouterRole
52
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
53
import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
54
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
55
import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
56

    
57
/**
58
 *
59
 * @author Christos KK Loverdos <loverdos@gmail.com>
60
 */
61

    
62
class RabbitMQService extends Loggable with Lifecycle with Configurable {
63
  @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
64
  @volatile private[this] var _consumers = List[RabbitMQConsumer]()
65

    
66
  def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
67

    
68
  def configurator = Configurator.MasterConfigurator
69

    
70
  def eventBus = configurator.eventBus
71

    
72
  def resourceEventStore = configurator.resourceEventStore
73

    
74
  def imEventStore = configurator.imEventStore
75

    
76
  def converters = configurator.converters
77

    
78
  def router = configurator.actorProvider.actorForRole(RouterRole)
79

    
80
  /**
81
   * Configure this instance with the provided properties.
82
   *
83
   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
84
   */
85
  def configure(props: Props) = {
86
    this._props = props
87

    
88
    doConfigure()
89
  }
90

    
91
  private[this] def doConfigure(): Unit = {
92
    val jsonParser: (Array[Byte] ⇒ MaybeEither[JsonTextFormat]) = { payload ⇒
93
      converters.convert[JsonTextFormat](payload)
94
    }
95

    
96
    val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
97
      StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
98
    }
99

    
100
    val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
101
      StdIMEvent.fromJsonTextFormat(jsonTextFormat)
102
    }
103

    
104
    val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
105
      router ! ProcessResourceEvent(rcEvent)
106
    }
107

    
108
    val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
109
      logger.info("Forwarding {}", rcEvent)
110
    }
111

    
112
    val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
113
      router ! ProcessIMEvent(imEvent)
114
    }
115

    
116
    val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
117
      logger.info("Forwarding {}", imEvent)
118
    }
119

    
120
    val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
121
      jsonParser,
122
      (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
123
      rcEventParser,
124
      rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
125
      rcDebugForwardAction
126
    )
127

    
128
    val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
129
      jsonParser,
130
      (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
131
      imEventParser,
132
      imEvent ⇒ imEventStore.insertIMEvent(imEvent),
133
      imDebugForwardAction
134
    )
135

    
136
    val futureExecutor = new PayloadHandlerFutureExecutor
137

    
138
    // (e)xchange:(r)outing key:(q)
139

    
140
    // These two are to trigger an error if the property does not exist
141
    locally(_props(RabbitMQConfKeys.rcevents_queues))
142
    locally(_props(RabbitMQConfKeys.imevents_queues))
143

    
144
    val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
145

    
146
    val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
147
      RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
148
    }
149
    val rcConsumerConfs = rcConsumerConfs_.toSet.toList
150
    if(rcConsumerConfs.size != rcConsumerConfs_.size) {
151
      logger.warn(
152
        "Duplicate %s consumer info in %s=%s".format(
153
        RabbitMQService.PropertiesPrefix,
154
        RabbitMQConfKeys.rcevents_queues,
155
        _props(RabbitMQConfKeys.rcevents_queues)))
156
    }
157

    
158
    val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
159
    val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
160
      RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
161
    }
162
    val imConsumerConfs = imConsumerConfs_.toSet.toList
163
    if(imConsumerConfs.size != imConsumerConfs_.size) {
164
      logger.warn(
165
        "Duplicate %s consumer info in %s=%s".format(
166
        RabbitMQService.PropertiesPrefix,
167
        RabbitMQConfKeys.imevents_queues,
168
        _props(RabbitMQConfKeys.imevents_queues)))
169
    }
170

    
171
    val rcConsumers = for(rccc ← rcConsumerConfs) yield {
172
      logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
173
        RabbitMQService.PropertiesPrefix,
174
        rccc.exchangeName,
175
        rccc.routingKey,
176
        rccc.queueName
177
      ))
178
      new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
179
    }
180

    
181
    val imConsumers = for(imcc ← imConsumerConfs) yield {
182
      logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
183
        RabbitMQService.PropertiesPrefix,
184
        imcc.exchangeName,
185
        imcc.routingKey,
186
        imcc.queueName
187
      ))
188
      new RabbitMQConsumer(imcc, imHandler, futureExecutor)
189
    }
190

    
191
    this._consumers = rcConsumers ++ imConsumers
192

    
193
    val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
194
    lg("Got %s consumers".format(this._consumers.size))
195

    
196
    this._consumers.foreach(logger.debug("Configured {}", _))
197
  }
198

    
199
  def start() = {
200
    logStartingF("") {
201
      this._consumers.foreach(_.start())
202

    
203
      for(consumer ← this._consumers) {
204
        if(!consumer.isAlive()) {
205
          logger.warn("Consumer not started yet {}", consumer.toDebugString)
206
        }
207
      }
208
    } {}
209
  }
210

    
211
  def stop() = {
212
    logStoppingF("") {
213
      for(consumer ← this._consumers) {
214
        safeUnit(consumer.stop())
215
      }
216
    } {}
217
  }
218
}
219

    
220
object RabbitMQService {
221
  final val PropertiesPrefix       = "rabbitmq"
222
  final val PropertiesPrefixAndDot = PropertiesPrefix + "."
223

    
224
  @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
225

    
226
  final val DefaultExchangeConfArguments = Env()
227

    
228
  final val DefaultExchangeConf = Env() +
229
    (RabbitMQExchangeKeys.`type`,     TopicExchange) +
230
    (RabbitMQExchangeKeys.autoDelete, false)         +
231
    (RabbitMQExchangeKeys.durable,    true)          +
232
    (RabbitMQExchangeKeys.arguments,  DefaultExchangeConfArguments)
233

    
234

    
235
  final val DefaultQueueConfArguments = Env() +
236
    (RabbitMQQueueKeys.Args.xHAPolixy, "all")
237

    
238
  final val DefaultQueueConf = Env() +
239
    (RabbitMQQueueKeys.autoDelete, false)         +
240
    (RabbitMQQueueKeys.durable,    true)          +
241
    (RabbitMQQueueKeys.exclusive,  false)         +
242
    (RabbitMQQueueKeys.arguments,  DefaultQueueConfArguments)
243

    
244
  final val DefaultChannelConf = Env() +
245
    (RabbitMQChannelKeys.qosPrefetchCount, 1) +
246
    (RabbitMQChannelKeys.qosPrefetchSize,  0) +
247
    (RabbitMQChannelKeys.qosGlobal,        false)
248

    
249
  def makeConnectionConf(props: Props) = {
250
    val servers = props.getTrimmedList(RabbitMQConfKeys.servers)
251
    val port = props.getIntEx(RabbitMQConfKeys.port)
252
    val addresses = servers.map(new Address(_, port)).toArray
253

    
254
    // TODO: Integrate the below RabbitMQConKeys and RabbitMQConfKeys
255
    // TODO:  Normally this means to get rid of Props and use Env everywhere.
256
    // TODO:  [Will be more type-safe anyway.]
257
    Env() +
258
      (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
259
      (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
260
      (RabbitMQConKeys.vhost,    props(RabbitMQConfKeys.vhost))    +
261
      (RabbitMQConKeys.servers,  addresses) +
262
      (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
263
  }
264

    
265
  def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
266
    val Array(exchange, routing, queue) = oneERQ.split(':')
267

    
268
    RabbitMQConsumerConf(
269
      exchangeName   = exchange,
270
      routingKey     = routing,
271
      queueName      = queue,
272
      connectionConf = makeConnectionConf(props),
273
      exchangeConf   = DefaultExchangeConf,
274
      channelConf    = DefaultChannelConf,
275
      queueConf      = DefaultQueueConf
276
    )
277
  }
278

    
279
  object RabbitMQConKeys {
280
    final val username  = StringKey(p("username"))
281
    final val password  = StringKey(p("password"))
282
    final val vhost     = StringKey(p("vhost"))
283
    final val servers   = ArrayKey[Address](p("servers"))
284

    
285
    final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
286
  }
287

    
288
  /**
289
   * A [[com.ckkloverdos.key.TypedKey]] for the exchange type
290
   */
291
  final case object RabbitMQExchangeTypedKey extends TypedKeySkeleton[RabbitMQExchangeType]("type")
292

    
293
  /**
294
   * Configuration keys for a `RabbitMQ` exchange.
295
   *
296
   * @author Christos KK Loverdos <loverdos@gmail.com>
297
   */
298
  object RabbitMQExchangeKeys {
299
    final val `type`     = RabbitMQExchangeTypedKey
300
    final val durable    = BooleanKey(p("durable"))
301
    final val autoDelete = BooleanKey(p("autoDelete"))
302
    final val arguments  = EnvKey(p("arguments"))
303
  }
304

    
305
  /**
306
   * Configuration keys for a `RabbitMQ` exchange.
307
   *
308
   * @author Christos KK Loverdos <loverdos@gmail.com>
309
   */
310
  object RabbitMQQueueKeys {
311
    final val durable    = BooleanKey(p("durable"))
312
    final val autoDelete = BooleanKey(p("autoDelete"))
313
    final val exclusive  = BooleanKey(p("exclusive"))
314
    final val arguments  = EnvKey(p("arguments"))
315

    
316
    object Args {
317
      // http://www.rabbitmq.com/ha.html
318
      // NOTE the exact name of the key (without using p()); this will be passed directly to RabbitMQ
319
      final val xHAPolixy = StringKey("x-ha-policy")
320
    }
321
  }
322

    
323
  /**
324
   * Configuration keys for a `RabbitMQ` channel.
325
   *
326
   * @author Christos KK Loverdos <loverdos@gmail.com>
327
   */
328
  object RabbitMQChannelKeys {
329
    final val qosPrefetchCount = IntKey(p("qosPrefetchCount"))
330
    final val qosPrefetchSize  = IntKey(p("qosPrefetchSize"))
331
    final val qosGlobal        = BooleanKey(p("qosGlobal"))
332
  }
333

    
334
  object RabbitMQConfKeys {
335
    /**
336
     * How often do we attemot a reconnection?
337
     */
338
    final val reconnect_period_millis = p("reconnect.period.millis")
339

    
340
    /**
341
     * Comma separated list of AMQP servers running in active-active
342
     * configuration.
343
     */
344
    final val servers = p("servers")
345

    
346
    /**
347
     * Comma separated list of AMQP servers running in active-active
348
     * configuration.
349
     */
350
    final val port = p("port")
351

    
352
    /**
353
     * User name for connecting with the AMQP server
354
     */
355
    final val username = p("username")
356

    
357
    /**
358
     * Password for connecting with the AMQP server
359
     */
360
    final val password = p("passwd")
361

    
362
    /**
363
     * Virtual host on the AMQP server
364
     */
365
    final val vhost = p("vhost")
366

    
367
    /**
368
     * Comma separated list of exchanges known to aquarium
369
     * FIXME: What is this??
370
     */
371
    final val exchange = p("exchange")
372

    
373
    /**
374
     * Queues for retrieving resource events from. Multiple queues can be
375
     * declared, separated by comma.
376
     *
377
     * Format is `exchange:routing.key:queue-name,...`
378
     */
379
    final val rcevents_queues = p("rcevents.queues")
380

    
381
    /**
382
     * Queues for retrieving user events from. Multiple queues can be
383
     * declared, separated by comma.
384
     *
385
     * Format is `exchange:routing.key:queue-name,...`
386
     */
387
    final val imevents_queues = p("imevents.queues")
388
  }
389
}