WIP Rework AMQP stuff: Automatic reconnection to RabbitMQ
author Christos KK Loverdos <loverdos@gmail.com>
Wed, 16 May 2012 11:31:17 +0000 (14:31 +0300)
committer Christos KK Loverdos <loverdos@gmail.com>
Wed, 16 May 2012 11:31:17 +0000 (14:31 +0300)
src/main/resources/aquarium.properties
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
src/main/scala/gr/grnet/aquarium/service/SimpleTimerService.scala
src/main/scala/gr/grnet/aquarium/service/TimerService.scala
src/test/resources/aquarium.properties

index 00a79ed..989c5eb 100644 (file)
@@ -11,6 +11,9 @@ aquarium.role-agreement.map=role-agreement.map
 
 ### Queue related settings
 
+# How often (in milliseconds) do we attempt a reconnection?
+rabbitmq.reconnect.period.millis=1000
+
 # Comma separated list of rabbitmq servers to use. The servers must be in an
 # active-active mode.
 rabbitmq.servers=localhost
index 6fbb098..f73167c 100644 (file)
@@ -42,12 +42,12 @@ import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, Shutdow
 import com.rabbitmq.client.AMQP.BasicProperties
 import gr.grnet.aquarium.Configurator
 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
-import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
 import gr.grnet.aquarium.service.event.BusEvent
 import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
