Revision 61bfaf2e src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala

b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
42 42
import com.rabbitmq.client.AMQP.BasicProperties
43 43
import gr.grnet.aquarium.Configurator
44 44
import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
45
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
46 45
import gr.grnet.aquarium.service.event.BusEvent
47 46
import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
48
import com.ckkloverdos.maybe.MaybeEither
49 47
import gr.grnet.aquarium.util.date.TimeHelpers
50 48
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
49
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicBoolean}
50
import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
51 51

  
52 52
/**
53 53
 * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
......
57 57

  
58 58
class RabbitMQConsumer(conf: RabbitMQConsumerConf,
59 59
                       handler: PayloadHandler,
60
                       executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
60
                       executor: PayloadHandlerExecutor) extends Loggable with Lifecycle { consumerSelf ⇒
61 61
  private[this] var _factory: ConnectionFactory = _
62 62
  private[this] var _connection: Connection = _
63 63
  private[this] var _channel: Channel = _
64 64
//  private[this] val _isAlive = new AtomicBoolean(false)
65 65
  private[this] val _state = new AtomicReference[State](Shutdown)
66
  private[this] var _lastStartFailureMillis = -1L
67
  private[this] val _pingIsScheduled = new AtomicBoolean(false)
66 68

  
67
  def isStarted() = {
68
    _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
69
  def isAlive() = {
70
    val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match {
71
      case failed @ Failed(e) ⇒
72
        logger.error("isChannelOpen", e)
73
        false
74

  
75
      case Just(x) ⇒
76
        x
77
    }
78

  
79
    val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match {
80
      case failed @ Failed(e) ⇒
81
        logger.error("isConnectionOpen", e)
82
        false
83

  
84
      case Just(x) ⇒
85
        x
86
    }
87

  
88
    _state.get().isStarted && isChannelOpen && isConnectionOpen
69 89
  }
70 90

  
71 91
  sealed trait State {
......
73 93
  }
74 94
  case object StartupSequence extends State
75 95
  case class  BadStart(e: Exception) extends State
76
  case object Started  extends State { override def isStarted = true }
96
  case object Started  extends State {
97
    override def isStarted = true
98
  }
77 99
  case object ShutdownSequence extends State
78 100
  case object Shutdown extends State
79 101

  
80
  private[this] def doFullShutdownSequence(): Unit = {
102
  sealed trait StartReason
103
  case object LifecycleStartReason extends StartReason
104
  case object ReconnectStartReason extends StartReason
105
  case object PingStartReason extends StartReason
106

  
107
  private[this] def timerService = Configurator.MasterConfigurator.timerService
108

  
109
  private[this] def doSafeShutdownSequence(): Unit = {
81 110
    _state.set(ShutdownSequence)
82 111
    safeUnit(_channel.close())
83 112
    safeUnit(_connection.close())
84 113
    _state.set(Shutdown)
85 114
  }
86 115

  
87
  private[this] def doSafeFullShutdownSequence(rescheduleStartup: Boolean): Unit = {
88
    safeUnit(doFullShutdownSequence())
89
    _state.set(Shutdown)
90
    if(rescheduleStartup) {
91
      doRescheduleStartup()
92
    }
116
  private[this] lazy val servers = {
117
    conf.connectionConf(RabbitMQConKeys.servers)
93 118
  }
94 119

  
95
  private[this] def servers = {
96
    conf.connectionConf(RabbitMQConKeys.servers)
120
  private[this] lazy val reconnectPeriodMillis = {
121
    conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
97 122
  }
98 123

  
99
  private[this] def serversToDebugStrings = {
100
    servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort))
124
  private[this] lazy val serversToDebugStrings = {
125
    servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList
126
  }
127

  
128
  private[this] def infoList(what: String): List[String] = {
129
    List(what) ++
130
    List(serversToDebugStrings.mkString("(", ", ", ")")) ++
131
    List("%s:%s:%s".format(
132
      conf.exchangeName,
133
      conf.routingKey,
134
      conf.queueName
135
    ))
101 136
  }
102 137

  
103
  private[this] def doFullStartupSequence() = {
138
  private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]")
139

  
140
  private[this] def doSafeStartupSequence(startReason: StartReason): Unit = {
104 141
    import this.conf._
105 142

  
106
    _state.set(StartupSequence)
143
    if(isAlive()) {
144
      return
145
    }
107 146

  
108
    val factory = new ConnectionFactory
109
    factory.setUsername(connectionConf(RabbitMQConKeys.username))
110
    factory.setPassword(connectionConf(RabbitMQConKeys.password))
111
    factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
147
    try {
148
      _state.set(StartupSequence)
112 149

  
113
    val connection = factory.newConnection(servers)
150
      val factory = new ConnectionFactory
151
      factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
152
      factory.setUsername(connectionConf(RabbitMQConKeys.username))
153
      factory.setPassword(connectionConf(RabbitMQConKeys.password))
154
      factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
114 155

  
115
    val channel = connection.createChannel()
156
      val connection = factory.newConnection(servers)
116 157

  
117
    channel.basicQos(
118
      channelConf(RabbitMQChannelKeys.qosPrefetchSize),
119
      channelConf(RabbitMQChannelKeys.qosPrefetchCount),
120
      channelConf(RabbitMQChannelKeys.qosGlobal)
121
    )
158
      val channel = connection.createChannel()
122 159

  
123
    channel.exchangeDeclare(
124
      exchangeName,
125
      exchangeConf(RabbitMQExchangeKeys.`type`).name,
126
      exchangeConf(RabbitMQExchangeKeys.durable),
127
      exchangeConf(RabbitMQExchangeKeys.autoDelete),
128
      exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
129
    )
160
      channel.addShutdownListener(RabbitMQShutdownListener)
130 161

  
131
    this._factory = factory
132
    this._connection = connection
133
    this._channel = channel
162
      channel.basicQos(
163
        channelConf(RabbitMQChannelKeys.qosPrefetchSize),
164
        channelConf(RabbitMQChannelKeys.qosPrefetchCount),
165
        channelConf(RabbitMQChannelKeys.qosGlobal)
166
      )
134 167

  
135
    val declareOK = channel.queueDeclare(
136
      queueName,
137
      queueConf(RabbitMQQueueKeys.durable),
138
      queueConf(RabbitMQQueueKeys.exclusive),
139
      queueConf(RabbitMQQueueKeys.autoDelete),
140
      queueConf(RabbitMQQueueKeys.arguments).toJavaMap
141
    )
168
      channel.exchangeDeclare(
169
        exchangeName,
170
        exchangeConf(RabbitMQExchangeKeys.`type`).name,
171
        exchangeConf(RabbitMQExchangeKeys.durable),
172
        exchangeConf(RabbitMQExchangeKeys.autoDelete),
173
        exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
174
      )
142 175

  
143
    val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
176
      this._factory = factory
177
      this._connection = connection
178
      this._channel = channel
144 179

  
145
    channel.addShutdownListener(RabbitMQShutdownListener)
180
      val declareOK = channel.queueDeclare(
181
        queueName,
182
        queueConf(RabbitMQQueueKeys.durable),
183
        queueConf(RabbitMQQueueKeys.exclusive),
184
        queueConf(RabbitMQQueueKeys.autoDelete),
185
        queueConf(RabbitMQQueueKeys.arguments).toJavaMap
186
      )
146 187

  
147
    _channel.basicConsume(
148
      queueName,
149
      false, // We send explicit acknowledgements to RabbitMQ
150
      RabbitMQMessageConsumer
151
    )
152
  }
188
      val bindOK = channel.queueBind(queueName, exchangeName, routingKey)
189

  
190
      _channel.basicConsume(
191
        queueName,
192
        false, // We send explicit acknowledgements to RabbitMQ
193
        RabbitMQMessageConsumer
194
      )
153 195

  
154
  def start(): Unit = {
155
    logStartingF(toDebugString) {
156
      // The actual starting steps
157
      doFullStartupSequence()
158 196
      _state.set(Started)
159
    } {
160
      // If an exception was thrown during startup, run this before logging the error
161
      doSafeFullShutdownSequence(true)
197

  
198
      logger.info("Connected %s".format(infoString("Start")))
199
    }
200
    catch {
201
      case e: Exception ⇒
202
        val info = infoString(startReason.toString)
203
        startReason match {
204
          case LifecycleStartReason ⇒
205
            logger.error("While connecting %s".format(info), e)
206

  
207
          case ReconnectStartReason | PingStartReason ⇒
208
            val now = TimeHelpers.nowMillis()
209
            if(true/*_lastStartFailureMillis - now > 5*/) {
210
              logger.warn("Could not reconnect %s".format(info))
211
            }
