abstract class EventProcessorService extends AkkaAMQP with Loggable
with Lifecycle {
- val redeliveries = new ConcurrentSkipListSet[String]()
- val inFlightEvents = new ConcurrentHashMap[Long, AquariumEvent](200, 0.9F, 4)
-
- protected def _configurator: Configurator = Configurator.MasterConfigurator
-
+ /* Messages exchanged between the persister and the queuereader */
case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
case class Persist(event: AquariumEvent, sender: ActorRef, ackData: AckData)
case class PersistOK(ackData: AckData)
case class PersistFailed(ackData: AckData)
case class Duplicate(ackData: AckData)
+ /* Short-term storage for the ids of events sent for redelivery, used to
+ * work around an AMQP limitation with redelivering rejected messages to
+ * the same host.
+ */
+ private val redeliveries = new ConcurrentSkipListSet[String]()
+
+ /* Temporarily keeps track of messages while being processed */
+ private val inFlightEvents = new ConcurrentHashMap[Long, AquariumEvent](200, 0.9F, 4)
+
+ /* Supervisor actor for each event processing operation */
+ private lazy val supervisor = Supervisor(SupervisorConfig(
+ OneForOneStrategy(
+ List(classOf[Exception]), //What exceptions will be handled
+ 5, // maximum number of restart retries
+ 5000 // within time in millis
+ ), Nil
+ ))
+
+ protected def _configurator: Configurator = Configurator.MasterConfigurator
+
protected def decode(data: Array[Byte]): AquariumEvent
protected def forward(resourceEvent: AquariumEvent): Unit
protected def exists(event: AquariumEvent): Boolean
protected def queueReaderThreads: Int
protected def persisterThreads: Int
protected def name: String
+
+ protected def persisterManager: PersisterManager
+ protected def queueReaderManager: QueueReaderManager
def start(): Unit
def stop() : Unit
} else {
//Redeliver, but keep track of the message
redeliveries.add(event.id)
- PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ persisterManager.lb ! Persist(event, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
}
} else {
val eventWithReceivedMillis = event.setRcvMillis(System.currentTimeMillis())
- PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ persisterManager.lb ! Persist(eventWithReceivedMillis, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
}
case PersistOK(ackData) =>
case _ => logger.warn("Unknown message")
}
- self.dispatcher = QueueReaderManager.dispatcher
+ self.dispatcher = queueReaderManager.dispatcher
}
class Persister extends Actor {
case _ => logger.warn("Unknown message")
}
- self.dispatcher = PersisterManager.dispatcher
+ self.dispatcher = persisterManager.dispatcher
}
- lazy val supervisor = Supervisor(SupervisorConfig(
- OneForOneStrategy(
- List(classOf[Exception]), //What exceptions will be handled
- 3, // maximum number of restart retries
- 5000 // within time in millis
- ), Nil
-
- ))
-
- object QueueReaderManager {
+ class QueueReaderManager { // Groups the QueueReader actors with their load balancer and dispatcher
val numCPUs = Runtime.getRuntime.availableProcessors
- var actors: List[ActorRef] = _
lazy val lb = loadBalancerActor(new CyclicIterator(actors))
lazy val dispatcher =
- Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "queuereader")
+ Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
.setMaxPoolSize(queueReaderThreads)
.setCorePoolSize(1)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy).build
- def start() = {
- actors = {
- for (i <- 0 until numCPUs) yield {
- val actor = actorOf(new QueueReader)
- supervisor.link(actor)
- actor.start()
- actor
- }
- }.toList
- }
+ lazy val actors = // One QueueReader per available processor; created and started on first access (e.g. through lb)
+ for (i <- 0 until numCPUs) yield {
+ val actor = actorOf(new QueueReader)
+ supervisor.link(actor) // Link the new actor to the service-wide supervisor
+ actor.start()
+ actor
+ }
def stop() = dispatcher.stopAllAttachedActors
}
- object PersisterManager {
+ class PersisterManager { // Groups the Persister actors with their load balancer and dispatcher
val numCPUs = Runtime.getRuntime.availableProcessors
- var actors: List[ActorRef] = _
-
lazy val lb = loadBalancerActor(new CyclicIterator(actors))
val dispatcher =
- Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "persister")
+ Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
.setMaxPoolSize(persisterThreads)
.setCorePoolSize(1)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy).build
- def start() = {
- actors = {
- for (i <- 0 until numCPUs) yield {
- val actor = actorOf(new Persister)
- supervisor.link(actor)
- actor.start()
- actor
- }
- }.toList
- }
+ lazy val actors = // One Persister per available processor; created and started on first access (e.g. through lb)
+ for (i <- 0 until numCPUs) yield {
+ val actor = actorOf(new Persister)
+ supervisor.link(actor) // Link the new actor to the service-wide supervisor
+ actor.start()
+ actor
+ }
def stop() = dispatcher.stopAllAttachedActors
}
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-class IMEventProcessorService extends EventProcessorService {
+final class IMEventProcessorService extends EventProcessorService {
- protected def decode(data: Array[Byte]) = null
+ override def decode(data: Array[Byte]) = null
- protected def forward(resourceEvent: AquariumEvent) {}
+ override def forward(resourceEvent: AquariumEvent) {}
- protected def exists(event: AquariumEvent) = false
+ override def exists(event: AquariumEvent) = false
- protected def persist(event: AquariumEvent) = false
+ override def persist(event: AquariumEvent) = false
override def queueReaderThreads: Int = 1
override def persisterThreads: Int = 2
override def name = "imevtproc"
+ // Must be a single shared instance: declared as a plain `def`, every access
+ // would build a new PersisterManager with its own dispatcher, load balancer
+ // and actors, and stop() would act on a fresh manager instead of the running one.
+ override lazy val persisterManager: PersisterManager = new PersisterManager
+ // Must be a single shared instance: declared as a plain `def`, every access
+ // would build a new QueueReaderManager, so start(), stop() and the
+ // QueueReader actors would each see a different dispatcher and load balancer.
+ override lazy val queueReaderManager: QueueReaderManager = new QueueReaderManager
+
def start() {
logger.info("Starting IM event processor service")
- QueueReaderManager.start()
- PersisterManager.start()
-
consumer("%s.#".format(MessagingNames.IM_EVENT_KEY),
MessagingNames.IM_EVENT_QUEUE, MessagingNames.IM_EXCHANGE,
- QueueReaderManager.lb, false)
+ queueReaderManager.lb, false) // Accessing lb lazily creates and starts the QueueReader actors, replacing the removed explicit start() calls
}
def stop() {
- QueueReaderManager.stop()
- PersisterManager.stop()
+ queueReaderManager.stop() // Stops all actors attached to the queue-reader dispatcher
+ persisterManager.stop() // Stops all actors attached to the persister dispatcher
logger.info("Stopping IM event processor service")
}