Make the load balancing managers regular objects. Docs++
authorGeorgios Gousios <gousiosg@gmail.com>
Tue, 3 Jan 2012 12:09:09 +0000 (14:09 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Tue, 3 Jan 2012 12:09:09 +0000 (14:09 +0200)
src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala
src/main/scala/gr/grnet/aquarium/processor/actor/IMEventProcessorService.scala
src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala

index 60c0f8e..84ed0be 100644 (file)
@@ -64,17 +64,32 @@ import gr.grnet.aquarium.logic.events.AquariumEvent
 abstract class EventProcessorService extends AkkaAMQP with Loggable
 with Lifecycle {
 
-  val redeliveries = new ConcurrentSkipListSet[String]()
-  val inFlightEvents = new ConcurrentHashMap[Long, AquariumEvent](200, 0.9F, 4)
-
-  protected def _configurator: Configurator = Configurator.MasterConfigurator
-
+  /* Messages exchanged between the persister and the queuereader */
   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
   case class Persist(event: AquariumEvent, sender: ActorRef, ackData: AckData)
   case class PersistOK(ackData: AckData)
   case class PersistFailed(ackData: AckData)
   case class Duplicate(ackData: AckData)
 
+  /* Short term storage for delivery tags to work around AMQP
+   * limitation with redelivering rejected messages to same host.
+   */
+  private val redeliveries = new ConcurrentSkipListSet[String]()
+
+  /* Temporarily keeps track of messages while being processed */
+  private val inFlightEvents = new ConcurrentHashMap[Long, AquariumEvent](200, 0.9F, 4)
+
+  /* Supervisor actor for each event processing operation */
+  private lazy val supervisor = Supervisor(SupervisorConfig(
+    OneForOneStrategy(
+      List(classOf[Exception]), //What exceptions will be handled
+      5, // maximum number of restart retries
+      5000 // within time in millis
+    ), Nil
+  ))
+
+  protected def _configurator: Configurator = Configurator.MasterConfigurator
+
   protected def decode(data: Array[Byte]): AquariumEvent
   protected def forward(resourceEvent: AquariumEvent): Unit
   protected def exists(event: AquariumEvent): Boolean
@@ -83,6 +98,9 @@ with Lifecycle {
   protected def queueReaderThreads: Int
   protected def persisterThreads: Int
   protected def name: String
+
+  protected def persisterManager: PersisterManager
+  protected def queueReaderManager: QueueReaderManager
   
   def start(): Unit
   def stop() : Unit
@@ -104,11 +122,11 @@ with Lifecycle {
           } else {
             //Redeliver, but keep track of the message
             redeliveries.add(event.id)
-            PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+            persisterManager.lb ! Persist(event, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
           }
         } else {
           val eventWithReceivedMillis = event.setRcvMillis(System.currentTimeMillis())
-          PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+          persisterManager.lb ! Persist(eventWithReceivedMillis, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
         }
 
       case PersistOK(ackData) =>
@@ -136,7 +154,7 @@ with Lifecycle {
       case _ => logger.warn("Unknown message")
     }
 
-    self.dispatcher = QueueReaderManager.dispatcher
+    self.dispatcher = queueReaderManager.dispatcher
   }
 
   class Persister extends Actor {
@@ -152,69 +170,51 @@ with Lifecycle {
       case _ => logger.warn("Unknown message")
     }
 
-    self.dispatcher = PersisterManager.dispatcher
+    self.dispatcher = persisterManager.dispatcher
   }
 
-  lazy val supervisor = Supervisor(SupervisorConfig(
-    OneForOneStrategy(
-      List(classOf[Exception]), //What exceptions will be handled
-      3, // maximum number of restart retries
-      5000 // within time in millis
-    ), Nil
-
-  ))
-
-  object QueueReaderManager {
+  class QueueReaderManager {
     val numCPUs = Runtime.getRuntime.availableProcessors
-    var actors: List[ActorRef] = _
     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
 
     lazy val dispatcher =
-      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "queuereader")
+      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
         .setMaxPoolSize(queueReaderThreads)
         .setCorePoolSize(1)
         .setKeepAliveTimeInMillis(60000)
         .setRejectionPolicy(new CallerRunsPolicy).build
 
-    def start() = {
-      actors = {
-        for (i <- 0 until numCPUs) yield {
-          val actor = actorOf(new QueueReader)
-          supervisor.link(actor)
-          actor.start()
-          actor
-        }
-      }.toList
-    }
+    lazy val actors =
+      for (i <- 0 until numCPUs) yield {
+        val actor = actorOf(new QueueReader)
+        supervisor.link(actor)
+        actor.start()
+        actor
+      }
 
     def stop() = dispatcher.stopAllAttachedActors
   }
 
-  object PersisterManager {
+  class PersisterManager {
     val numCPUs = Runtime.getRuntime.availableProcessors
-    var actors: List[ActorRef] = _
-
     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
 
     val dispatcher =
-      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "persister")
+      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
         .setMaxPoolSize(persisterThreads)
         .setCorePoolSize(1)
         .setKeepAliveTimeInMillis(60000)
         .setRejectionPolicy(new CallerRunsPolicy).build
 
-    def start() = {
-      actors = {
-        for (i <- 0 until numCPUs) yield {
-          val actor = actorOf(new Persister)
-          supervisor.link(actor)
-          actor.start()
-          actor
-        }
-      }.toList
-    }
+    lazy val actors =
+      for (i <- 0 until numCPUs) yield {
+        val actor = actorOf(new Persister)
+        supervisor.link(actor)
+        actor.start()
+        actor
+      }
 
     def stop() = dispatcher.stopAllAttachedActors
   }
index c72441f..e50a73e 100644 (file)
@@ -8,34 +8,34 @@ import gr.grnet.aquarium.logic.events.AquariumEvent
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-class IMEventProcessorService extends EventProcessorService {
+final class IMEventProcessorService extends EventProcessorService {
 
-  protected def decode(data: Array[Byte]) = null
+  override def decode(data: Array[Byte]) = null
 
-  protected def forward(resourceEvent: AquariumEvent) {}
+  override def forward(resourceEvent: AquariumEvent) {}
 
-  protected def exists(event: AquariumEvent) = false
+  override def exists(event: AquariumEvent) = false
 
-  protected def persist(event: AquariumEvent) = false
+  override def persist(event: AquariumEvent) = false
 
   override def queueReaderThreads: Int = 1
   override def persisterThreads: Int = 2
   override def name = "imevtproc"
 
+  override def persisterManager = new PersisterManager
+  override def queueReaderManager = new QueueReaderManager
+
   def start() {
     logger.info("Starting IM event processor service")
 
-    QueueReaderManager.start()
-    PersisterManager.start()
-
     consumer("%s.#".format(MessagingNames.IM_EVENT_KEY),
       MessagingNames.IM_EVENT_QUEUE, MessagingNames.IM_EXCHANGE,
-      QueueReaderManager.lb, false)
+      queueReaderManager.lb, false)
   }
 
   def stop() {
-    QueueReaderManager.stop()
-    PersisterManager.stop()
+    queueReaderManager.stop()
+    persisterManager.stop()
 
     logger.info("Stopping IM event processor service")
   }
index 6458fdb..10ffde3 100644 (file)
@@ -39,20 +39,20 @@ final class ResourceEventProcessorService extends EventProcessorService {
   override def persisterThreads: Int = 2
   override def name = "resevtproc"
 
+  override def persisterManager = new PersisterManager
+  override def queueReaderManager = new QueueReaderManager
+
   def start() {
     logger.info("Starting resource event processor service")
 
-    QueueReaderManager.start()
-    PersisterManager.start()
-
     consumer("%s.#".format(MessagingNames.RES_EVENT_KEY),
       MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE,
-      QueueReaderManager.lb, false)
+      queueReaderManager.lb, false)
   }
 
   def stop() {
-    QueueReaderManager.stop()
-    PersisterManager.stop()
+    queueReaderManager.stop()
+    persisterManager.stop()
 
     logger.info("Stopping resource event processor service")
   }