2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.rabbitmq
38 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
39 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
40 import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
41 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
42 import com.rabbitmq.client.AMQP.BasicProperties
43 import gr.grnet.aquarium.Aquarium
44 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45 import gr.grnet.aquarium.service.event.BusEvent
46 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
47 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
48 import com.ckkloverdos.maybe.{Maybe, Just, Failed, MaybeEither}
49 import gr.grnet.aquarium.connector.handler.{HandlerResultResend, HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
52 * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
54 * @author Christos KK Loverdos <loverdos@gmail.com>
57 class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
60 * Specifies what we do with the message payload.
62 handler: PayloadHandler,
65 * Specifies how we execute the handler
67 executor: PayloadHandlerExecutor,
70 * After the payload is processed, we call this function with ourselves and the result.
72 notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
73 ) extends Loggable with Lifecycle { consumerSelf ⇒
// Handles to the RabbitMQ client objects. They start out uninitialised (null) and are
// assigned from within doSafeStartupSequence.
75 private[this] var _factory: ConnectionFactory = _
76 private[this] var _connection: Connection = _
77 private[this] var _channel: Channel = _
// Current lifecycle state; a freshly constructed consumer is in `Shutdown`.
78 private[this] val _state = new AtomicReference[State](Shutdown)
// Records that the reconnect ping has been scheduled; checked before scheduling it.
79 private[this] val _pingIsScheduled = new AtomicBoolean(false)
82 * Reconnects are allowed unless some very specific condition within the application prohibits them.
// Backing flag for isAllowingReconnects() / setAllowReconnects(); starts out true.
// Marked @volatile, presumably because it is read and written from different threads.
84 @volatile private[this] var _allowReconnects = true
// Returns the current value of the reconnect flag. The startup sequence and the ping task
// both consult it before trying to connect.
86 def isAllowingReconnects(): Boolean = _allowReconnects
// Overwrites the reconnect flag with the given value.
// NOTE(review): the lines following the assignment are not visible in this excerpt.
88 def setAllowReconnects(allowReconnects: Boolean) = {
89 _allowReconnects = allowReconnects
// Channel probe: the null/open test is evaluated through MaybeEither so the outcome can be
// matched on (presumably an exception thrown by the client surfaces as Failed — confirm
// against the Maybe library). The enclosing method header is not visible in this excerpt.
94 val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
95 case failed @ Failed(e) ⇒
// The failure is logged under the probe's own name.
96 logger.error("isChannelOpen", e)
// Connection probe, built the same way as the channel probe.
103 val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
104 case failed @ Failed(e) ⇒
105 logger.error("isConnectionOpen", e)
// All three must hold: the state reports started and both probes evaluated to true.
112 _state.get().isStarted && isChannelOpen && isConnectionOpen
// Default answer for a state: not started.
// NOTE(review): the header of the enclosing type is not visible here; presumably `State`.
116 def isStarted: Boolean = false
// The consumer is in the middle of doSafeStartupSequence.
118 case object StartupSequence extends State
// Among the states visible here, the only one that reports isStarted == true.
119 case object Started extends State {
120 override def isStarted = true
// Set at the beginning of the shutdown path, before channel and connection are closed.
122 case object ShutdownSequence extends State
// Initial state of a new consumer.
123 case object Shutdown extends State
// Why a startup sequence was triggered. The reason is included in log output and selects
// how a connection failure is logged.
125 sealed trait StartReason
// Startup requested through safeStart(); a failure is logged at error level with the exception.
126 case object LifecycleStartReason extends StartReason
// Startup attempted by the scheduled ping; a failure is logged at warn level.
127 case object PingStartReason extends StartReason
// Shortcut to the timer service of the Aquarium singleton; looked up on every call.
129 private[this] def timerService = Aquarium.Instance.timerService
// Broker addresses from the connection configuration, resolved once on first use.
// These are what ConnectionFactory.newConnection receives in the startup sequence.
131 private[this] lazy val servers = {
132 conf.connectionConf(RabbitMQConKeys.servers)
// Configured reconnect period in milliseconds, resolved once on first use and handed to
// the timer service when the ping is scheduled.
135 private[this] lazy val reconnectPeriodMillis = {
136 conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
// The configured servers rendered as "host:port" strings, for log messages only.
139 private[this] lazy val serversToDebugStrings = {
140 servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
// Builds the list of descriptive fragments used in log messages.
// NOTE(review): several lines of the body are missing from this excerpt, including any use
// of `what` and the arguments of the three-part format below — confirm against the full file.
143 private[this] def infoList(what: String = ""): List[String] = {
// One fragment holding all servers, e.g. "(host1:5672, host2:5672)".
148 List(serversToDebugStrings.mkString("(", ", ", ")")) ++
// One fragment with three colon-separated values (arguments not visible here).
149 List("%s:%s:%s".format(
// Renders infoList(what) as a single string of the form "[a, b, c]".
156 private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
// Tries to bring the consumer up: creates a ConnectionFactory from the configuration, opens
// a connection and a channel, declares the exchange and the queue, binds them and registers
// RabbitMQMessageConsumer on the queue. `startReason` appears in log output and decides how
// a failure is logged.
// NOTE(review): this excerpt omits several lines of the method (e.g. the body of the guard,
// the try/catch framing and some call heads); comments below cover only what is visible.
158 private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
// Guard: skip the sequence when already alive, when reconnects are disallowed, or when
// Aquarium is stopping (the branch body is not visible; presumably it just returns).
161 if(isAlive() || !isAllowingReconnects() || aquarium.isStopping()) {
162 // In case of re-entrance
167 _state.set(StartupSequence)
169 val factory = new ConnectionFactory
// NOTE(review): the reconnect period is reused as the connection timeout.
170 factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
171 factory.setUsername(connectionConf(RabbitMQConKeys.username))
172 factory.setPassword(connectionConf(RabbitMQConKeys.password))
173 factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
175 val connection = factory.newConnection(servers)
177 val channel = connection.createChannel()
// Get told when the channel goes away, so that a reconnect can be considered.
179 channel.addShutdownListener(RabbitMQShutdownListener)
// QoS settings taken from the channel configuration (the call head is not visible here;
// presumably channel.basicQos).
182 channelConf(RabbitMQChannelKeys.qosPrefetchSize),
183 channelConf(RabbitMQChannelKeys.qosPrefetchCount),
184 channelConf(RabbitMQChannelKeys.qosGlobal)
187 channel.exchangeDeclare(
189 exchangeConf(RabbitMQExchangeKeys.`type`).name,
190 exchangeConf(RabbitMQExchangeKeys.durable),
191 exchangeConf(RabbitMQExchangeKeys.autoDelete),
192 exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
// Publish the new objects into the instance fields. This happens before the queue is
// declared, so the fields are already set if a later step fails.
195 this._factory = factory
196 this._connection = connection
197 this._channel = channel
199 val declareOK = channel.queueDeclare(
201 queueConf(RabbitMQQueueKeys.durable),
202 queueConf(RabbitMQQueueKeys.exclusive),
203 queueConf(RabbitMQQueueKeys.autoDelete),
204 queueConf(RabbitMQQueueKeys.arguments).toJavaMap
207 val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
// Uses the field, which at this point refers to the same object as the local `channel`.
209 _channel.basicConsume(
211 false, // We send explicit acknowledgements to RabbitMQ
212 RabbitMQMessageConsumer
217 logger.info("Connected %s".format(infoString("Start")))
// Failure path (presumably inside a handler where `e` is the caught exception).
221 val info = infoString(startReason.toString)
223 case LifecycleStartReason ⇒
224 logger.error("While connecting %s".format(info), e)
226 case PingStartReason ⇒
// NOTE(review): unlike the lifecycle case, the exception itself is not logged here.
227 logger.warn("Could not reconnect %s".format(info))
230 // Shutdown on failure
// NOTE(review): get() followed by getAndSet(true) is a check-then-act pair and is not
// atomic; compareAndSet(false, true) would make the "schedule only once" guard race-free.
234 if(!_pingIsScheduled.get()) {
235 // Schedule periodic pings
236 logger.info("Scheduling %s".format(infoString("Ping")))
238 _pingIsScheduled.getAndSet(true)
// Start hook (presumably required by the Lifecycle trait this class mixes in).
// NOTE(review): its body is not visible in this excerpt.
243 def start(): Unit = {
// Runs the startup sequence, tagged as a lifecycle-initiated start. With that tag a
// connection failure is logged at error level together with the exception.
247 def safeStart(): Unit = {
248 doSafeStartupSequence(LifecycleStartReason)
// Shutdown path (the enclosing method header is not visible here; presumably `stop`).
// The state is switched first, then the channel and the connection are closed in that order.
252 _state.set(ShutdownSequence)
// Each close goes through safeUnit, presumably so that a failing (or null) channel does not
// prevent the connection from being closed — confirm against gr.grnet.aquarium.util.
253 safeUnit(_channel.close())
254 safeUnit(_connection.close())
// Shortcut to the Aquarium singleton; looked up on every call.
262 private[this] def aquarium = Aquarium.Instance
// Sends the given event to the Aquarium event bus.
264 private[this] def postBusError(event: BusEvent): Unit = {
265 aquarium.eventBus ! event
// Schedules a one-shot timer task that tries to reconnect. reconnectPeriodMillis is passed
// to the timer service (presumably as the delay).
// NOTE(review): parts of the scheduled body are missing from this excerpt.
268 private[this] def doSchedulePing(): Unit = {
269 val info = infoString("Ping")
271 timerService.scheduleOnce(
// Nothing is attempted while Aquarium is stopping or while reconnects are disallowed.
274 if(!aquarium.isStopping()) {
275 if(isAllowingReconnects()) {
// The startup sequence itself returns early when the consumer is already alive.
278 doSafeStartupSequence(PingStartReason)
280 // Reschedule the ping
285 reconnectPeriodMillis,
// Runs `f` against a channel and discards its result (the return type is Unit).
// NOTE(review): the lines that actually apply `f` and catch the failure are not visible in
// this excerpt; presumably f(_channel) guarded by a try/catch that binds `e`.
290 private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
294 // FIXME: What is this?
// On failure the exception is wrapped in a RabbitMQError and sent to the event bus.
295 postBusError(RabbitMQError(e))
// The com.rabbitmq.client.Consumer registered via basicConsume in the startup sequence.
// Only handleDelivery has a visible body in this excerpt; the bodies of the other callbacks
// are not shown.
301 object RabbitMQMessageConsumer extends Consumer {
302 def handleConsumeOk(consumerTag: String) = {
305 def handleCancelOk(consumerTag: String) = {
308 def handleCancel(consumerTag: String) = {
311 def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = {
314 def handleRecoverOk() = {
// Called for every delivered message. The payload is given to the executor together with
// the handler; the result is first reported to the notifier and then acknowledged to, or
// rejected at, the broker. Automatic acknowledgement is off (see basicConsume), so the
// broker only learns the outcome through the calls made here.
317 def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
// Error continuation: any Throwable from the execution is reported to the notifier as
// Failed. The notifier call is wrapped in safeUnit (presumably so that its own failure is
// swallowed). Lines following it are not visible here.
318 val onErrorF: PartialFunction[Throwable, Unit] = {
319 case error: Throwable ⇒
320 safeUnit(notifier(consumerSelf, Failed(error)))
// Identifies this delivery in the ack/nack/reject calls below.
324 val deliveryTag = envelope.getDeliveryTag
326 // nice little composable functions
// Step 1: report the handler result to the notifier. For the composition further down to
// type-check this function must yield the HandlerResult again (that line is not visible).
327 val notifierF = (handlerResult: HandlerResult) ⇒ {
328 safeUnit(notifier(consumerSelf, Just(handlerResult)))
// Step 2: translate the handler result into the matching broker call. In the RabbitMQ Java
// client the boolean of basicAck is `multiple`, the booleans of basicNack are `multiple`
// and `requeue`, and the boolean of basicReject is `requeue`.
332 val onSuccessBasicStepF: (HandlerResult ⇒ Unit) = {
333 case HandlerResultSuccess ⇒
// Acknowledge this single delivery.
334 doWithChannel(_.basicAck(deliveryTag, false))
336 case HandlerResultResend ⇒
// Negative acknowledgement for this single delivery, with requeue.
337 doWithChannel(_.basicNack(deliveryTag, false, true))
339 case HandlerResultReject(_) ⇒
// Reject without requeue.
340 doWithChannel(_.basicReject(deliveryTag, false))
342 case HandlerResultRequeue(_) ⇒
// Reject with requeue.
343 doWithChannel(_.basicReject(deliveryTag, true))
345 case HandlerResultPanic ⇒
346 // Just inform RabbitMQ and subsequent actions will be made by the notifier.
347 // So, this is a `HandlerResultResend` with extra semantics.
348 doWithChannel(_.basicNack(deliveryTag, false, true))
// Success continuation: notifier first, broker call second.
351 val onSuccessF = notifierF andThen onSuccessBasicStepF
353 executor.exec(body, handler) (onSuccessF) (onErrorF)
// Shutdown listener attached to the channel in the startup sequence.
359 object RabbitMQShutdownListener extends ShutdownListener {
// Logs the shutdown and then branches on whether it is a hard error.
// NOTE(review): the remaining format arguments and the body of the branch are not visible
// in this excerpt.
360 def shutdownCompleted(cause: ShutdownSignalException) = {
// The message reads "isHardError" for a hard error and "!isHardError" otherwise.
362 logger.info("Got shutdown (%sisHardError) %s".format(
363 if(cause.isHardError) "" else "!",
366 // Now, let's see what happened
367 if(cause.isHardError) {
// Short class name immediately followed by the bracketed info string, with no separator.
375 override def toString = {
376 "%s%s".format(shortClassNameOf(this), infoString(""))