Pools and loadbalancing for resource event actors
author Georgios Gousios <gousiosg@gmail.com>
Fri, 16 Dec 2011 10:36:46 +0000 (12:36 +0200)
committer Georgios Gousios <gousiosg@gmail.com>
Fri, 16 Dec 2011 10:37:31 +0000 (12:37 +0200)
logic/src/main/scala/gr/grnet/aquarium/Main.scala
logic/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala

index 94bf175..dcdd1d7 100644 (file)
@@ -50,17 +50,13 @@ object Main extends Loggable {
 
     MasterConf.MasterConf.startServices()
 
-    addShutdownHook
-
-    logger.info("Started Aquarium")
-  }
-  
-  def addShutdownHook(): Unit = {
     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
       def run = {
         logger.info("Shutting down Aquarium")
         MasterConf.MasterConf.stopServices()
       }
     }))
+
+    logger.info("Started Aquarium")
   }
 }
\ No newline at end of file
index ebd70ac..0ab5bc5 100644 (file)
@@ -1,5 +1,3 @@
-package gr.grnet.aquarium.processor.actor
-
 /*
  * Copyright 2011 GRNET S.A. All rights reserved.
  *
@@ -35,14 +33,24 @@ package gr.grnet.aquarium.processor.actor
  * or implied, of GRNET S.A.
  */
 
+package gr.grnet.aquarium.processor.actor
+
 import gr.grnet.aquarium.messaging.AkkaAMQP
-import akka.actor.{ActorRef, Actor}
 import gr.grnet.aquarium.logic.events.ResourceEvent
 import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
 import gr.grnet.aquarium.{MasterConf}
 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
 
+import akka.actor._
+import akka.actor.Actor._
+import akka.routing.CyclicIterator
+import akka.routing.Routing._
+import akka.dispatch.Dispatchers
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
+import akka.config.Supervision.SupervisorConfig
+import akka.config.Supervision.OneForOneStrategy
+
 /**
  * An actor that gets events from the queue, stores them persistently
  * and forwards them for further processing.
@@ -50,10 +58,7 @@ import gr.grnet.aquarium.util.{Lifecycle, Loggable}
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
 class ResourceEventProcessorService extends AkkaAMQP with Loggable
-  with Lifecycle {
-
-  private var persister: ActorRef = _
-  private var queueReader: ActorRef = _
+with Lifecycle {
 
   case class AckData(deliveryTag: Long, queue: ActorRef)
   case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
@@ -61,12 +66,12 @@ class ResourceEventProcessorService extends AkkaAMQP with Loggable
   case class PersistFailed(ackData: AckData)
   case class Duplicate(ackData: AckData)
 
-  class QueueReader(persister: ActorRef) extends Actor {
+  class QueueReader extends Actor {
 
     def receive = {
       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
         val event = ResourceEvent.fromBytes(payload)
-        persister ! Persist(event, queueReader, AckData(deliveryTag, queue.get))
+        PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(deliveryTag, queue.get))
 
       case PersistOK(ackData) =>
         logger.debug("Stored res event:%s".format(ackData.deliveryTag))
@@ -81,10 +86,12 @@ class ResourceEventProcessorService extends AkkaAMQP with Loggable
         ackData.queue ! Reject(ackData.deliveryTag, false)
 
       case Acknowledged(deliveryTag) =>
-        //Forward to the dispatcher
+      //Forward to the dispatcher
 
       case _ => logger.warn("Unknown message")
     }
+
+    self.dispatcher = QueueReaderManager.dispatcher
   }
 
   class Persister extends Actor {
@@ -105,33 +112,100 @@ class ResourceEventProcessorService extends AkkaAMQP with Loggable
 
     def persist(event: ResourceEvent): Boolean = {
       MasterConf.MasterConf.eventStore.storeEvent(event) match {
-          case Just(x) => true
-          case x: Failed =>
-            logger.error("Could not save event: %s".format(event))
-            false
-          case NoVal => false
+        case Just(x) => true
+        case x: Failed =>
+          logger.error("Could not save event: %s".format(event))
+          false
+        case NoVal => false
+      }
+    }
+
+    self.dispatcher = PersisterManager.dispatcher
+  }
+
+  lazy val supervisor = Supervisor(SupervisorConfig(
+    OneForOneStrategy(
+      List(classOf[Exception]), //What exceptions will be handled
+      3, // maximum number of restart retries
+      5000 // within time in millis
+    ), Nil
+
+  ))
+
+  object QueueReaderManager {
+    val numCPUs = Runtime.getRuntime.availableProcessors
+    var actors: List[ActorRef] = _
+
+    // sets up load balancing among `actors` (assigned in start(), below) to allow multithreading
+    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+
+    lazy val dispatcher =
+      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
+        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+        .setMaxPoolSize(numCPUs)
+        .setCorePoolSize(2)
+        .setKeepAliveTimeInMillis(60000)
+        .setRejectionPolicy(new CallerRunsPolicy).build
+
+    def start() = {
+      actors = {
+        for (i <- 0 until numCPUs) yield {
+          val actor = actorOf(new QueueReader)
+          supervisor.link(actor)
+          actor.start()
+          actor
         }
+      }.toList
+    }
+
+    def stop() = {
+      actors.foreach(a => a.stop)
     }
   }
 
-  def start() {
-    logger.info("Starting resource event processor service")
+  object PersisterManager {
+    val numCPUs = Runtime.getRuntime.availableProcessors
+    var actors: List[ActorRef] = _
+
+    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+
+    val dispatcher =
+      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
+        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+        .setMaxPoolSize(numCPUs)
+        .setCorePoolSize(2)
+        .setKeepAliveTimeInMillis(60000)
+        .setRejectionPolicy(new CallerRunsPolicy).build
+
+    def start() = {
+      actors = {
+        for (i <- 0 until numCPUs) yield {
+          val actor = actorOf(new Persister)
+          supervisor.link(actor)
+          actor.start()
+          actor
+        }
+      }.toList
+    }
 
-    persister = Actor.actorOf(new Persister)
-    queueReader = Actor.actorOf(new QueueReader(persister))
+    def stop() = {
+      actors.foreach(a => a.stop)
+    }
+  }
 
-    queueReader.startLink(persister)
+  def start() {
+    logger.info("Starting resource event processor service")
 
-    queueReader.start()
-    persister.start()
+    QueueReaderManager.start()
+    PersisterManager.start()
 
-    consumer("event.#", "resource-events", "aquarium", queueReader, false)
+    consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false)
   }
 
   def stop() {
-    logger.info("Stopping resource event processor service")
+    QueueReaderManager.stop()
+    PersisterManager.stop()
 
-    persister.stop()
-    queueReader.stop()
+    logger.info("Stopping resource event processor service")
   }
 }
\ No newline at end of file