dafa4c50aa10f3e484312e5458394c35ee188583
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQConsumer.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.rabbitmq
37
38 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
39 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
40 import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
41 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
42 import com.rabbitmq.client.AMQP.BasicProperties
43 import gr.grnet.aquarium.Aquarium
44 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45 import gr.grnet.aquarium.service.event.BusEvent
46 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
47 import com.ckkloverdos.maybe.{Maybe, Just, Failed, MaybeEither}
48 import gr.grnet.aquarium.connector.handler.{HandlerResultResend, HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
49 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.{RabbitMQConKeys, RabbitMQChannelKeys, RabbitMQExchangeKeys, RabbitMQQueueKeys}
50
51 /**
52  * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  */
56
57 class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
58
59                        /**
60                         * Specifies what we do with the message payload.
61                         */
62                        handler: PayloadHandler,
63
64                        /**
65                         * Specifies how we execute the handler
66                         */
67                        executor: PayloadHandlerExecutor,
68
69                        /**
70                         * After the payload is processed, we call this function with ourselves and the result.
71                         */
72                        notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
73 ) extends Loggable with Lifecycle { consumerSelf ⇒
74
75   private[this] var _factory: ConnectionFactory = _
76   private[this] var _connection: Connection = _
77   private[this] var _channel: Channel = _
78   private[this] val _state = new AtomicReference[State](Shutdown)
79   private[this] val _pingIsScheduled = new AtomicBoolean(false)
80
81   /**
82    * Reconnects are allowed unless some very specific condition within the application prohibits so.
83    */
84   @volatile private[this] var _allowReconnects = true
85
86   def isAllowingReconnects(): Boolean = _allowReconnects
87
88   def setAllowReconnects(allowReconnects: Boolean) = {
89     _allowReconnects = allowReconnects
90     doSchedulePing()
91   }
92
93   def isAlive() = {
94     val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
95       case failed @ Failed(e) ⇒
96         logger.error("isChannelOpen", e)
97         false
98
99       case Just(x) ⇒
100         x
101     }
102
103     val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
104       case failed @ Failed(e) ⇒
105         logger.error("isConnectionOpen", e)
106         false
107
108       case Just(x) ⇒
109         x
110     }
111
112     _state.get().isStarted && isChannelOpen && isConnectionOpen
113   }
114
115   sealed trait State {
116     def isStarted: Boolean = false
117   }
118   case object StartupSequence extends State
119   case object Started  extends State {
120     override def isStarted = true
121   }
122   case object ShutdownSequence extends State
123   case object Shutdown extends State
124
125   sealed trait StartReason
126   case object LifecycleStartReason extends StartReason
127   case object PingStartReason extends StartReason
128
129   private[this] def timerService = Aquarium.Instance.timerService
130
131   private[this] lazy val servers = {
132     conf.connectionConf(RabbitMQConKeys.servers)
133   }
134
135   private[this] lazy val reconnectPeriodMillis = {
136     conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
137   }
138
139   private[this] lazy val serversToDebugStrings = {
140     servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
141   }
142
143   private[this] def infoList(what: String = ""): List[String] = {
144     (what match {
145       case "" ⇒ List()
146       case _  ⇒ List(what)
147     }) ++
148     List(serversToDebugStrings.mkString("(", ", ", ")")) ++
149     List("%s:%s:%s".format(
150       conf.exchangeName,
151       conf.routingKey,
152       conf.queueName
153     ))
154   }
155
156   private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
157
158   private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
159     import this.conf._
160
161     if(isAlive() || !isAllowingReconnects() || aquarium.isStopping()) {
162       // In case of re-entrance
163       return
164     }
165
166     try {
167       _state.set(StartupSequence)
168
169       val factory = new ConnectionFactory
170       factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
171       factory.setUsername(connectionConf(RabbitMQConKeys.username))
172       factory.setPassword(connectionConf(RabbitMQConKeys.password))
173       factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
174       factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
175
176       val connection = factory.newConnection(servers)
177
178       val channel = connection.createChannel()
179
180       channel.addShutdownListener(RabbitMQShutdownListener)
181
182       channel.basicQos(
183         channelConf(RabbitMQChannelKeys.qosPrefetchSize),
184         channelConf(RabbitMQChannelKeys.qosPrefetchCount),
185         channelConf(RabbitMQChannelKeys.qosGlobal)
186       )
187
188       channel.exchangeDeclare(
189         exchangeName,
190         exchangeConf(RabbitMQExchangeKeys.`type`).name,
191         exchangeConf(RabbitMQExchangeKeys.durable),
192         exchangeConf(RabbitMQExchangeKeys.autoDelete),
193         exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
194       )
195
196       this._factory = factory
197       this._connection = connection
198       this._channel = channel
199
200       val declareOK = channel.queueDeclare(
201         queueName,
202         queueConf(RabbitMQQueueKeys.durable),
203         queueConf(RabbitMQQueueKeys.exclusive),
204         queueConf(RabbitMQQueueKeys.autoDelete),
205         queueConf(RabbitMQQueueKeys.arguments).toJavaMap
206       )
207
208       val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
209
210       _channel.basicConsume(
211         queueName,
212         false, // We send explicit acknowledgements to RabbitMQ
213         RabbitMQMessageConsumer
214       )
215
216       _state.set(Started)
217
218       logger.info("Connected %s".format(infoString("Start")))
219     }
220     catch {
221       case e: Exception ⇒
222         val info = infoString(startReason.toString)
223         startReason match {
224           case LifecycleStartReason ⇒
225             logger.error("While connecting %s".format(info), e)
226
227           case PingStartReason ⇒
228             logger.warn("Could not reconnect %s".format(info))
229         }
230
231         // Shutdown on failure
232         safeStop()
233     }
234     finally {
235       if(!_pingIsScheduled.get()) {
236         // Schedule periodic pings
237         logger.info("Scheduling %s".format(infoString("Ping")))
238         doSchedulePing()
239         _pingIsScheduled.getAndSet(true)
240       }
241     }
242   }
243
244   def start(): Unit = {
245     safeStart()
246   }
247
248   def safeStart(): Unit = {
249     doSafeStartupSequence(LifecycleStartReason)
250   }
251
252   def safeStop() = {
253     _state.set(ShutdownSequence)
254     safeUnit(_channel.removeShutdownListener(RabbitMQShutdownListener))
255     safeUnit(_channel.close())
256     safeUnit(_connection.close())
257     _state.set(Shutdown)
258   }
259
260   def stop() = {
261     safeStop()
262   }
263
264   private[this] def aquarium = Aquarium.Instance
265
266   private[this] def postBusError(event: BusEvent): Unit = {
267     aquarium.eventBus ! event
268   }
269
270   private[this] def doSchedulePing(): Unit = {
271     val info = infoString("Ping")
272
273     timerService.scheduleOnce(
274       info,
275       {
276         if(!aquarium.isStopping()) {
277           if(isAllowingReconnects()) {
278             if(!isAlive()) {
279               safeStop()
280               doSafeStartupSequence(PingStartReason)
281             }
282             // Reschedule the ping
283             doSchedulePing()
284           }
285         }
286       },
287       reconnectPeriodMillis,
288       true
289     )
290   }
291
292   private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
293     try f(_channel)
294     catch {
295       case e: Exception ⇒
296         // FIXME: What is this?
297         postBusError(RabbitMQError(e))
298
299         safeStop()
300     }
301   }
302
303   object RabbitMQMessageConsumer extends Consumer {
304     def handleConsumeOk(consumerTag: String) = {
305     }
306
307     def handleCancelOk(consumerTag: String) = {
308     }
309
310     def handleCancel(consumerTag: String) = {
311     }
312
313     def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) = {
314     }
315
316
317     def handleRecoverOk(consumerTag: String) = {
318     }
319
320     def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
321       val onErrorF: PartialFunction[Throwable, Unit] = {
322         case error: Throwable ⇒
323           safeUnit(notifier(consumerSelf, Failed(error)))
324       }
325
326       try {
327         val deliveryTag = envelope.getDeliveryTag
328
329         // nice little composeable functions
330         val notifierF = (handlerResult: HandlerResult) ⇒ {
331           safeUnit(notifier(consumerSelf, Just(handlerResult)))
332           handlerResult
333         }
334
335         val onSuccessBasicStepF: (HandlerResult ⇒ HandlerResult) = {
336           case result @ HandlerResultSuccess ⇒
337             doWithChannel(_.basicAck(deliveryTag, false))
338             result
339
340           case result @ HandlerResultResend ⇒
341             doWithChannel(_.basicNack(deliveryTag, false, true))
342             result
343
344           case result @ HandlerResultReject(_) ⇒
345             doWithChannel(_.basicReject(deliveryTag, false))
346             result
347
348           case result @ HandlerResultRequeue(_) ⇒
349             doWithChannel(_.basicReject(deliveryTag, true))
350             result
351
352           case result @ HandlerResultPanic ⇒
353             // Just inform RabbitMQ and subsequent actions will be made by the notifier.
354             // So, this is a `HandlerResultResend` with extra semantics.
355             doWithChannel(_.basicNack(deliveryTag, false, true))
356             result
357         }
358
359         val onSuccessF = onSuccessBasicStepF andThen notifierF
360
361         executor.exec(body, handler) (onSuccessF) (onErrorF)
362       }
363       catch (onErrorF)
364     }
365   }
366
367   object RabbitMQShutdownListener extends ShutdownListener {
368     def shutdownCompleted(cause: ShutdownSignalException) = {
369       cause.getReason
370       logger.info("Got shutdown (%sisHardError) %s".format(
371         if(cause.isHardError) "" else "!",
372         cause.toString))
373
374       // Now, let's see what happened
375       if(cause.isHardError) {
376       } else {
377       }
378
379       safeStop()
380     }
381   }
382
383   override def toString = {
384     "%s%s".format(shortClassNameOf(this), infoString(""))
385   }
386 }