2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.rabbitmq
38 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
39 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
40 import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
41 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
42 import com.rabbitmq.client.AMQP.BasicProperties
43 import gr.grnet.aquarium.Aquarium
44 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45 import gr.grnet.aquarium.service.event.BusEvent
46 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
47 import com.ckkloverdos.maybe.{Maybe, Just, Failed, MaybeEither}
48 import gr.grnet.aquarium.connector.handler.{HandlerResultResend, HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
49 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.{RabbitMQConKeys, RabbitMQChannelKeys, RabbitMQExchangeKeys, RabbitMQQueueKeys}
52 * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
54 * @author Christos KK Loverdos <loverdos@gmail.com>
57 class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
60 * Specifies what we do with the message payload.
62 handler: PayloadHandler,
65 * Specifies how we execute the handler
67 executor: PayloadHandlerExecutor,
70 * After the payload is processed, we call this function with ourselves and the result.
72 notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
73 ) extends Loggable with Lifecycle { consumerSelf ⇒
75 private[this] var _factory: ConnectionFactory = _
76 private[this] var _connection: Connection = _
77 private[this] var _channel: Channel = _
78 private[this] val _state = new AtomicReference[State](Shutdown)
79 private[this] val _pingIsScheduled = new AtomicBoolean(false)
82 * Reconnects are allowed unless some very specific condition within the application prohibits so.
84 @volatile private[this] var _allowReconnects = true
86 def isAllowingReconnects(): Boolean = _allowReconnects
88 def setAllowReconnects(allowReconnects: Boolean) = {
89 _allowReconnects = allowReconnects
94 val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
95 case failed @ Failed(e) ⇒
96 logger.error("isChannelOpen", e)
103 val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
104 case failed @ Failed(e) ⇒
105 logger.error("isConnectionOpen", e)
112 _state.get().isStarted && isChannelOpen && isConnectionOpen
116 def isStarted: Boolean = false
118 case object StartupSequence extends State
119 case object Started extends State {
120 override def isStarted = true
122 case object ShutdownSequence extends State
123 case object Shutdown extends State
125 sealed trait StartReason
126 case object LifecycleStartReason extends StartReason
127 case object PingStartReason extends StartReason
129 private[this] def timerService = Aquarium.Instance.timerService
131 private[this] lazy val servers = {
132 conf.connectionConf(RabbitMQConKeys.servers)
135 private[this] lazy val reconnectPeriodMillis = {
136 conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
139 private[this] lazy val serversToDebugStrings = {
140 servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
143 private[this] def infoList(what: String = ""): List[String] = {
148 List(serversToDebugStrings.mkString("(", ", ", ")")) ++
149 List("%s:%s:%s".format(
156 private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
158 private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
161 if(isAlive() || !isAllowingReconnects() || aquarium.isStopping()) {
162 // In case of re-entrance
167 _state.set(StartupSequence)
169 val factory = new ConnectionFactory
170 factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
171 factory.setUsername(connectionConf(RabbitMQConKeys.username))
172 factory.setPassword(connectionConf(RabbitMQConKeys.password))
173 factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
174 factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
176 val connection = factory.newConnection(servers)
178 val channel = connection.createChannel()
180 channel.addShutdownListener(RabbitMQShutdownListener)
183 channelConf(RabbitMQChannelKeys.qosPrefetchSize),
184 channelConf(RabbitMQChannelKeys.qosPrefetchCount),
185 channelConf(RabbitMQChannelKeys.qosGlobal)
188 channel.exchangeDeclare(
190 exchangeConf(RabbitMQExchangeKeys.`type`).name,
191 exchangeConf(RabbitMQExchangeKeys.durable),
192 exchangeConf(RabbitMQExchangeKeys.autoDelete),
193 exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
196 this._factory = factory
197 this._connection = connection
198 this._channel = channel
200 val declareOK = channel.queueDeclare(
202 queueConf(RabbitMQQueueKeys.durable),
203 queueConf(RabbitMQQueueKeys.exclusive),
204 queueConf(RabbitMQQueueKeys.autoDelete),
205 queueConf(RabbitMQQueueKeys.arguments).toJavaMap
208 val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
210 _channel.basicConsume(
212 false, // We send explicit acknowledgements to RabbitMQ
213 RabbitMQMessageConsumer
218 logger.info("Connected %s".format(infoString("Start")))
222 val info = infoString(startReason.toString)
224 case LifecycleStartReason ⇒
225 logger.error("While connecting %s".format(info), e)
227 case PingStartReason ⇒
228 logger.warn("Could not reconnect %s".format(info))
231 // Shutdown on failure
235 if(!_pingIsScheduled.get()) {
236 // Schedule periodic pings
237 logger.info("Scheduling %s".format(infoString("Ping")))
239 _pingIsScheduled.getAndSet(true)
244 def start(): Unit = {
248 def safeStart(): Unit = {
249 doSafeStartupSequence(LifecycleStartReason)
253 _state.set(ShutdownSequence)
254 safeUnit(_channel.removeShutdownListener(RabbitMQShutdownListener))
255 safeUnit(_channel.close())
256 safeUnit(_connection.close())
264 private[this] def aquarium = Aquarium.Instance
266 private[this] def postBusError(event: BusEvent): Unit = {
267 aquarium.eventBus ! event
270 private[this] def doSchedulePing(): Unit = {
271 val info = infoString("Ping")
273 timerService.scheduleOnce(
276 if(!aquarium.isStopping()) {
277 if(isAllowingReconnects()) {
280 doSafeStartupSequence(PingStartReason)
282 // Reschedule the ping
287 reconnectPeriodMillis,
292 private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
296 // FIXME: What is this?
297 postBusError(RabbitMQError(e))
303 object RabbitMQMessageConsumer extends Consumer {
304 def handleConsumeOk(consumerTag: String) = {
307 def handleCancelOk(consumerTag: String) = {
310 def handleCancel(consumerTag: String) = {
313 def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = {
317 def handleRecoverOk(consumerTag: String) = {
320 def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
321 val onErrorF: PartialFunction[Throwable, Unit] = {
322 case error: Throwable ⇒
323 safeUnit(notifier(consumerSelf, Failed(error)))
327 val deliveryTag = envelope.getDeliveryTag
329 // nice little composeable functions
330 val notifierF = (handlerResult: HandlerResult) ⇒ {
331 safeUnit(notifier(consumerSelf, Just(handlerResult)))
335 val onSuccessBasicStepF: (HandlerResult ⇒ HandlerResult) = {
336 case result @ HandlerResultSuccess ⇒
337 doWithChannel(_.basicAck(deliveryTag, false))
340 case result @ HandlerResultResend ⇒
341 doWithChannel(_.basicNack(deliveryTag, false, true))
344 case result @ HandlerResultReject(_) ⇒
345 doWithChannel(_.basicReject(deliveryTag, false))
348 case result @ HandlerResultRequeue(_) ⇒
349 doWithChannel(_.basicReject(deliveryTag, true))
352 case result @ HandlerResultPanic ⇒
353 // Just inform RabbitMQ and subsequent actions will be made by the notifier.
354 // So, this is a `HandlerResultResend` with extra semantics.
355 doWithChannel(_.basicNack(deliveryTag, false, true))
359 val onSuccessF = onSuccessBasicStepF andThen notifierF
361 executor.exec(body, handler) (onSuccessF) (onErrorF)
367 object RabbitMQShutdownListener extends ShutdownListener {
368 def shutdownCompleted(cause: ShutdownSignalException) = {
370 logger.info("Got shutdown (%sisHardError) %s".format(
371 if(cause.isHardError) "" else "!",
374 // Now, let's see what happened
375 if(cause.isHardError) {
383 override def toString = {
384 "%s%s".format(shortClassNameOf(this), infoString(""))