Revision 61bfaf2e src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala | ||
---|---|---|
42 | 42 |
import com.rabbitmq.client.AMQP.BasicProperties |
43 | 43 |
import gr.grnet.aquarium.Configurator |
44 | 44 |
import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError |
45 |
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean} |
|
46 | 45 |
import gr.grnet.aquarium.service.event.BusEvent |
47 | 46 |
import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler} |
48 |
import com.ckkloverdos.maybe.MaybeEither |
|
49 | 47 |
import gr.grnet.aquarium.util.date.TimeHelpers |
50 | 48 |
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys} |
49 |
import java.util.concurrent.atomic.{AtomicLong, AtomicReference, AtomicBoolean} |
|
50 |
import com.ckkloverdos.maybe.{Just, Failed, MaybeEither} |
|
51 | 51 |
|
52 | 52 |
/** |
53 | 53 |
* A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium. |
... | ... | |
57 | 57 |
|
58 | 58 |
class RabbitMQConsumer(conf: RabbitMQConsumerConf, |
59 | 59 |
handler: PayloadHandler, |
60 |
executor: PayloadHandlerExecutor) extends Loggable with Lifecycle { |
|
60 |
executor: PayloadHandlerExecutor) extends Loggable with Lifecycle { consumerSelf ⇒
|
|
61 | 61 |
private[this] var _factory: ConnectionFactory = _ |
62 | 62 |
private[this] var _connection: Connection = _ |
63 | 63 |
private[this] var _channel: Channel = _ |
64 | 64 |
// private[this] val _isAlive = new AtomicBoolean(false) |
65 | 65 |
private[this] val _state = new AtomicReference[State](Shutdown) |
66 |
private[this] var _lastStartFailureMillis = -1L |
|
67 |
private[this] val _pingIsScheduled = new AtomicBoolean(false) |
|
66 | 68 |
|
67 |
def isStarted() = { |
|
68 |
_state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false) |
|
69 |
def isAlive() = { |
|
70 |
val isChannelOpen = MaybeEither((_channel ne null) && _channel.isOpen) match { |
|
71 |
case failed @ Failed(e) ⇒ |
|
72 |
logger.error("isChannelOpen", e) |
|
73 |
false |
|
74 |
|
|
75 |
case Just(x) ⇒ |
|
76 |
x |
|
77 |
} |
|
78 |
|
|
79 |
val isConnectionOpen = MaybeEither((_connection ne null) && _connection.isOpen) match { |
|
80 |
case failed @ Failed(e) ⇒ |
|
81 |
logger.error("isConnectionOpen", e) |
|
82 |
false |
|
83 |
|
|
84 |
case Just(x) ⇒ |
|
85 |
x |
|
86 |
} |
|
87 |
|
|
88 |
_state.get().isStarted && isChannelOpen && isConnectionOpen |
|
69 | 89 |
} |
70 | 90 |
|
71 | 91 |
sealed trait State { |
... | ... | |
73 | 93 |
} |
74 | 94 |
case object StartupSequence extends State |
75 | 95 |
case class BadStart(e: Exception) extends State |
76 |
case object Started extends State { override def isStarted = true } |
|
96 |
case object Started extends State { |
|
97 |
override def isStarted = true |
|
98 |
} |
|
77 | 99 |
case object ShutdownSequence extends State |
78 | 100 |
case object Shutdown extends State |
79 | 101 |
|
80 |
private[this] def doFullShutdownSequence(): Unit = { |
|
102 |
sealed trait StartReason |
|
103 |
case object LifecycleStartReason extends StartReason |
|
104 |
case object ReconnectStartReason extends StartReason |
|
105 |
case object PingStartReason extends StartReason |
|
106 |
|
|
107 |
private[this] def timerService = Configurator.MasterConfigurator.timerService |
|
108 |
|
|
109 |
private[this] def doSafeShutdownSequence(): Unit = { |
|
81 | 110 |
_state.set(ShutdownSequence) |
82 | 111 |
safeUnit(_channel.close()) |
83 | 112 |
safeUnit(_connection.close()) |
84 | 113 |
_state.set(Shutdown) |
85 | 114 |
} |
86 | 115 |
|
87 |
private[this] def doSafeFullShutdownSequence(rescheduleStartup: Boolean): Unit = { |
|
88 |
safeUnit(doFullShutdownSequence()) |
|
89 |
_state.set(Shutdown) |
|
90 |
if(rescheduleStartup) { |
|
91 |
doRescheduleStartup() |
|
92 |
} |
|
116 |
private[this] lazy val servers = { |
|
117 |
conf.connectionConf(RabbitMQConKeys.servers) |
|
93 | 118 |
} |
94 | 119 |
|
95 |
private[this] def servers = {
|
|
96 |
conf.connectionConf(RabbitMQConKeys.servers)
|
|
120 |
private[this] lazy val reconnectPeriodMillis = {
|
|
121 |
conf.connectionConf(RabbitMQConKeys.reconnect_period_millis)
|
|
97 | 122 |
} |
98 | 123 |
|
99 |
private[this] def serversToDebugStrings = { |
|
100 |
servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)) |
|
124 |
private[this] lazy val serversToDebugStrings = { |
|
125 |
servers.map(address ⇒ "%s:%s".format(address.getHost, address.getPort)).toList |
|
126 |
} |
|
127 |
|
|
128 |
private[this] def infoList(what: String): List[String] = { |
|
129 |
List(what) ++ |
|
130 |
List(serversToDebugStrings.mkString("(", ", ", ")")) ++ |
|
131 |
List("%s:%s:%s".format( |
|
132 |
conf.exchangeName, |
|
133 |
conf.routingKey, |
|
134 |
conf.queueName |
|
135 |
)) |
|
101 | 136 |
} |
102 | 137 |
|
103 |
private[this] def doFullStartupSequence() = { |
|
138 |
private[this] def infoString(what: String) = infoList(what).mkString("[", ", ", "]") |
|
139 |
|
|
140 |
private[this] def doSafeStartupSequence(startReason: StartReason): Unit = { |
|
104 | 141 |
import this.conf._ |
105 | 142 |
|
106 |
_state.set(StartupSequence) |
|
143 |
if(isAlive()) { |
|
144 |
return |
|
145 |
} |
|
107 | 146 |
|
108 |
val factory = new ConnectionFactory |
|
109 |
factory.setUsername(connectionConf(RabbitMQConKeys.username)) |
|
110 |
factory.setPassword(connectionConf(RabbitMQConKeys.password)) |
|
111 |
factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost)) |
|
147 |
try { |
|
148 |
_state.set(StartupSequence) |
|
112 | 149 |
|
113 |
val connection = factory.newConnection(servers) |
|
150 |
val factory = new ConnectionFactory |
|
151 |
factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt) |
|
152 |
factory.setUsername(connectionConf(RabbitMQConKeys.username)) |
|
153 |
factory.setPassword(connectionConf(RabbitMQConKeys.password)) |
|
154 |
factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost)) |
|
114 | 155 |
|
115 |
val channel = connection.createChannel()
|
|
156 |
val connection = factory.newConnection(servers)
|
|
116 | 157 |
|
117 |
channel.basicQos( |
|
118 |
channelConf(RabbitMQChannelKeys.qosPrefetchSize), |
|
119 |
channelConf(RabbitMQChannelKeys.qosPrefetchCount), |
|
120 |
channelConf(RabbitMQChannelKeys.qosGlobal) |
|
121 |
) |
|
158 |
val channel = connection.createChannel() |
|
122 | 159 |
|
123 |
channel.exchangeDeclare( |
|
124 |
exchangeName, |
|
125 |
exchangeConf(RabbitMQExchangeKeys.`type`).name, |
|
126 |
exchangeConf(RabbitMQExchangeKeys.durable), |
|
127 |
exchangeConf(RabbitMQExchangeKeys.autoDelete), |
|
128 |
exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap |
|
129 |
) |
|
160 |
channel.addShutdownListener(RabbitMQShutdownListener) |
|
130 | 161 |
|
131 |
this._factory = factory |
|
132 |
this._connection = connection |
|
133 |
this._channel = channel |
|
162 |
channel.basicQos( |
|
163 |
channelConf(RabbitMQChannelKeys.qosPrefetchSize), |
|
164 |
channelConf(RabbitMQChannelKeys.qosPrefetchCount), |
|
165 |
channelConf(RabbitMQChannelKeys.qosGlobal) |
|
166 |
) |
|
134 | 167 |
|
135 |
val declareOK = channel.queueDeclare(
|
|
136 |
queueName,
|
|
137 |
queueConf(RabbitMQQueueKeys.durable),
|
|
138 |
queueConf(RabbitMQQueueKeys.exclusive),
|
|
139 |
queueConf(RabbitMQQueueKeys.autoDelete),
|
|
140 |
queueConf(RabbitMQQueueKeys.arguments).toJavaMap
|
|
141 |
) |
|
168 |
channel.exchangeDeclare(
|
|
169 |
exchangeName,
|
|
170 |
exchangeConf(RabbitMQExchangeKeys.`type`).name,
|
|
171 |
exchangeConf(RabbitMQExchangeKeys.durable),
|
|
172 |
exchangeConf(RabbitMQExchangeKeys.autoDelete),
|
|
173 |
exchangeConf(RabbitMQExchangeKeys.arguments).toJavaMap
|
|
174 |
)
|
|
142 | 175 |
|
143 |
val bindOK = channel.queueBind(queueName, exchangeName, routingKey) |
|
176 |
this._factory = factory |
|
177 |
this._connection = connection |
|
178 |
this._channel = channel |
|
144 | 179 |
|
145 |
channel.addShutdownListener(RabbitMQShutdownListener) |
|
180 |
val declareOK = channel.queueDeclare( |
|
181 |
queueName, |
|
182 |
queueConf(RabbitMQQueueKeys.durable), |
|
183 |
queueConf(RabbitMQQueueKeys.exclusive), |
|
184 |
queueConf(RabbitMQQueueKeys.autoDelete), |
|
185 |
queueConf(RabbitMQQueueKeys.arguments).toJavaMap |
|
186 |
) |
|
146 | 187 |
|
147 |
_channel.basicConsume( |
|
148 |
queueName, |
|
149 |
false, // We send explicit acknowledgements to RabbitMQ |
|
150 |
RabbitMQMessageConsumer |
|
151 |
) |
|
152 |
} |
|
188 |
val bindOK = channel.queueBind(queueName, exchangeName, routingKey) |
|
189 |
|
|
190 |
_channel.basicConsume( |
|
191 |
queueName, |
|
192 |
false, // We send explicit acknowledgements to RabbitMQ |
|
193 |
RabbitMQMessageConsumer |
|
194 |
) |
|
153 | 195 |
|
154 |
def start(): Unit = { |
|
155 |
logStartingF(toDebugString) { |
|
156 |
// The actual starting steps |
|
157 |
doFullStartupSequence() |
|
158 | 196 |
_state.set(Started) |
159 |
} { |
|
160 |
// If an exception was thrown during startup, run this before logging the error |
|
161 |
doSafeFullShutdownSequence(true) |
|
197 |
|
|
198 |
logger.info("Connected %s".format(infoString("Start"))) |
|
199 |
} |
|
200 |
catch { |
|
201 |
case e: Exception ⇒ |
|
202 |
val info = infoString(startReason.toString) |
|
203 |
startReason match { |
|
204 |
case LifecycleStartReason ⇒ |
|
205 |
logger.error("While connecting %s".format(info), e) |
|
206 |
|
|
207 |
case ReconnectStartReason | PingStartReason ⇒ |
|
208 |
val now = TimeHelpers.nowMillis() |
|
209 |
if(true/*_lastStartFailureMillis - now > 5*/) { |
|
210 |
logger.warn("Could not reconnect %s".format(info)) |
|
211 |
} |
|
212 |
_lastStartFailureMillis = now |
|
213 |
} |
|
214 |
|
|
215 |
// Shutdown on failure |
|
216 |
doSafeShutdownSequence() |
|
217 |
} |
|
218 |
finally { |
|
219 |
if(!_pingIsScheduled.get()) { |
|
220 |
// Schedule periodic pings |
|
221 |
logger.info("Scheduling %s".format(infoString("Ping"))) |
|
222 |
doSchedulePing() |
|
223 |
_pingIsScheduled.getAndSet(true) |
|
224 |
} |
|
162 | 225 |
} |
163 | 226 |
} |
164 | 227 |
|
228 |
def start(): Unit = { |
|
229 |
doSafeStartupSequence(LifecycleStartReason) |
|
230 |
} |
|
231 |
|
|
165 | 232 |
def stop() = { |
166 |
logStoppingF(toDebugString) { |
|
167 |
doSafeFullShutdownSequence(false) |
|
168 |
} {} |
|
233 |
doSafeShutdownSequence() |
|
169 | 234 |
} |
170 | 235 |
|
171 | 236 |
private[this] def postBusError(event: BusEvent): Unit = { |
172 | 237 |
Configurator.MasterConfigurator.eventBus ! event |
173 | 238 |
} |
174 | 239 |
|
175 |
private[this] def doRescheduleStartup(): Unit = { |
|
176 |
val timerService = Configurator.MasterConfigurator.timerService |
|
177 |
timerService.scheduleOnce(start(), 1000L * 1) |
|
240 |
private[this] def doSchedulePing(): Unit = { |
|
241 |
val info = infoString("Ping") |
|
242 |
|
|
243 |
timerService.scheduleOnce( |
|
244 |
info, |
|
245 |
{ |
|
246 |
val isAlive = consumerSelf.isAlive() |
|
247 |
// logger.info("Ping state is %s (isAlive=%s) for %s".format(_state.get(), isAlive, info)) |
|
248 |
|
|
249 |
if(!isAlive) { |
|
250 |
doSafeShutdownSequence() |
|
251 |
doSafeStartupSequence(PingStartReason) |
|
252 |
} |
|
253 |
|
|
254 |
// Reschedule the ping |
|
255 |
doSchedulePing() |
|
256 |
}, |
|
257 |
reconnectPeriodMillis |
|
258 |
) |
|
178 | 259 |
} |
179 | 260 |
|
180 | 261 |
private[this] def doWithChannel[A](f: Channel ⇒ A): Unit = { |
... | ... | |
182 | 263 |
catch { |
183 | 264 |
case e: Exception ⇒ |
184 | 265 |
postBusError(RabbitMQError(e)) |
185 |
doSafeFullShutdownSequence(true)
|
|
266 |
doSafeShutdownSequence()
|
|
186 | 267 |
} |
187 | 268 |
} |
188 | 269 |
|
... | ... | |
227 | 308 |
case HandlerResultPanic ⇒ |
228 | 309 |
// The other end is crucial to the overall operation and it is in panic mode, |
229 | 310 |
// so we stop delivering messages until further notice |
230 |
doSafeFullShutdownSequence(true)
|
|
311 |
doSafeShutdownSequence()
|
|
231 | 312 |
} (doError) |
232 | 313 |
} catch (doError) |
233 | 314 |
} |
234 | 315 |
} |
235 | 316 |
|
236 | 317 |
object RabbitMQShutdownListener extends ShutdownListener { |
237 |
@inline def isConnectionError(cause: ShutdownSignalException) = cause.isHardError |
|
238 |
@inline def isChannelError(cause: ShutdownSignalException) = !cause.isHardError |
|
239 |
|
|
240 | 318 |
def shutdownCompleted(cause: ShutdownSignalException) = { |
241 |
safeUnit { _channel.close() }
|
|
319 |
logger.info("Got shutdown %s".format(cause))
|
|
242 | 320 |
|
243 | 321 |
// Now, let's see what happened |
244 |
if(isConnectionError(cause)) { |
|
245 |
} else if(isChannelError(cause)) { |
|
322 |
if(cause.isHardError) { |
|
323 |
logger.info("Channel shutdown isHardError") |
|
324 |
} else { |
|
325 |
logger.info("Channel shutdown !isHardError") |
|
246 | 326 |
} |
327 |
|
|
328 |
doSafeShutdownSequence() |
|
247 | 329 |
} |
248 | 330 |
} |
249 | 331 |
|
Also available in: Unified diff