2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.rabbitmq
38 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
39 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
40 import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
41 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
42 import com.rabbitmq.client.AMQP.BasicProperties
43 import gr.grnet.aquarium.Configurator
44 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
46 import gr.grnet.aquarium.service.event.BusEvent
47 import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
48 import com.ckkloverdos.maybe.MaybeEither
49 import gr.grnet.aquarium.util.date.TimeHelpers
50 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
53 * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
55 * @author Christos KK Loverdos <loverdos@gmail.com>
58 class RabbitMQConsumer(conf: RabbitMQConsumerConf,
59 handler: PayloadHandler,
60 executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
61 private[this] var _factory: ConnectionFactory = _
62 private[this] var _connection: Connection = _
63 private[this] var _channel: Channel = _
64 // private[this] val _isAlive = new AtomicBoolean(false)
65 private[this] val _state = new AtomicReference[State](Shutdown)
68 _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
72 def isStarted: Boolean = false
74 case object StartupSequence extends State
75 case class BadStart(e: Exception) extends State
76 case object Started extends State { override def isStarted = true }
77 case object ShutdownSequence extends State
78 case object Shutdown extends State
80 private[this] def doFullShutdownSequence(): Unit = {
81 _state.set(ShutdownSequence)
82 safeUnit(_channel.close())
83 safeUnit(_connection.close())
87 private[this] def doSafeFullShutdownSequence(rescheduleStartup: Boolean): Unit = {
88 safeUnit(doFullShutdownSequence())
90 if(rescheduleStartup) {
95 private[this] def servers = {
96 conf.connectionConf(RabbitMQConKeys.servers)
99 private[this] def serversToDebugStrings = {
100 servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort))
103 private[this] def doFullStartupSequence() = {
106 _state.set(StartupSequence)
108 val factory = new ConnectionFactory
109 factory.setUsername(connectionConf(RabbitMQConKeys.username))
110 factory.setPassword(connectionConf(RabbitMQConKeys.password))
111 factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
113 val connection = factory.newConnection(servers)
115 val channel = connection.createChannel()
118 channelConf(RabbitMQChannelKeys.qosPrefetchSize),
119 channelConf(RabbitMQChannelKeys.qosPrefetchCount),
120 channelConf(RabbitMQChannelKeys.qosGlobal)
123 channel.exchangeDeclare(
125 exchangeConf(RabbitMQExchangeKeys.`type`).name,
126 exchangeConf(RabbitMQExchangeKeys.durable),
127 exchangeConf(RabbitMQExchangeKeys.autoDelete),
128 exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
131 this._factory = factory
132 this._connection = connection
133 this._channel = channel
135 val declareOK = channel.queueDeclare(
137 queueConf(RabbitMQQueueKeys.durable),
138 queueConf(RabbitMQQueueKeys.exclusive),
139 queueConf(RabbitMQQueueKeys.autoDelete),
140 queueConf(RabbitMQQueueKeys.arguments).toJavaMap
143 val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
145 channel.addShutdownListener(RabbitMQShutdownListener)
147 _channel.basicConsume(
149 false, // We send explicit acknowledgements to RabbitMQ
150 RabbitMQMessageConsumer
154 def start(): Unit = {
155 logStartingF(toDebugString) {
156 // The actual starting steps
157 doFullStartupSequence()
160 // If an exception was thrown during startup, run this before logging the error
161 doSafeFullShutdownSequence(true)
166 logStoppingF(toDebugString) {
167 doSafeFullShutdownSequence(false)
171 private[this] def postBusError(event: BusEvent): Unit = {
172 Configurator.MasterConfigurator.eventBus ! event
175 private[this] def doRescheduleStartup(): Unit = {
176 val timerService = Configurator.MasterConfigurator.timerService
177 timerService.scheduleOnce(start(), 1000L * 1)
180 private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
184 postBusError(RabbitMQError(e))
185 doSafeFullShutdownSequence(true)
189 object RabbitMQMessageConsumer extends Consumer {
190 def handleConsumeOk(consumerTag: String) = {
193 def handleCancelOk(consumerTag: String) = {
196 def handleCancel(consumerTag: String) = {
199 def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = {
202 def handleRecoverOk() = {
205 def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
206 def doError: PartialFunction[Throwable, Unit] = {
208 logger.warn("Unexpected error", e)
215 val deliveryTag = envelope.getDeliveryTag
217 executor.exec(body, handler) {
218 case HandlerResultSuccess ⇒
219 doWithChannel(_.basicAck(deliveryTag, false))
221 case HandlerResultReject(_) ⇒
222 doWithChannel(_.basicReject(deliveryTag, false))
224 case HandlerResultRequeue(_) ⇒
225 doWithChannel(_.basicReject(deliveryTag, true))
227 case HandlerResultPanic ⇒
228 // The other end is crucial to the overall operation and it is in panic mode,
229 // so we stop delivering messages until further notice
230 doSafeFullShutdownSequence(true)
236 object RabbitMQShutdownListener extends ShutdownListener {
237 @inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError
238 @inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError
240 def shutdownCompleted(cause: ShutdownSignalException) = {
241 safeUnit { _channel.close() }
243 // Now, let's see what happened
244 if(isConnectionError(cause)) {
245 } else if(isChannelError(cause)) {
250 def toDebugString = {
251 "(servers=%s, exchange=%s, routingKey=%s, queue=%s)".format(
252 serversToDebugStrings.mkString("[", ", ", "]"),
258 override def toString = {
259 "%s%s".format(shortClassNameOf(this), toDebugString)