-import com.ckkloverdos.maybe.MaybeEither
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicBoolean}
+import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
 
 /**
  * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
@@ -57,15 +57,35 @@ import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQCon
 
 class RabbitMQConsumer(conf: RabbitMQConsumerConf,
                        handler: PayloadHandler,
-                       executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
+                       executor: PayloadHandlerExecutor) extends Loggable with Lifecycle { consumerSelf ⇒
   private[this] var _factory: ConnectionFactory = _
   private[this] var _connection: Connection = _
   private[this] var _channel: Channel = _
 //  private[this] val _isAlive = new AtomicBoolean(false)
   private[this] val _state = new AtomicReference[State](Shutdown)
+  private[this] var _lastStartFailureMillis = -1L
+  private[this] val _pingIsScheduled = new AtomicBoolean(false)
 
-  def isStarted() = {
-    _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
+  def isAlive() = {
+    val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
+      case failed @ Failed(e) ⇒
+        logger.error("isChannelOpen", e)
+        false
+
+      case Just(x) ⇒
+        x
+    }
+
+    val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
+      case failed @ Failed(e) ⇒
+        logger.error("isConnectionOpen", e)
+        false
+
+      case Just(x) ⇒
+        x
+    }
+
+    _state.get().isStarted && isChannelOpen && isConnectionOpen
   }
 
   sealed trait State {
@@ -73,108 +93,169 @@ class RabbitMQConsumer(conf: RabbitMQConsumerConf,
   }
   case object StartupSequence extends State
   case class  BadStart(e: Exception) extends State
-  case object Started  extends State { override def isStarted = true }
+  case object Started  extends State {
+    override def isStarted = true
+  }
   case object ShutdownSequence extends State
   case object Shutdown extends State
 
-  private[this] def doFullShutdownSequence(): Unit = {
+  sealed trait StartReason
+  case object LifecycleStartReason extends StartReason
+  case object ReconnectStartReason extends StartReason
+  case object PingStartReason extends StartReason
+
+  private[this] def timerService = Configurator.MasterConfigurator.timerService
+
+  private[this] def doSafeShutdownSequence(): Unit = {
     _state.set(ShutdownSequence)
     safeUnit(_channel.close())
     safeUnit(_connection.close())
     _state.set(Shutdown)
   }
 
-  private[this] def doSafeFullShutdownSequence(rescheduleStartup: Boolean): Unit = {
-    safeUnit(doFullShutdownSequence())
-    _state.set(Shutdown)
-    if(rescheduleStartup) {
-      doRescheduleStartup()
-    }
+  private[this] lazy val servers = {
+    conf.connectionConf(RabbitMQConKeys.servers)
   }
 
-  private[this] def servers = {
-    conf.connectionConf(RabbitMQConKeys.servers)
+  private[this] lazy val reconnectPeriodMillis = {
+    conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
   }
 
-  private[this] def serversToDebugStrings = {
-    servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort))
+  private[this] lazy val serversToDebugStrings = {
+    servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
+  }
+
+  private[this] def infoList(what: String): List[String] = {
+    List(what) ++
+    List(serversToDebugStrings.mkString("(", ", ", ")")) ++
+    List("%s:%s:%s".format(
+      conf.exchangeName,
+      conf.routingKey,
+      conf.queueName
+    ))
   }
 
-  private[this] def doFullStartupSequence() = {
+  private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
+
+  private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
     import this.conf._
 
-    _state.set(StartupSequence)
+    if(isAlive()) {
+      return
+    }
 
-    val factory = new ConnectionFactory
-    factory.setUsername(connectionConf(RabbitMQConKeys.username))
-    factory.setPassword(connectionConf(RabbitMQConKeys.password))
-    factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
+    try {
+      _state.set(StartupSequence)
 
-    val connection = factory.newConnection(servers)
+      val factory = new ConnectionFactory
+      factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+      factory.setUsername(connectionConf(RabbitMQConKeys.username))
+      factory.setPassword(connectionConf(RabbitMQConKeys.password))
+      factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
 
-    val channel = connection.createChannel()
+      val connection = factory.newConnection(servers)
 
-    channel.basicQos(
-      channelConf(RabbitMQChannelKeys.qosPrefetchSize),
-      channelConf(RabbitMQChannelKeys.qosPrefetchCount),
-      channelConf(RabbitMQChannelKeys.qosGlobal)
-    )
+      val channel = connection.createChannel()
 
-    channel.exchangeDeclare(
-      exchangeName,
-      exchangeConf(RabbitMQExchangeKeys.`type`).name,
-      exchangeConf(RabbitMQExchangeKeys.durable),
-      exchangeConf(RabbitMQExchangeKeys.autoDelete),
-      exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
-    )
+      channel.addShutdownListener(RabbitMQShutdownListener)
 
-    this._factory = factory
-    this._connection = connection
-    this._channel = channel
+      channel.basicQos(
+        channelConf(RabbitMQChannelKeys.qosPrefetchSize),
+        channelConf(RabbitMQChannelKeys.qosPrefetchCount),
+        channelConf(RabbitMQChannelKeys.qosGlobal)
+      )
 
-    val declareOK = channel.queueDeclare(
-      queueName,
-      queueConf(RabbitMQQueueKeys.durable),
-      queueConf(RabbitMQQueueKeys.exclusive),
-      queueConf(RabbitMQQueueKeys.autoDelete),
-      queueConf(RabbitMQQueueKeys.arguments).toJavaMap
-    )
+      channel.exchangeDeclare(
+        exchangeName,
+        exchangeConf(RabbitMQExchangeKeys.`type`).name,
+        exchangeConf(RabbitMQExchangeKeys.durable),
+        exchangeConf(RabbitMQExchangeKeys.autoDelete),
+        exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
+      )
 
-    val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
+      this._factory = factory
+      this._connection = connection
+      this._channel = channel
 
-    channel.addShutdownListener(RabbitMQShutdownListener)
+      val declareOK = channel.queueDeclare(
+        queueName,
+        queueConf(RabbitMQQueueKeys.durable),
+        queueConf(RabbitMQQueueKeys.exclusive),
+        queueConf(RabbitMQQueueKeys.autoDelete),
+        queueConf(RabbitMQQueueKeys.arguments).toJavaMap
+      )
 
-    _channel.basicConsume(
-      queueName,
-      false, // We send explicit acknowledgements to RabbitMQ
-      RabbitMQMessageConsumer
-    )
-  }
+      val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
+
+      _channel.basicConsume(
+        queueName,
+        false, // We send explicit acknowledgements to RabbitMQ
+        RabbitMQMessageConsumer
+      )
 
-  def start(): Unit = {
-    logStartingF(toDebugString) {
-      // The actual starting steps
-      doFullStartupSequence()
       _state.set(Started)
-    } {
-      // If an exception was thrown during startup, run this before logging the error
-      doSafeFullShutdownSequence(true)
+
+      logger.info("Connected %s".format(infoString("Start")))
+    }
+    catch {
+      case e: Exception ⇒
+        val info = infoString(startReason.toString)
+        startReason match {
+          case LifecycleStartReason ⇒
+            logger.error("While connecting %s".format(info), e)
+
+          case ReconnectStartReason | PingStartReason ⇒
+            val now = TimeHelpers.nowMillis()
+            if(true/*_lastStartFailureMillis - now > 5*/) {
+              logger.warn("Could not reconnect %s".format(info))
+            }
+            _lastStartFailureMillis = now
+        }
+
+        // Shutdown on failure
+        doSafeShutdownSequence()
+    }
+    finally {
+      if(!_pingIsScheduled.get()) {
+        // Schedule periodic pings
+        logger.info("Scheduling %s".format(infoString("Ping")))
+        doSchedulePing()
+        _pingIsScheduled.getAndSet(true)
+      }
     }
   }
 
+  def start(): Unit = {
+    doSafeStartupSequence(LifecycleStartReason)
+  }
+
   def stop() = {
-    logStoppingF(toDebugString) {
-      doSafeFullShutdownSequence(false)
-    } {}
+    doSafeShutdownSequence()
   }
 
   private[this] def postBusError(event: BusEvent): Unit = {
     Configurator.MasterConfigurator.eventBus ! event
   }
 