212
            _lastStartFailureMillis = now
213
        }
214

  
215
        // Shutdown on failure
216
        doSafeShutdownSequence()
217
    }
218
    finally {
219
      if(!_pingIsScheduled.get()) {
220
        // Schedule periodic pings
221
        logger.info("Scheduling %s".format(infoString("Ping")))
222
        doSchedulePing()
223
        _pingIsScheduled.getAndSet(true)
224
      }
162 225
    }
163 226
  }
164 227

  
228
  def start(): Unit = {
229
    doSafeStartupSequence(LifecycleStartReason)
230
  }
231

  
165 232
  def stop() = {
166
    logStoppingF(toDebugString) {
167
      doSafeFullShutdownSequence(false)
168
    } {}
233
    doSafeShutdownSequence()
169 234
  }
170 235

  
171 236
  private[this] def postBusError(event: BusEvent): Unit = {
172 237
    Configurator.MasterConfigurator.eventBus ! event
173 238
  }
174 239

  
175
  private[this] def doRescheduleStartup(): Unit = {
176
    val timerService = Configurator.MasterConfigurator.timerService
177
    timerService.scheduleOnce(start(), 1000L * 1)
240
  private[this] def doSchedulePing(): Unit = {
241
    val info = infoString("Ping")
242

  
243
    timerService.scheduleOnce(
244
      info,
245
      {
246
        val isAlive = consumerSelf.isAlive()
247
//        logger.info("Ping state is %s (isAlive=%s) for %s".format(_state.get(), isAlive, info))
248

  
249
        if(!isAlive) {
250
          doSafeShutdownSequence()
251
          doSafeStartupSequence(PingStartReason)
252
        }
253

  
254
        // Reschedule the ping
255
        doSchedulePing()
256
      },
257
      reconnectPeriodMillis
258
    )
178 259
  }
179 260

  
180 261
  private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = {
......
182 263
    catch {
183 264
      case e: Exception ⇒
184 265
        postBusError(RabbitMQError(e))
185
        doSafeFullShutdownSequence(true)
266
        doSafeShutdownSequence()
186 267
    }
187 268
  }
188 269

  
......
227 308
          case HandlerResultPanic ⇒
228 309
            // The other end is crucial to the overall operation and it is in panic mode,
229 310
            // so we stop delivering messages until further notice
230
            doSafeFullShutdownSequence(true)
311
            doSafeShutdownSequence()
231 312
        } (doError)
232 313
      } catch (doError)
233 314
    }
234 315
  }
235 316

  
236 317
  object RabbitMQShutdownListener extends ShutdownListener {
237
    @inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError
238
    @inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError
239

  
240 318
    def shutdownCompleted(cause: ShutdownSignalException) = {
241
      safeUnit { _channel.close() }
319
      logger.info("Got shutdown %s".format(cause))
242 320

  
243 321
      // Now, let's see what happened
244
      if(isConnectionError(cause)) {
245
      } else if(isChannelError(cause)) {
322
      if(cause.isHardError) {
323
        logger.info("Channel shutdown isHardError")
324
      } else {
325
        logger.info("Channel shutdown !isHardError")
246 326
      }
327

  
328
      doSafeShutdownSequence()
247 329
    }
248 330
  }
249 331

  

Also available in: Unified diff