Better logging, better stopping of actors
[aquarium] / logic / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala
index 35504d5..9e249d8 100644 (file)
@@ -1,5 +1,3 @@
-package gr.grnet.aquarium.processor.actor
-
 /*
  * Copyright 2011 GRNET S.A. All rights reserved.
  *
@@ -35,14 +33,24 @@ package gr.grnet.aquarium.processor.actor
  * or implied, of GRNET S.A.
  */
 
+package gr.grnet.aquarium.processor.actor
+
 import gr.grnet.aquarium.messaging.AkkaAMQP
-import akka.actor.{ActorRef, Actor}
 import gr.grnet.aquarium.logic.events.ResourceEvent
 import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
 import gr.grnet.aquarium.{MasterConf}
 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
 
+import akka.actor._
+import akka.actor.Actor._
+import akka.routing.CyclicIterator
+import akka.routing.Routing._
+import akka.dispatch.Dispatchers
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
+import akka.config.Supervision.SupervisorConfig
+import akka.config.Supervision.OneForOneStrategy
+
 /**
  * An actor that gets events from the queue, stores them persistently
  * and forwards them for further processing.
@@ -50,41 +58,40 @@ import gr.grnet.aquarium.util.{Lifecycle, Loggable}
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
 class ResourceEventProcessorService extends AkkaAMQP with Loggable
-  with Lifecycle {
-
-  private var persister: ActorRef = _
-  private var queueReader: ActorRef = _
+with Lifecycle {
 
-  case class AckData(deliveryTag: Long, queue: ActorRef)
+  case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
   case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
   case class PersistOK(ackData: AckData)
   case class PersistFailed(ackData: AckData)
   case class Duplicate(ackData: AckData)
 
-  class QueueReader(persister: ActorRef) extends Actor {
+  class QueueReader extends Actor {
 
     def receive = {
       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
         val event = ResourceEvent.fromBytes(payload)
-        persister ! Persist(event, Actor.actorOf(this), AckData(deliveryTag, queue.get))
+        PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
 
       case PersistOK(ackData) =>
-        logger.debug("Stored res event:%s".format(ackData.deliveryTag))
+        logger.debug("Stored event:%s".format(ackData.msgId))
         ackData.queue ! Acknowledge(ackData.deliveryTag)
 
       case PersistFailed(ackData) =>
-        logger.debug("Storing res event:%s failed".format(ackData.deliveryTag))
+        logger.debug("Storing event:%s failed".format(ackData.msgId))
         ackData.queue ! Reject(ackData.deliveryTag, true)
 
       case Duplicate(ackData) =>
-        logger.debug("Res event:%s is duplicate".format(ackData.deliveryTag))
+        logger.debug("Event:%s is duplicate".format(ackData.msgId))
         ackData.queue ! Reject(ackData.deliveryTag, false)
 
       case Acknowledged(deliveryTag) =>
-        //Forward to the dispatcher
+      //Forward to the dispatcher
 
       case _ => logger.warn("Unknown message")
     }
+
+    self.dispatcher = QueueReaderManager.dispatcher
   }
 
   class Persister extends Actor {
@@ -101,37 +108,97 @@ class ResourceEventProcessorService extends AkkaAMQP with Loggable
     }
 
     def exists(event: ResourceEvent): Boolean =
-      MasterConf.MasterConf.eventStore.findEventById(event.id).isEmpty
+      !MasterConf.MasterConf.eventStore.findEventById(event.id).isEmpty
 
     def persist(event: ResourceEvent): Boolean = {
       MasterConf.MasterConf.eventStore.storeEvent(event) match {
-          case Just(x) => true
-          case x: Failed =>
-            logger.error("Could not save event: %s".format(event))
-            false
-          case NoVal => false
-        }
+        case Just(x) => true
+        case x: Failed =>
+          logger.error("Could not save event: %s".format(event))
+          false
+        case NoVal => false
+      }
     }
+
+    self.dispatcher = PersisterManager.dispatcher
   }
 
-  def start() {
-    logger.info("Starting resource event processor service")
+  lazy val supervisor = Supervisor(SupervisorConfig(
+    OneForOneStrategy(
+      List(classOf[Exception]), //What exceptions will be handled
+      3, // maximum number of restart retries
+      5000 // within time in millis
+    ), Nil
+
+  ))
+
+  object QueueReaderManager {
+    val numCPUs = Runtime.getRuntime.availableProcessors
+    var actors: List[ActorRef] = _
+    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+
+    lazy val dispatcher =
+      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
+        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+        .setMaxPoolSize(numCPUs)
+        .setCorePoolSize(2)
+        .setKeepAliveTimeInMillis(60000)
+        .setRejectionPolicy(new CallerRunsPolicy).build
+
+    def start() = {
+      actors = {
+        for (i <- 0 until numCPUs) yield {
+          val actor = actorOf(new QueueReader)
+          supervisor.link(actor)
+          actor.start()
+          actor
+        }
+      }.toList
+    }
 
-    persister = Actor.actorOf(new Persister)
-    queueReader = Actor.actorOf(new QueueReader(persister))
+    def stop() = dispatcher.stopAllAttachedActors
+  }
 
-    queueReader.link(persister)
+  object PersisterManager {
+    val numCPUs = Runtime.getRuntime.availableProcessors
+    var actors: List[ActorRef] = _
+
+    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+
+    val dispatcher =
+      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
+        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
+        .setMaxPoolSize(numCPUs)
+        .setCorePoolSize(2)
+        .setKeepAliveTimeInMillis(60000)
+        .setRejectionPolicy(new CallerRunsPolicy).build
+
+    def start() = {
+      actors = {
+        for (i <- 0 until numCPUs) yield {
+          val actor = actorOf(new Persister)
+          supervisor.link(actor)
+          actor.start()
+          actor
+        }
+      }.toList
+    }
 
-    queueReader.start()
-    persister.start()
+    def stop() = dispatcher.stopAllAttachedActors
+  }
 
-    consumer("event.#", "resource-events", "aquarium", queueReader, false)
+  def start() {
+    logger.info("Starting resource event processor service")
+
+    QueueReaderManager.start()
+    PersisterManager.start()
+    consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false)
   }
 
   def stop() {
-    logger.info("Stopping resource event processor service")
+    QueueReaderManager.stop()
+    PersisterManager.stop()
 
-    persister.stop()
-    queueReader.stop()
+    logger.info("Stopping resource event processor service")
   }
 }
\ No newline at end of file