-package gr.grnet.aquarium.processor.actor
-
/*
* Copyright 2011 GRNET S.A. All rights reserved.
*
* or implied, of GRNET S.A.
*/
+package gr.grnet.aquarium.processor.actor
+
import gr.grnet.aquarium.messaging.AkkaAMQP
-import akka.actor.{ActorRef, Actor}
import gr.grnet.aquarium.logic.events.ResourceEvent
import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
import com.ckkloverdos.maybe.{NoVal, Failed, Just}
import gr.grnet.aquarium.{MasterConf}
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import akka.actor._
+import akka.actor.Actor._
+import akka.routing.CyclicIterator
+import akka.routing.Routing._
+import akka.dispatch.Dispatchers
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
+import akka.config.Supervision.SupervisorConfig
+import akka.config.Supervision.OneForOneStrategy
+
/**
* An actor that gets events from the queue, stores them persistently
* and forwards them for further processing.
* @author Georgios Gousios <gousiosg@gmail.com>
*/
class ResourceEventProcessorService extends AkkaAMQP with Loggable
- with Lifecycle {
-
- private var persister: ActorRef = _
- private var queueReader: ActorRef = _
+with Lifecycle {
case class AckData(deliveryTag: Long, queue: ActorRef)
case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
case class PersistFailed(ackData: AckData)
case class Duplicate(ackData: AckData)
- class QueueReader(persister: ActorRef) extends Actor {
+ class QueueReader extends Actor { // no constructor argument any more: the persister is reached through PersisterManager.lb
def receive = {
case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
val event = ResourceEvent.fromBytes(payload)
- persister ! Persist(event, queueReader, AckData(deliveryTag, queue.get))
+ PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(deliveryTag, queue.get)) // sent to the persister load balancer; the reader load balancer is passed as the Persist sender field
case PersistOK(ackData) =>
logger.debug("Stored res event:%s".format(ackData.deliveryTag))
ackData.queue ! Reject(ackData.deliveryTag, false)
case Acknowledged(deliveryTag) =>
- //Forward to the dispatcher
+ //Forward to the dispatcher -- NOTE(review): no forwarding code is visible for this case here; confirm it is still a placeholder
case _ => logger.warn("Unknown message")
}
+
+ self.dispatcher = QueueReaderManager.dispatcher // each QueueReader instance is assigned the dispatcher defined in QueueReaderManager
}
class Persister extends Actor {
def persist(event: ResourceEvent): Boolean = {
MasterConf.MasterConf.eventStore.storeEvent(event) match {
- case Just(x) => true
- case x: Failed =>
- logger.error("Could not save event: %s".format(event))
- false
- case NoVal => false
+ case Just(x) => true // storeEvent returned a value: report success
+ case x: Failed => // storeEvent reported a failure: log it and report failure
+ logger.error("Could not save event: %s".format(event))
+ false
+ case NoVal => false // storeEvent returned no value: report failure without logging
+ }
+ }
+
+ self.dispatcher = PersisterManager.dispatcher // each Persister instance is assigned the dispatcher defined in PersisterManager
+ }
+
+ // Supervisor that the QueueReader and Persister actors are linked to.
+ // Failures are handled one-for-one; no actors are configured up front (Nil),
+ // they are linked later by the manager objects.
+ lazy val supervisor = {
+   val maxRestartRetries = 3     // maximum number of restart retries
+   val retryWindowMillis = 5000  // within time in millis
+   val restartStrategy = OneForOneStrategy(
+     List(classOf[Exception]),   // what exceptions will be handled
+     maxRestartRetries,
+     retryWindowMillis
+   )
+   Supervisor(SupervisorConfig(restartStrategy, Nil))
+ }
+
+ /**
+  * Owns the pool of QueueReader actors: creates one per available processor,
+  * links each of them to the supervisor, and exposes the load balancer and
+  * the dispatcher used by the pool.
+  */
+ object QueueReaderManager {
+   val numCPUs = Runtime.getRuntime.availableProcessors
+
+   // Starts out empty rather than null, so that stop() is a harmless no-op
+   // when it is called before start() has populated the pool.
+   var actors: List[ActorRef] = Nil
+
+   // Load balancer that cycles over the reader actors to allow multithreading.
+   // It is lazy so that it is built from the list assigned by start(); it must
+   // not be referenced before start() has run.
+   lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+
+   // Dispatcher that every QueueReader assigns to itself on construction.
+   lazy val dispatcher =
+     Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
+       .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+       .setMaxPoolSize(numCPUs)
+       .setCorePoolSize(2)
+       .setKeepAliveTimeInMillis(60000)
+       .setRejectionPolicy(new CallerRunsPolicy).build
+
+   /** Creates numCPUs QueueReader actors, links each to the supervisor and starts it. */
+   def start() = {
+     actors = List.fill(numCPUs) {
+       val actor = actorOf(new QueueReader)
+       supervisor.link(actor)
+       actor.start()
+       actor
}
+   }
+
+   /** Stops every actor created by start(); does nothing if start() never ran. */
+   def stop() = {
+     actors.foreach(a => a.stop)
}
}
- def start() {
- logger.info("Starting resource event processor service")
+ /**
+  * Owns the pool of Persister actors: creates one per available processor,
+  * links each of them to the supervisor, and exposes the load balancer and
+  * the dispatcher used by the pool.
+  */
+ object PersisterManager {
+   val numCPUs = Runtime.getRuntime.availableProcessors
+
+   // Starts out empty rather than null, so that stop() is a harmless no-op
+   // when it is called before start() has populated the pool.
+   var actors: List[ActorRef] = Nil
+
+   // Load balancer that cycles over the persister actors. It is lazy so that
+   // it is built from the list assigned by start(); it must not be referenced
+   // before start() has run.
+   lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+
+   // Dispatcher that every Persister assigns to itself on construction.
+   val dispatcher =
+     Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
+       .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+       .setMaxPoolSize(numCPUs)
+       .setCorePoolSize(2)
+       .setKeepAliveTimeInMillis(60000)
+       .setRejectionPolicy(new CallerRunsPolicy).build
+
+   /** Creates numCPUs Persister actors, links each to the supervisor and starts it. */
+   def start() = {
+     actors = List.fill(numCPUs) {
+       val actor = actorOf(new Persister)
+       supervisor.link(actor)
+       actor.start()
+       actor
+     }
+   }
- persister = Actor.actorOf(new Persister)
- queueReader = Actor.actorOf(new QueueReader(persister))
+   /** Stops every actor created by start(); does nothing if start() never ran. */
+   def stop() = {
+     actors.foreach(a => a.stop)
+   }
+ }
- queueReader.startLink(persister)
+ def start() { // service entry point: starts both actor pools, then registers the queue consumer
+ logger.info("Starting resource event processor service")
- queueReader.start()
- persister.start()
+ QueueReaderManager.start() // creates, links and starts the QueueReader pool
+ PersisterManager.start() // creates, links and starts the Persister pool
- consumer("event.#", "resource-events", "aquarium", queueReader, false)
+ consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false) // the consumer now targets the reader load balancer instead of a single queueReader actor
}
def stop() {
- logger.info("Stopping resource event processor service")
+ QueueReaderManager.stop() // stops the QueueReader pool first
+ PersisterManager.stop() // then stops the Persister pool
- persister.stop()
- queueReader.stop()
+ logger.info("Stopping resource event processor service") // NOTE(review): this message is now logged after both pools have already been stopped
}
}
\ No newline at end of file