import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-import gr.grnet.aquarium.util.tryUnit
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQQueueKeys, RabbitMQExchangeKeys}
-import java.util.concurrent.atomic.AtomicBoolean
+import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
import com.rabbitmq.client.AMQP.BasicProperties
+import gr.grnet.aquarium.Aquarium
+import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
+import gr.grnet.aquarium.service.event.BusEvent
+import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
+import gr.grnet.aquarium.util.date.TimeHelpers
+import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicBoolean}
+import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
/**
+ * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class RabbitMQConsumer(val conf: RabbitMQConsumerConf) extends Loggable with Lifecycle {
+class RabbitMQConsumer(conf: RabbitMQConsumerConf,
+ handler: PayloadHandler,
+ executor: PayloadHandlerExecutor) extends Loggable with Lifecycle { consumerSelf ⇒
private[this] var _factory: ConnectionFactory = _
private[this] var _connection: Connection = _
private[this] var _channel: Channel = _
- private[this] val _isAlive = new AtomicBoolean(false)
+// private[this] val _isAlive = new AtomicBoolean(false)
+ private[this] val _state = new AtomicReference[State](Shutdown)
+ private[this] var _lastStartFailureMillis = -1L
+ private[this] val _pingIsScheduled = new AtomicBoolean(false)
def isAlive() = {
- _isAlive.get()
+ val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
+ case failed @ Failed(e) ⇒
+ logger.error("isChannelOpen", e)
+ false
+
+ case Just(x) ⇒
+ x
+ }
+
+ val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
+ case failed @ Failed(e) ⇒
+ logger.error("isConnectionOpen", e)
+ false
+
+ case Just(x) ⇒
+ x
+ }
+
+ _state.get().isStarted && isChannelOpen && isConnectionOpen
}
- def start() = {
- import service.RabbitMQService.RabbitMQConKeys
- import service.RabbitMQService.RabbitMQChannelKeys
- import conf._
+ sealed trait State {
+ def isStarted: Boolean = false
+ }
+ case object StartupSequence extends State
+ case class BadStart(e: Exception) extends State
+ case object Started extends State {
+ override def isStarted = true
+ }
+ case object ShutdownSequence extends State
+ case object Shutdown extends State
- val factory = new ConnectionFactory
- factory.setUsername(connectionConf(RabbitMQConKeys.username))
- factory.setPassword(connectionConf(RabbitMQConKeys.password))
- factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
+ sealed trait StartReason
+ case object LifecycleStartReason extends StartReason
+ case object ReconnectStartReason extends StartReason
+ case object PingStartReason extends StartReason
- val connection = factory.newConnection(connectionConf(RabbitMQConKeys.servers))
+ private[this] def timerService = Aquarium.Instance.timerService
- val channel = connection.createChannel()
+ private[this] def doSafeShutdownSequence(): Unit = {
+ _state.set(ShutdownSequence)
+ safeUnit(_channel.close())
+ safeUnit(_connection.close())
+ _state.set(Shutdown)
+ }
- channel.basicQos(
- channelConf(RabbitMQChannelKeys.qosPrefetchSize),
- channelConf(RabbitMQChannelKeys.qosPrefetchCount),
- channelConf(RabbitMQChannelKeys.qosGlobal)
- )
+ private[this] lazy val servers = {
+ conf.connectionConf(RabbitMQConKeys.servers)
+ }
- channel.exchangeDeclare(
- exchangeName,
- exchangeConf(RabbitMQExchangeKeys.`type`).name,
- exchangeConf(RabbitMQExchangeKeys.durable),
- exchangeConf(RabbitMQExchangeKeys.autoDelete),
- exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap.asInstanceOf[java.util.Map[String, AnyRef]]
- )
+ private[this] lazy val reconnectPeriodMillis = {
+ conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
+ }
- this._factory = factory
- this._connection = connection
- this._channel = channel
+ private[this] lazy val serversToDebugStrings = {
+ servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
+ }
- val declareOK = channel.queueDeclare(
- queueName,
- queueConf(RabbitMQQueueKeys.durable),
- queueConf(RabbitMQQueueKeys.exclusive),
- queueConf(RabbitMQQueueKeys.autoDelete),
- queueConf(RabbitMQQueueKeys.arguments).toJavaMap.asInstanceOf[java.util.Map[String, AnyRef]]
- )
+ private[this] def infoList(what: String): List[String] = {
+ List(what) ++
+ List(serversToDebugStrings.mkString("(", ", ", ")")) ++
+ List("%s:%s:%s".format(
+ conf.exchangeName,
+ conf.routingKey,
+ conf.queueName
+ ))
+ }
+
+ private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
+
+ private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
+ import this.conf._
+
+ if(isAlive()) {
+ return
+ }
+
+ try {
+ _state.set(StartupSequence)
+
+ val factory = new ConnectionFactory
+ factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+ factory.setUsername(connectionConf(RabbitMQConKeys.username))
+ factory.setPassword(connectionConf(RabbitMQConKeys.password))
+ factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
+
+ val connection = factory.newConnection(servers)
+
+ val channel = connection.createChannel()
+
+ channel.addShutdownListener(RabbitMQShutdownListener)
+
+ channel.basicQos(
+ channelConf(RabbitMQChannelKeys.qosPrefetchSize),
+ channelConf(RabbitMQChannelKeys.qosPrefetchCount),
+ channelConf(RabbitMQChannelKeys.qosGlobal)
+ )
+
+ channel.exchangeDeclare(
+ exchangeName,
+ exchangeConf(RabbitMQExchangeKeys.`type`).name,
+ exchangeConf(RabbitMQExchangeKeys.durable),
+ exchangeConf(RabbitMQExchangeKeys.autoDelete),
+ exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
+ )
+
+ this._factory = factory
+ this._connection = connection
+ this._channel = channel
+
+ val declareOK = channel.queueDeclare(
+ queueName,
+ queueConf(RabbitMQQueueKeys.durable),
+ queueConf(RabbitMQQueueKeys.exclusive),
+ queueConf(RabbitMQQueueKeys.autoDelete),
+ queueConf(RabbitMQQueueKeys.arguments).toJavaMap
+ )
- logger.info("Queue declaration: {}", declareOK)
+ val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
- val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
- channel.addShutdownListener(RabbitMQShutdownListener)
+ _channel.basicConsume(
+ queueName,
+ false, // We send explicit acknowledgements to RabbitMQ
+ RabbitMQMessageConsumer
+ )
- if(_channel.isOpen) {
- _isAlive.getAndSet(true)
+ _state.set(Started)
+
+ logger.info("Connected %s".format(infoString("Start")))
+ }
+ catch {
+ case e: Exception ⇒
+ val info = infoString(startReason.toString)
+ startReason match {
+ case LifecycleStartReason ⇒
+ logger.error("While connecting %s".format(info), e)
+
+ case ReconnectStartReason | PingStartReason ⇒
+ val now = TimeHelpers.nowMillis()
+ if(true/*_lastStartFailureMillis - now > 5*/) {
+ logger.warn("Could not reconnect %s".format(info))
+ }
+ _lastStartFailureMillis = now
+ }
+
+ // Shutdown on failure
+ doSafeShutdownSequence()
}
+ finally {
+ if(!_pingIsScheduled.get()) {
+ // Schedule periodic pings
+ logger.info("Scheduling %s".format(infoString("Ping")))
+ doSchedulePing()
+ _pingIsScheduled.getAndSet(true)
+ }
+ }
+ }
+
+ def start(): Unit = {
+ doSafeStartupSequence(LifecycleStartReason)
+ }
+
+ def stop() = {
+ doSafeShutdownSequence()
+ }
+
+ private[this] def postBusError(event: BusEvent): Unit = {
+ Aquarium.Instance.eventBus ! event
+ }
+
+ private[this] def doSchedulePing(): Unit = {
+ val info = infoString("Ping")
- _channel.basicConsume(
- queueName,
- false, // We send explicit acknowledgements to RabbitMQ
- RabbitMQMessageConsumer
+ timerService.scheduleOnce(
+ info,
+ {
+ val isAlive = consumerSelf.isAlive()
+// logger.info("Ping state is %s (isAlive=%s) for %s".format(_state.get(), isAlive, info))
+
+ if(!isAlive) {
+ doSafeShutdownSequence()
+ doSafeStartupSequence(PingStartReason)
+ }
+
+ // Reschedule the ping
+ doSchedulePing()
+ },
+ reconnectPeriodMillis
)
+ }
- logger.info("Queue binding {}", bindOK)
+ private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
+ try f(_channel)
+ catch {
+ case e: Exception ⇒
+ postBusError(RabbitMQError(e))
+ doSafeShutdownSequence()
+ }
}
object RabbitMQMessageConsumer extends Consumer {
}
def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
+ def doError: PartialFunction[Throwable, Unit] = {
+ case e: Exception ⇒
+ logger.warn("Unexpected error", e)
+
+ case e: Throwable ⇒
+ throw e
+ }
+
+ try {
+ val deliveryTag = envelope.getDeliveryTag
+ executor.exec(body, handler) {
+ case HandlerResultSuccess ⇒
+ doWithChannel(_.basicAck(deliveryTag, false))
+
+ case HandlerResultReject(_) ⇒
+ doWithChannel(_.basicReject(deliveryTag, false))
+
+ case HandlerResultRequeue(_) ⇒
+ doWithChannel(_.basicReject(deliveryTag, true))
+
+ case HandlerResultPanic ⇒
+ // The other end is crucial to the overall operation and it is in panic mode,
+ // so we stop delivering messages until further notice
+ doSafeShutdownSequence()
+ } (doError)
+ } catch (doError)
}
}
object RabbitMQShutdownListener extends ShutdownListener {
- @inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError
- @inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError
-
def shutdownCompleted(cause: ShutdownSignalException) = {
- tryUnit { _channel.close() }
- _isAlive.getAndSet(false)
+ logger.info("Got shutdown %s".format(cause))
// Now, let's see what happened
- if(isConnectionError(cause)) {
- } else if(isChannelError(cause)) {
+ if(cause.isHardError) {
+ logger.info("Channel shutdown isHardError")
+ } else {
+ logger.info("Channel shutdown !isHardError")
}
+
+ doSafeShutdownSequence()
}
}
- def stop() = {
+ def toDebugString = {
+ "(servers=%s, exchange=%s, routingKey=%s, queue=%s)".format(
+ serversToDebugStrings.mkString("[", ", ", "]"),
+ conf.exchangeName,
+ conf.routingKey,
+ conf.queueName)
+ }
+ override def toString = {
+ "%s%s".format(shortClassNameOf(this), toDebugString)
}
}