import com.rabbitmq.client.AMQP.BasicProperties
import gr.grnet.aquarium.Configurator
import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
-import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import gr.grnet.aquarium.service.event.BusEvent
import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
-import com.ckkloverdos.maybe.MaybeEither
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicBoolean}
+import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
/**
* A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
class RabbitMQConsumer(conf: RabbitMQConsumerConf,
handler: PayloadHandler,
- executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
+ executor: PayloadHandlerExecutor) extends Loggable with Lifecycle { consumerSelf ⇒
private[this] var _factory: ConnectionFactory = _
private[this] var _connection: Connection = _
private[this] var _channel: Channel = _
// private[this] val _isAlive = new AtomicBoolean(false)
private[this] val _state = new AtomicReference[State](Shutdown)
+ private[this] var _lastStartFailureMillis = -1L
+ private[this] val _pingIsScheduled = new AtomicBoolean(false)
- def isStarted() = {
- _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
+ def isAlive() = {
+ val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
+ case failed @ Failed(e) ⇒
+ logger.error("isChannelOpen", e)
+ false
+
+ case Just(x) ⇒
+ x
+ }
+
+ val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
+ case failed @ Failed(e) ⇒
+ logger.error("isConnectionOpen", e)
+ false
+
+ case Just(x) ⇒
+ x
+ }
+
+ _state.get().isStarted && isChannelOpen && isConnectionOpen
}
sealed trait State {
}
case object StartupSequence extends State
case class BadStart(e: Exception) extends State
- case object Started extends State { override def isStarted = true }
+ case object Started extends State {
+ override def isStarted = true
+ }
case object ShutdownSequence extends State
case object Shutdown extends State
- private[this] def doFullShutdownSequence(): Unit = {
+ sealed trait StartReason
+ case object LifecycleStartReason extends StartReason
+ case object ReconnectStartReason extends StartReason
+ case object PingStartReason extends StartReason
+
+ private[this] def timerService = Configurator.MasterConfigurator.timerService
+
+ private[this] def doSafeShutdownSequence(): Unit = {
_state.set(ShutdownSequence)
safeUnit(_channel.close())
safeUnit(_connection.close())
_state.set(Shutdown)
}
- private[this] def doSafeFullShutdownSequence(rescheduleStartup: Boolean): Unit = {
- safeUnit(doFullShutdownSequence())
- _state.set(Shutdown)
- if(rescheduleStartup) {
- doRescheduleStartup()
- }
+ private[this] lazy val servers = {
+ conf.connectionConf(RabbitMQConKeys.servers)
}
- private[this] def servers = {
- conf.connectionConf(RabbitMQConKeys.servers)
+ private[this] lazy val reconnectPeriodMillis = {
+ conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
}
- private[this] def serversToDebugStrings = {
- servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort))
+ private[this] lazy val serversToDebugStrings = {
+ servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
+ }
+
+ private[this] def infoList(what: String): List[String] = {
+ List(what) ++
+ List(serversToDebugStrings.mkString("(", ", ", ")")) ++
+ List("%s:%s:%s".format(
+ conf.exchangeName,
+ conf.routingKey,
+ conf.queueName
+ ))
}
- private[this] def doFullStartupSequence() = {
+ private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
+
+ private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
import this.conf._
- _state.set(StartupSequence)
+ if(isAlive()) {
+ return
+ }
- val factory = new ConnectionFactory
- factory.setUsername(connectionConf(RabbitMQConKeys.username))
- factory.setPassword(connectionConf(RabbitMQConKeys.password))
- factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
+ try {
+ _state.set(StartupSequence)
- val connection = factory.newConnection(servers)
+ val factory = new ConnectionFactory
+ factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+ factory.setUsername(connectionConf(RabbitMQConKeys.username))
+ factory.setPassword(connectionConf(RabbitMQConKeys.password))
+ factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
- val channel = connection.createChannel()
+ val connection = factory.newConnection(servers)
- channel.basicQos(
- channelConf(RabbitMQChannelKeys.qosPrefetchSize),
- channelConf(RabbitMQChannelKeys.qosPrefetchCount),
- channelConf(RabbitMQChannelKeys.qosGlobal)
- )
+ val channel = connection.createChannel()
- channel.exchangeDeclare(
- exchangeName,
- exchangeConf(RabbitMQExchangeKeys.`type`).name,
- exchangeConf(RabbitMQExchangeKeys.durable),
- exchangeConf(RabbitMQExchangeKeys.autoDelete),
- exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
- )
+ channel.addShutdownListener(RabbitMQShutdownListener)
- this._factory = factory
- this._connection = connection
- this._channel = channel
+ channel.basicQos(
+ channelConf(RabbitMQChannelKeys.qosPrefetchSize),
+ channelConf(RabbitMQChannelKeys.qosPrefetchCount),
+ channelConf(RabbitMQChannelKeys.qosGlobal)
+ )
- val declareOK = channel.queueDeclare(
- queueName,
- queueConf(RabbitMQQueueKeys.durable),
- queueConf(RabbitMQQueueKeys.exclusive),
- queueConf(RabbitMQQueueKeys.autoDelete),
- queueConf(RabbitMQQueueKeys.arguments).toJavaMap
- )
+ channel.exchangeDeclare(
+ exchangeName,
+ exchangeConf(RabbitMQExchangeKeys.`type`).name,
+ exchangeConf(RabbitMQExchangeKeys.durable),
+ exchangeConf(RabbitMQExchangeKeys.autoDelete),
+ exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
+ )
- val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
+ this._factory = factory
+ this._connection = connection
+ this._channel = channel
- channel.addShutdownListener(RabbitMQShutdownListener)
+ val declareOK = channel.queueDeclare(
+ queueName,
+ queueConf(RabbitMQQueueKeys.durable),
+ queueConf(RabbitMQQueueKeys.exclusive),
+ queueConf(RabbitMQQueueKeys.autoDelete),
+ queueConf(RabbitMQQueueKeys.arguments).toJavaMap
+ )
- _channel.basicConsume(
- queueName,
- false, // We send explicit acknowledgements to RabbitMQ
- RabbitMQMessageConsumer
- )
- }
+ val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
+
+ _channel.basicConsume(
+ queueName,
+ false, // We send explicit acknowledgements to RabbitMQ
+ RabbitMQMessageConsumer
+ )
- def start(): Unit = {
- logStartingF(toDebugString) {
- // The actual starting steps
- doFullStartupSequence()
_state.set(Started)
- } {
- // If an exception was thrown during startup, run this before logging the error
- doSafeFullShutdownSequence(true)
+
+ logger.info("Connected %s".format(infoString("Start")))
+ }
+ catch {
+ case e: Exception ⇒
+ val info = infoString(startReason.toString)
+ startReason match {
+ case LifecycleStartReason ⇒
+ logger.error("While connecting %s".format(info), e)
+
+ case ReconnectStartReason | PingStartReason ⇒
+ val now = TimeHelpers.nowMillis()
+ if(true/*_lastStartFailureMillis - now > 5*/) {
+ logger.warn("Could not reconnect %s".format(info))
+ }
+ _lastStartFailureMillis = now
+ }
+
+ // Shutdown on failure
+ doSafeShutdownSequence()
+ }
+ finally {
+ if(!_pingIsScheduled.get()) {
+ // Schedule periodic pings
+ logger.info("Scheduling %s".format(infoString("Ping")))
+ doSchedulePing()
+ _pingIsScheduled.getAndSet(true)
+ }
}
}
+ def start(): Unit = {
+ doSafeStartupSequence(LifecycleStartReason)
+ }
+
def stop() = {
- logStoppingF(toDebugString) {
- doSafeFullShutdownSequence(false)
- } {}
+ doSafeShutdownSequence()
}
private[this] def postBusError(event: BusEvent): Unit = {
Configurator.MasterConfigurator.eventBus ! event
}
- private[this] def doRescheduleStartup(): Unit = {
- val timerService = Configurator.MasterConfigurator.timerService
- timerService.scheduleOnce(start(), 1000L * 1)
+ private[this] def doSchedulePing(): Unit = {
+ val info = infoString("Ping")
+
+ timerService.scheduleOnce(
+ info,
+ {
+ val isAlive = consumerSelf.isAlive()
+// logger.info("Ping state is %s (isAlive=%s) for %s".format(_state.get(), isAlive, info))
+
+ if(!isAlive) {
+ doSafeShutdownSequence()
+ doSafeStartupSequence(PingStartReason)
+ }
+
+ // Reschedule the ping
+ doSchedulePing()
+ },
+ reconnectPeriodMillis
+ )
}
private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
catch {
case e: Exception ⇒
postBusError(RabbitMQError(e))
- doSafeFullShutdownSequence(true)
+ doSafeShutdownSequence()
}
}
case HandlerResultPanic ⇒
// The other end is crucial to the overall operation and it is in panic mode,
// so we stop delivering messages until further notice
- doSafeFullShutdownSequence(true)
+ doSafeShutdownSequence()
} (doError)
} catch (doError)
}
}
object RabbitMQShutdownListener extends ShutdownListener {
- @inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError
- @inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError
-
def shutdownCompleted(cause: ShutdownSignalException) = {
- safeUnit { _channel.close() }
+ logger.info("Got shutdown %s".format(cause))
// Now, let's see what happened
- if(isConnectionError(cause)) {
- } else if(isChannelError(cause)) {
+ if(cause.isHardError) {
+ logger.info("Channel shutdown isHardError")
+ } else {
+ logger.info("Channel shutdown !isHardError")
}
+
+ doSafeShutdownSequence()
}
}
import gr.grnet.aquarium.{Configurator, Configurable}
import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
-import com.ckkloverdos.key.{ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
import com.ckkloverdos.env.{EnvKey, Env}
import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
+import com.ckkloverdos.key.{LongKey, ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
/**
*
this._consumers.foreach(_.start())
for(consumer ← this._consumers) {
- if(!consumer.isStarted()) {
+ if(!consumer.isAlive()) {
logger.warn("Consumer not started yet {}", consumer.toDebugString)
}
}
(RabbitMQConKeys.username, props(RabbitMQConfKeys.username)) +
(RabbitMQConKeys.password, props(RabbitMQConfKeys.password)) +
(RabbitMQConKeys.vhost, props(RabbitMQConfKeys.vhost)) +
- (RabbitMQConKeys.servers, addresses)
+ (RabbitMQConKeys.servers, addresses) +
+ (RabbitMQConKeys.reconnect_period_millis, props.getLongEx(RabbitMQConfKeys.reconnect_period_millis))
}
def makeRabbitMQConsumerConf(props: Props, oneERQ: String) = {
final val password = StringKey(p("password"))
final val vhost = StringKey(p("vhost"))
final val servers = ArrayKey[Address](p("servers"))
+
+ final val reconnect_period_millis = LongKey(p("reconnect.period.millis"))
}
/**
object RabbitMQConfKeys {
/**
+ * How often do we attemot a reconnection?
+ */
+ final val reconnect_period_millis = p("reconnect.period.millis")
+
+ /**
* Comma separated list of AMQP servers running in active-active
* configuration.
*/