Various performance related changes
authorGeorgios Gousios <gousiosg@gmail.com>
Tue, 3 Jan 2012 15:56:30 +0000 (17:56 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Tue, 3 Jan 2012 15:56:30 +0000 (17:56 +0200)
-Start lots of actors by default, to compensate for slow I/O
-Start a fair amount of threads by default
-More detailed logging
-Don't create a new dispatcher per request

src/main/scala/gr/grnet/aquarium/processor/actor/EventProcessorService.scala
src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala
src/main/scala/gr/grnet/aquarium/processor/actor/UserEventProcessorService.scala

index 84ed0be..1f6c55b 100644 (file)
@@ -101,7 +101,9 @@ with Lifecycle {
 
   protected def persisterManager: PersisterManager
   protected def queueReaderManager: QueueReaderManager
-  
+
+  protected val numCPUs = Runtime.getRuntime.availableProcessors
+
   def start(): Unit
   def stop() : Unit
 
@@ -115,7 +117,7 @@ with Lifecycle {
         if (isRedeliver) {
           //Message could not be processed 3 times, just ignore it
           if (redeliveries.contains(event.id)) {
-            logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
+            logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
             queue ! Reject(deliveryTag, false)
             redeliveries.remove(event.id)
             inFlightEvents.remove(deliveryTag)
@@ -130,30 +132,35 @@ with Lifecycle {
         }
 
       case PersistOK(ackData) =>
-        logger.debug("Stored event[%s] msg[%d] - %d left".format(ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
+        logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
         ackData.queue ! Acknowledge(ackData.deliveryTag)
 
       case PersistFailed(ackData) =>
         //Give the message a chance to be processed by other processors
-        logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
+        logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
         inFlightEvents.remove(ackData.deliveryTag)
         ackData.queue ! Reject(ackData.deliveryTag, true)
 
       case Duplicate(ackData) =>
-        logger.debug("Event[%s] msg[%d] is setRcvMillis".format(ackData.msgId, ackData.deliveryTag))
+        logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
         inFlightEvents.remove(ackData.deliveryTag)
         ackData.queue ! Reject(ackData.deliveryTag, false)
 
       case Acknowledged(deliveryTag) =>
-        logger.debug("Msg with tag [%d] acked. Forwarding...".format(deliveryTag))
+        logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
         forward(inFlightEvents.remove(deliveryTag))
 
       case Rejected(deliveryTag) =>
-        logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
+        logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
 
       case _ => logger.warn("Unknown message")
     }
 
+    override def preStart = {
+      logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
+      super.preStart
+    }
+
     self.dispatcher = queueReaderManager.dispatcher
   }
 
@@ -161,6 +168,7 @@ with Lifecycle {
 
     def receive = {
       case Persist(event, sender, ackData) =>
+        logger.debug("Persister-%s attempting store".format(self.getUuid()))
         if (exists(event))
           sender ! Duplicate(ackData)
         else if (persist(event)) {
@@ -170,23 +178,27 @@ with Lifecycle {
       case _ => logger.warn("Unknown message")
     }
 
+    override def preStart = {
+      logger.debug("Starting actor Persister-%s".format(self.getUuid()))
+      super.preStart
+    }
+
     self.dispatcher = persisterManager.dispatcher
   }
 
   class QueueReaderManager {
-    val numCPUs = Runtime.getRuntime.availableProcessors
     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
 
     lazy val dispatcher =
       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
-        .setMaxPoolSize(queueReaderThreads)
-        .setCorePoolSize(1)
+        .setMaxPoolSize(2 * numCPUs)
+        .setCorePoolSize(queueReaderThreads)
         .setKeepAliveTimeInMillis(60000)
         .setRejectionPolicy(new CallerRunsPolicy).build
 
     lazy val actors =
-      for (i <- 0 until numCPUs) yield {
+      for (i <- 0 until 4 * numCPUs) yield {
         val actor = actorOf(new QueueReader)
         supervisor.link(actor)
         actor.start()
@@ -197,19 +209,18 @@ with Lifecycle {
   }
 
   class PersisterManager {
-    val numCPUs = Runtime.getRuntime.availableProcessors
     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
 
     val dispatcher =
       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
-        .setMaxPoolSize(persisterThreads)
-        .setCorePoolSize(1)
+        .setMaxPoolSize(2 * numCPUs)
+        .setCorePoolSize(persisterThreads)
         .setKeepAliveTimeInMillis(60000)
         .setRejectionPolicy(new CallerRunsPolicy).build
 
     lazy val actors =
-      for (i <- 0 until numCPUs) yield {
+      for (i <- 0 until 5 * numCPUs) yield {
         val actor = actorOf(new Persister)
         supervisor.link(actor)
         actor.start()
index 10ffde3..e677048 100644 (file)
@@ -1,7 +1,6 @@
 package gr.grnet.aquarium.processor.actor
 
 import com.ckkloverdos.maybe.{Just, Failed, NoVal}
-import gr.grnet.aquarium.actor.DispatcherRole
 import gr.grnet.aquarium.messaging.MessagingNames
 import gr.grnet.aquarium.logic.events.{AquariumEvent, ResourceEvent}
 
@@ -15,11 +14,13 @@ final class ResourceEventProcessorService extends EventProcessorService {
 
   override def decode(data: Array[Byte]) : AquariumEvent = ResourceEvent.fromBytes(data)
 
-  override def forward(evt: AquariumEvent): Unit = {
+  /*override def forward(evt: AquariumEvent): Unit = {
     val resourceEvent = evt.asInstanceOf[ResourceEvent]
     val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
     businessLogicDispacther ! ProcessResourceEvent(resourceEvent)
-  }
+  }*/
+
+  override def forward(evt: AquariumEvent): Unit = {}
 
   override def exists(event: AquariumEvent): Boolean =
     _configurator.resourceEventStore.findResourceEventById(event.id).isJust
@@ -29,18 +30,21 @@ final class ResourceEventProcessorService extends EventProcessorService {
     _configurator.resourceEventStore.storeResourceEvent(event) match {
       case Just(x) => true
       case x: Failed =>
-        logger.error("Could not save event: %s".format(event))
+        logger.error("Could not save event: %s. Reason:".format(event, x.toString))
         false
       case NoVal => false
     }
   }
 
-  override def queueReaderThreads: Int = 1
-  override def persisterThreads: Int = 2
+  override def queueReaderThreads: Int = 4
+  override def persisterThreads: Int = numCPUs
   override def name = "resevtproc"
 
-  override def persisterManager = new PersisterManager
-  override def queueReaderManager = new QueueReaderManager
+  val persister = new PersisterManager
+  val queueReader = new QueueReaderManager
+
+  override def persisterManager   = persister
+  override def queueReaderManager = queueReader
 
   def start() {
     logger.info("Starting resource event processor service")
index 98133eb..26149f1 100644 (file)
@@ -4,12 +4,13 @@ import gr.grnet.aquarium.messaging.MessagingNames
 import gr.grnet.aquarium.logic.events.{UserEvent, AquariumEvent}
 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
 
+
 /**
  * An event processor service for user events coming from the IM system
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-final class UserEventProcessorService extends EventProcessorService {
+class UserEventProcessorService extends EventProcessorService {
 
   override def decode(data: Array[Byte]) = UserEvent.fromBytes(data)
 
@@ -29,12 +30,15 @@ final class UserEventProcessorService extends EventProcessorService {
     }
   }
 
-  override def queueReaderThreads: Int = 1
-  override def persisterThreads: Int = 2
+  override def queueReaderThreads: Int = 4
+  override def persisterThreads: Int = numCPUs
   override def name = "usrevtproc"
 
-  override def persisterManager = new PersisterManager
-  override def queueReaderManager = new QueueReaderManager
+  lazy val persister = new PersisterManager
+  lazy val queueReader = new QueueReaderManager
+
+  override def persisterManager   = persister
+  override def queueReaderManager = queueReader
 
   def start() {
     logger.info("Starting user event processor service")