Refine event payload handling
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQConsumer.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.rabbitmq
37
38 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
39 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
40 import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
41 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
42 import com.rabbitmq.client.AMQP.BasicProperties
43 import gr.grnet.aquarium.Aquarium
44 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45 import gr.grnet.aquarium.service.event.BusEvent
46 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
47 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
48 import com.ckkloverdos.maybe.{Maybe, Just, Failed, MaybeEither}
49 import gr.grnet.aquarium.connector.handler.{HandlerResultResend, HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
50
/**
 * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
 *
 * On startup the consumer opens a connection and a channel, declares the configured exchange
 * and queue, binds them with the configured routing key and starts consuming with explicit
 * acknowledgements. Each delivery is given to `executor` along with `handler`; the produced
 * `HandlerResult` is first passed to `notifier` and then translated to an ack/nack/reject on
 * the channel. A ping, scheduled through the Aquarium timer service, re-runs the startup
 * sequence whenever the consumer is found not to be alive.
 *
 * @param conf     The configuration of this consumer (connection, channel, exchange and queue settings).
 * @param handler  Specifies what we do with the message payload.
 * @param executor Specifies how we execute the handler.
 * @param notifier After the payload is processed, we call this function with ourselves and the result.
 *
 * @author Christos KK Loverdos <loverdos@gmail.com>
 */
class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
                       handler: PayloadHandler,
                       executor: PayloadHandlerExecutor,
                       notifier: (RabbitMQConsumer, Maybe[HandlerResult]) ⇒ Unit
) extends Loggable with Lifecycle { consumerSelf ⇒

  private[this] var _factory: ConnectionFactory = _
  private[this] var _connection: Connection = _
  private[this] var _channel: Channel = _
  private[this] val _state = new AtomicReference[State](Shutdown)

  /**
   * Becomes `true` the first time a ping is scheduled by the startup sequence.
   */
  private[this] val _pingIsScheduled = new AtomicBoolean(false)

  /**
   * Reconnects are allowed unless some very specific condition within the application prohibits so.
   */
  @volatile private[this] var _allowReconnects = true

  def isAllowingReconnects(): Boolean = _allowReconnects

  /**
   * Updates the reconnect permission and schedules a ping.
   *
   * NOTE(review): a ping is scheduled on every call, regardless of the new value and of whether
   * a ping chain is already running. This behavior is kept as is.
   */
  def setAllowReconnects(allowReconnects: Boolean): Unit = {
    _allowReconnects = allowReconnects
    doSchedulePing()
  }

  /**
   * Evaluates `probe` and returns its value. If the evaluation ends up as a `Failed`,
   * the error is logged under the name `what` and `false` is returned.
   */
  private[this] def safeProbe(what: String)(probe: ⇒ Boolean): Boolean = {
    MaybeEither(probe) match {
      case Failed(e) ⇒
        logger.error(what, e)
        false

      case Just(isOpen) ⇒
        isOpen
    }
  }

  /**
   * The consumer is alive when it is in the `Started` state and both the channel and the
   * connection exist and report themselves as open.
   */
  def isAlive(): Boolean = {
    // Both probes are always evaluated (and thus may log) before the state is consulted.
    val isChannelOpen = safeProbe("isChannelOpen") {
      (_channel ne null) && _channel.isOpen
    }
    val isConnectionOpen = safeProbe("isConnectionOpen") {
      (_connection ne null) && _connection.isOpen
    }

    _state.get().isStarted && isChannelOpen && isConnectionOpen
  }

  /**
   * The lifecycle states of the consumer. Only `Started` reports `isStarted == true`.
   */
  sealed trait State {
    def isStarted: Boolean = false
  }
  case object StartupSequence extends State
  case object Started  extends State {
    override def isStarted = true
  }
  case object ShutdownSequence extends State
  case object Shutdown extends State

  /**
   * Why the startup sequence is run: an explicit lifecycle start or a reconnect ping.
   */
  sealed trait StartReason
  case object LifecycleStartReason extends StartReason
  case object PingStartReason extends StartReason

  private[this] def timerService = Aquarium.Instance.timerService

  private[this] lazy val servers = {
    conf.connectionConf(RabbitMQConKeys.servers)
  }

  private[this] lazy val reconnectPeriodMillis = {
    conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
  }

  private[this] lazy val serversToDebugStrings = {
    servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
  }

  /**
   * Builds the pieces of a descriptive string: the optional `what` label (omitted when empty),
   * the list of servers and the `exchange:routingKey:queue` triplet.
   */
  private[this] def infoList(what: String = ""): List[String] = {
    val label = if(what == "") Nil else List(what)
    val serversInfo = serversToDebugStrings.mkString("(", ", ", ")")
    val routingInfo = "%s:%s:%s".format(
      conf.exchangeName,
      conf.routingKey,
      conf.queueName
    )

    label ++ List(serversInfo, routingInfo)
  }

  private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")

  /**
   * Connects to RabbitMQ and starts consuming, unless the consumer is already alive,
   * reconnects are not allowed or Aquarium is stopping.
   *
   * Any exception during the sequence is logged (according to `startReason`) and followed by
   * `safeStop()`. Whenever a connection attempt is made, a ping is scheduled if the startup
   * sequence has not scheduled one before.
   *
   * @param startReason Why we are trying to connect; it only affects how a failure is logged.
   */
  private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
    import this.conf._

    // In case of re-entrance
    val mustSkip = isAlive() || !isAllowingReconnects() || aquarium.isStopping()

    if(!mustSkip) {
      try {
        _state.set(StartupSequence)

        val factory = new ConnectionFactory
        factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
        factory.setUsername(connectionConf(RabbitMQConKeys.username))
        factory.setPassword(connectionConf(RabbitMQConKeys.password))
        factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
        this._factory = factory

        // Each resource is published to its field as soon as it exists, so that the
        // `safeStop()` of the failure path closes it instead of leaking it when a later
        // step (channel creation, QoS, declarations, ...) throws.
        val connection = factory.newConnection(servers)
        this._connection = connection

        val channel = connection.createChannel()
        this._channel = channel

        channel.addShutdownListener(RabbitMQShutdownListener)

        channel.basicQos(
          channelConf(RabbitMQChannelKeys.qosPrefetchSize),
          channelConf(RabbitMQChannelKeys.qosPrefetchCount),
          channelConf(RabbitMQChannelKeys.qosGlobal)
        )

        channel.exchangeDeclare(
          exchangeName,
          exchangeConf(RabbitMQExchangeKeys.`type`).name,
          exchangeConf(RabbitMQExchangeKeys.durable),
          exchangeConf(RabbitMQExchangeKeys.autoDelete),
          exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
        )

        channel.queueDeclare(
          queueName,
          queueConf(RabbitMQQueueKeys.durable),
          queueConf(RabbitMQQueueKeys.exclusive),
          queueConf(RabbitMQQueueKeys.autoDelete),
          queueConf(RabbitMQQueueKeys.arguments).toJavaMap
        )

        channel.queueBind(queueName, exchangeName, routingKey)

        channel.basicConsume(
          queueName,
          false, // We send explicit acknowledgements to RabbitMQ
          RabbitMQMessageConsumer
        )

        _state.set(Started)

        logger.info("Connected %s".format(infoString("Start")))
      }
      catch {
        case e: Exception ⇒
          val info = infoString(startReason.toString)
          startReason match {
            case LifecycleStartReason ⇒
              logger.error("While connecting %s".format(info), e)

            case PingStartReason ⇒
              logger.warn("Could not reconnect %s".format(info))
          }

          // Shutdown on failure
          safeStop()
      }
      finally {
        // Atomically claim the right to schedule, so that concurrent startup attempts
        // cannot both start a ping chain.
        if(_pingIsScheduled.compareAndSet(false, true)) {
          // Schedule periodic pings
          logger.info("Scheduling %s".format(infoString("Ping")))
          try doSchedulePing()
          catch {
            case e: Exception ⇒
              // Nothing was scheduled; let a later startup attempt try again.
              _pingIsScheduled.set(false)
              throw e
          }
        }
      }
    }
  }

  def start(): Unit = {
    safeStart()
  }

  def safeStart(): Unit = {
    doSafeStartupSequence(LifecycleStartReason)
  }

  /**
   * Closes the channel and then the connection, moving the state through
   * `ShutdownSequence` to `Shutdown`.
   *
   * Each `close()` is wrapped in `safeUnit` (imported from `gr.grnet.aquarium.util`), which
   * presumably swallows failures such as a `null` or already closed resource - TODO confirm.
   */
  def safeStop(): Unit = {
    _state.set(ShutdownSequence)
    safeUnit(_channel.close())
    safeUnit(_connection.close())
    _state.set(Shutdown)
  }

  def stop(): Unit = {
    safeStop()
  }

  private[this] def aquarium = Aquarium.Instance

  private[this] def postBusError(event: BusEvent): Unit = {
    aquarium.eventBus ! event
  }

  /**
   * The body of one ping. Does nothing if Aquarium is stopping or reconnects are not allowed;
   * otherwise reconnects if the consumer is not alive and then schedules the next ping.
   */
  private[this] def doPing(): Unit = {
    if(!aquarium.isStopping() && isAllowingReconnects()) {
      if(!isAlive()) {
        safeStop()
        doSafeStartupSequence(PingStartReason)
      }

      // Reschedule the ping
      doSchedulePing()
    }
  }

  /**
   * Schedules one ping with the timer service, using `reconnectPeriodMillis` as the timing
   * argument.
   */
  private[this] def doSchedulePing(): Unit = {
    val info = infoString("Ping")

    timerService.scheduleOnce(
      info,
      doPing(),
      reconnectPeriodMillis,
      true
    )
  }

  /**
   * Runs `f` with the current channel. On any exception, a `RabbitMQError` is posted to the
   * event bus and the consumer is stopped.
   */
  private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
    try f(_channel)
    catch {
      case e: Exception ⇒
        // FIXME: What is this?
        postBusError(RabbitMQError(e))

        safeStop()
    }
  }

  /**
   * The RabbitMQ callback object. Only `handleDelivery` does any work.
   */
  object RabbitMQMessageConsumer extends Consumer {
    def handleConsumeOk(consumerTag: String): Unit = {
    }

    def handleCancelOk(consumerTag: String): Unit = {
    }

    def handleCancel(consumerTag: String): Unit = {
    }

    def handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException): Unit = {
    }

    def handleRecoverOk(): Unit = {
    }

    /**
     * Gives the payload to the executor. On success, the notifier is called with the handler
     * result and then the result is translated to the respective RabbitMQ acknowledgement.
     * On error, the notifier is called with the failure.
     */
    def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]): Unit = {
      val onErrorF: PartialFunction[Throwable, Unit] = {
        case error: Throwable ⇒
          safeUnit(notifier(consumerSelf, Failed(error)))
      }

      try {
        val deliveryTag = envelope.getDeliveryTag

        // nice little composeable functions
        val notifierF = (handlerResult: HandlerResult) ⇒ {
          safeUnit(notifier(consumerSelf, Just(handlerResult)))
          handlerResult
        }

        val onSuccessBasicStepF: (HandlerResult ⇒ Unit) = {
          case HandlerResultSuccess ⇒
            doWithChannel(_.basicAck(deliveryTag, false))

          case HandlerResultResend ⇒
            doWithChannel(_.basicNack(deliveryTag, false, true))

          case HandlerResultReject(_) ⇒
            doWithChannel(_.basicReject(deliveryTag, false))

          case HandlerResultRequeue(_) ⇒
            doWithChannel(_.basicReject(deliveryTag, true))

          case HandlerResultPanic ⇒
            // Just inform RabbitMQ and subsequent actions will be made by the notifier.
            // So, this is a `HandlerResultResend` with extra semantics.
            doWithChannel(_.basicNack(deliveryTag, false, true))
        }

        // First notify, then acknowledge
        val onSuccessF = notifierF andThen onSuccessBasicStepF

        executor.exec(body, handler) (onSuccessF) (onErrorF)
      }
      catch (onErrorF)
    }
  }

  /**
   * Registered on the channel by the startup sequence. Logs the shutdown cause and stops
   * the consumer, for hard and soft errors alike.
   */
  object RabbitMQShutdownListener extends ShutdownListener {
    def shutdownCompleted(cause: ShutdownSignalException): Unit = {
      logger.info("Got shutdown (%sisHardError) %s".format(
        if(cause.isHardError) "" else "!",
        cause.toString))

      safeStop()
    }
  }

  override def toString = {
    "%s%s".format(shortClassNameOf(this), infoString(""))
  }
}