Logging facilities
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQConsumer.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.rabbitmq
37
38 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
39 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
40 import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
41 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
42 import com.rabbitmq.client.AMQP.BasicProperties
43 import gr.grnet.aquarium.Configurator
44 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
46 import gr.grnet.aquarium.service.event.BusEvent
47 import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
48 import com.ckkloverdos.maybe.MaybeEither
49 import gr.grnet.aquarium.util.date.TimeHelpers
50 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
51
52 /**
53  * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
54  *
55  * @author Christos KK Loverdos <loverdos@gmail.com>
56  */
57
58 class RabbitMQConsumer(conf: RabbitMQConsumerConf,
59                        handler: PayloadHandler,
60                        executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
61   private[this] var _factory: ConnectionFactory = _
62   private[this] var _connection: Connection = _
63   private[this] var _channel: Channel = _
64 //  private[this] val _isAlive = new AtomicBoolean(false)
65   private[this] val _state = new AtomicReference[State](Shutdown)
66
67   def isStarted() = {
68     _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
69   }
70
71   sealed trait State {
72     def isStarted: Boolean = false
73   }
74   case object StartupSequence extends State
75   case class  BadStart(e: Exception) extends State
76   case object Started  extends State { override def isStarted = true }
77   case object ShutdownSequence extends State
78   case object Shutdown extends State
79
80   private[this] def doFullShutdownSequence(): Unit = {
81     _state.set(ShutdownSequence)
82     safeUnit(_channel.close())
83     safeUnit(_connection.close())
84     _state.set(Shutdown)
85   }
86
87   private[this] def doSafeFullShutdownSequence(rescheduleStartup: Boolean): Unit = {
88     safeUnit(doFullShutdownSequence())
89     _state.set(Shutdown)
90     if(rescheduleStartup) {
91       doRescheduleStartup()
92     }
93   }
94
95   private[this] def servers = {
96     conf.connectionConf(RabbitMQConKeys.servers)
97   }
98
99   private[this] def serversToDebugStrings = {
100     servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort))
101   }
102
103   private[this] def doFullStartupSequence() = {
104     import this.conf._
105
106     _state.set(StartupSequence)
107
108     val factory = new ConnectionFactory
109     factory.setUsername(connectionConf(RabbitMQConKeys.username))
110     factory.setPassword(connectionConf(RabbitMQConKeys.password))
111     factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
112
113     val connection = factory.newConnection(servers)
114
115     val channel = connection.createChannel()
116
117     channel.basicQos(
118       channelConf(RabbitMQChannelKeys.qosPrefetchSize),
119       channelConf(RabbitMQChannelKeys.qosPrefetchCount),
120       channelConf(RabbitMQChannelKeys.qosGlobal)
121     )
122
123     channel.exchangeDeclare(
124       exchangeName,
125       exchangeConf(RabbitMQExchangeKeys.`type`).name,
126       exchangeConf(RabbitMQExchangeKeys.durable),
127       exchangeConf(RabbitMQExchangeKeys.autoDelete),
128       exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
129     )
130
131     this._factory = factory
132     this._connection = connection
133     this._channel = channel
134
135     val declareOK = channel.queueDeclare(
136       queueName,
137       queueConf(RabbitMQQueueKeys.durable),
138       queueConf(RabbitMQQueueKeys.exclusive),
139       queueConf(RabbitMQQueueKeys.autoDelete),
140       queueConf(RabbitMQQueueKeys.arguments).toJavaMap
141     )
142
143     val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
144
145     channel.addShutdownListener(RabbitMQShutdownListener)
146
147     _channel.basicConsume(
148       queueName,
149       false, // We send explicit acknowledgements to RabbitMQ
150       RabbitMQMessageConsumer
151     )
152   }
153
154   def start(): Unit = {
155     logStartingF(toDebugString) {
156       // The actual starting steps
157       doFullStartupSequence()
158       _state.set(Started)
159     } {
160       // If an exception was thrown during startup, run this before logging the error
161       doSafeFullShutdownSequence(true)
162     }
163   }
164
165   def stop() = {
166     logStoppingF(toDebugString) {
167       doSafeFullShutdownSequence(false)
168     } {}
169   }
170
171   private[this] def postBusError(event: BusEvent): Unit = {
172     Configurator.MasterConfigurator.eventBus ! event
173   }
174
175   private[this] def doRescheduleStartup(): Unit = {
176     val timerService = Configurator.MasterConfigurator.timerService
177     timerService.scheduleOnce(start(), 1000L * 1)
178   }
179
180   private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
181     try f(_channel)
182     catch {
183       case e: Exception ⇒
184         postBusError(RabbitMQError(e))
185         doSafeFullShutdownSequence(true)
186     }
187   }
188
189   object RabbitMQMessageConsumer extends Consumer {
190     def handleConsumeOk(consumerTag: String) = {
191     }
192
193     def handleCancelOk(consumerTag: String) = {
194     }
195
196     def handleCancel(consumerTag: String) = {
197     }
198
199     def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = {
200     }
201
202     def handleRecoverOk() = {
203     }
204
205     def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
206       def doError: PartialFunction[Throwable, Unit] = {
207         case e: Exception ⇒
208           logger.warn("Unexpected error", e)
209
210         case e: Throwable ⇒
211           throw e
212       }
213
214       try {
215         val deliveryTag = envelope.getDeliveryTag
216
217         executor.exec(body, handler) {
218           case HandlerResultSuccess ⇒
219             doWithChannel(_.basicAck(deliveryTag, false))
220
221           case HandlerResultReject(_) ⇒
222             doWithChannel(_.basicReject(deliveryTag, false))
223
224           case HandlerResultRequeue(_) ⇒
225             doWithChannel(_.basicReject(deliveryTag, true))
226
227           case HandlerResultPanic ⇒
228             // The other end is crucial to the overall operation and it is in panic mode,
229             // so we stop delivering messages until further notice
230             doSafeFullShutdownSequence(true)
231         } (doError)
232       } catch (doError)
233     }
234   }
235
236   object RabbitMQShutdownListener extends ShutdownListener {
237     @inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError
238     @inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError
239
240     def shutdownCompleted(cause: ShutdownSignalException) = {
241       safeUnit { _channel.close() }
242
243       // Now, let's see what happened
244       if(isConnectionError(cause)) {
245       } else if(isChannelError(cause)) {
246       }
247     }
248   }
249
250   def toDebugString = {
251     "(servers=%s, exchange=%s, routingKey=%s, queue=%s)".format(
252       serversToDebugStrings.mkString("[", ", ", "]"),
253       conf.exchangeName,
254       conf.routingKey,
255       conf.queueName)
256   }
257
258   override def toString = {
259     "%s%s".format(shortClassNameOf(this), toDebugString)
260   }
261 }