-  private[this] def doRescheduleStartup(): Unit = {
-    val timerService = Configurator.MasterConfigurator.timerService
-    timerService.scheduleOnce(start(), 1000L * 1)
+  private[this] def doSchedulePing(): Unit = {
+    val info = infoString("Ping")
+
+    timerService.scheduleOnce(
+      info,
+      {
+        val isAlive = consumerSelf.isAlive()
+//        logger.info("Ping state is %s (isAlive=%s) for %s".format(_state.get(), isAlive, info))
+
+        if(!isAlive) {
+          doSafeShutdownSequence()
+          doSafeStartupSequence(PingStartReason)
+        }
+
+        // Reschedule the ping
+        doSchedulePing()
+      },
+      reconnectPeriodMillis
+    )
   }
 
   private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
@@ -182,7 +263,7 @@ class RabbitMQConsumer(conf: RabbitMQConsumerConf,
     catch {
       case e: Exception ⇒
         postBusError(RabbitMQError(e))
-        doSafeFullShutdownSequence(true)
+        doSafeShutdownSequence()
     }
   }
 
@@ -227,23 +308,24 @@ class RabbitMQConsumer(conf: RabbitMQConsumerConf,
           case HandlerResultPanic ⇒
             // The other end is crucial to the overall operation and it is in panic mode,
             // so we stop delivering messages until further notice
-            doSafeFullShutdownSequence(true)
+            doSafeShutdownSequence()
         } (doError)
       } catch (doError)
     }
   }
 
   object RabbitMQShutdownListener extends ShutdownListener {
-    @inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError
-    @inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError
-
     def shutdownCompleted(cause: ShutdownSignalException) = {
-      safeUnit { _channel.close() }
+      logger.info("Got shutdown %s".format(cause))
 
       // Now, let's see what happened
-      if(isConnectionError(cause)) {
-      } else if(isChannelError(cause)) {
+      if(cause.isHardError) {
+        logger.info("Channel shutdown isHardError")
+      } else {
+        logger.info("Channel shutdown !isHardError")
       }
+
+      doSafeShutdownSequence()
     }
   }
 
index c2a2361..901d990 100644 (file)
@@ -43,7 +43,6 @@ import com.rabbitmq.client.Address
 import gr.grnet.aquarium.{Configurator, Configurable}
 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
-import com.ckkloverdos.key.{ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
 import com.ckkloverdos.env.{EnvKey, Env}
 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
@@ -53,6 +52,7 @@ import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
+import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
 
 /**
  *
@@ -201,7 +201,7 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
       this._consumers.foreach(_.start())
 
       for(consumer ← this._consumers) {
-        if(!consumer.isStarted()) {
+        if(!consumer.isAlive()) {
           logger.warn("Consumer not started yet {}", consumer.toDebugString)
         }
       }
@@ -258,7 +258,8 @@ object RabbitMQService {
       (RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
       (RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
       (RabbitMQConKeys.vhost,    props(RabbitMQConfKeys.vhost))    +
-      (RabbitMQConKeys.servers,  addresses)
+      (RabbitMQConKeys.servers,  addresses) +
+      (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
   }
 
   def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
@@ -280,6 +281,8 @@ object RabbitMQService {
     final val password  = StringKey(p("password"))
     final val vhost     = StringKey(p("vhost"))
     final val servers   = ArrayKey[Address](p("servers"))
+
+    final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
   }
 
   /**
@@ -330,6 +333,11 @@ object RabbitMQService {
 
   object RabbitMQConfKeys {
     /**
+     * How often (in milliseconds) do we attempt a reconnection?
+     */
+    final val reconnect_period_millis = p("reconnect.period.millis")
+
+    /**
      * Comma separated list of AMQP servers running in active-active
      * configuration.
      */
index 5127e8b..71551f3 100644 (file)
 
 package gr.grnet.aquarium.service
 
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import gr.grnet.aquarium.util.date.TimeHelpers
 import java.util.{TimerTask, Timer}
 import gr.grnet.aquarium.uid.{UUIDGenerator, UIDGenerator}
+import gr.grnet.aquarium.util.chainOfCausesForLogging
 
 
 /**
@@ -50,14 +49,14 @@ class SimpleTimerService extends TimerService {
   private[this] val timer = new Timer()
   private[this] val uidGen: UIDGenerator[_] = UUIDGenerator
 
-  def scheduleOnce[T](f: ⇒ T, delayMillis: Long): String = {
+  def scheduleOnce[T](infoString: String, f: ⇒ T, delayMillis: Long, reportException: Boolean = true): String = {
     val uid = uidGen.nextUID()
     val timerTask = new TimerTask {
       def run() = {
         try f
         catch {
           case e: Exception ⇒
-            logger.warn("While running task %s".format(uid))
+            logger.warn("While running task %s(%s)\n%s".format(infoString, uid, chainOfCausesForLogging(e, 1)))
         }
       }
     }
index b8399ec..cabb8af 100644 (file)
@@ -44,5 +44,5 @@ import gr.grnet.aquarium.util.{Loggable, Lifecycle}
  */
 
 trait TimerService extends Lifecycle with Loggable {
-  def scheduleOnce[T](f: ⇒ T, delayMillis: Long): String
+  def scheduleOnce[T](infoString: String, f: ⇒ T, delayMillis: Long, reportException: Boolean = true): String
 }
index 03c17c4..23688d3 100644 (file)
@@ -11,6 +11,9 @@ aquarium.role-agreement.map=role-agreement.map
 
 ### Queue related settings
 
+# How often (in milliseconds) do we attempt a reconnection?
+rabbitmq.reconnect.period.millis=1000
+
 # Comma separated list of rabbitmq servers to use. The servers must be in an
 # active-active mode.
 rabbitmq.servers=localhost