Better logging, better stopping of actors
authorGeorgios Gousios <gousiosg@gmail.com>
Fri, 16 Dec 2011 12:06:10 +0000 (14:06 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Fri, 16 Dec 2011 12:07:20 +0000 (14:07 +0200)
logic/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala

index 0ab5bc5..9e249d8 100644 (file)
@@ -60,7 +60,7 @@ import akka.config.Supervision.OneForOneStrategy
 class ResourceEventProcessorService extends AkkaAMQP with Loggable
 with Lifecycle {
 
-  case class AckData(deliveryTag: Long, queue: ActorRef)
+  case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
   case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
   case class PersistOK(ackData: AckData)
   case class PersistFailed(ackData: AckData)
@@ -71,18 +71,18 @@ with Lifecycle {
     def receive = {
       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
         val event = ResourceEvent.fromBytes(payload)
-        PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(deliveryTag, queue.get))
+        PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
 
       case PersistOK(ackData) =>
-        logger.debug("Stored res event:%s".format(ackData.deliveryTag))
+        logger.debug("Stored event:%s".format(ackData.msgId))
         ackData.queue ! Acknowledge(ackData.deliveryTag)
 
       case PersistFailed(ackData) =>
-        logger.debug("Storing res event:%s failed".format(ackData.deliveryTag))
+        logger.debug("Storing event:%s failed".format(ackData.msgId))
         ackData.queue ! Reject(ackData.deliveryTag, true)
 
       case Duplicate(ackData) =>
-        logger.debug("Res event:%s is duplicate".format(ackData.deliveryTag))
+        logger.debug("Event:%s is duplicate".format(ackData.msgId))
         ackData.queue ! Reject(ackData.deliveryTag, false)
 
       case Acknowledged(deliveryTag) =>
@@ -135,8 +135,6 @@ with Lifecycle {
   object QueueReaderManager {
     val numCPUs = Runtime.getRuntime.availableProcessors
     var actors: List[ActorRef] = _
-
-    // sets up load balancing among the actors created above to allow multithreading
     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
 
     lazy val dispatcher =
@@ -158,9 +156,7 @@ with Lifecycle {
       }.toList
     }
 
-    def stop() = {
-      actors.foreach(a => a.stop)
-    }
+    def stop() = dispatcher.stopAllAttachedActors
   }
 
   object PersisterManager {
@@ -188,9 +184,7 @@ with Lifecycle {
       }.toList
     }
 
-    def stop() = {
-      actors.foreach(a => a.stop)
-    }
+    def stop() = dispatcher.stopAllAttachedActors
   }
 
   def start() {
@@ -198,7 +192,6 @@ with Lifecycle {
 
     QueueReaderManager.start()
     PersisterManager.start()
-
     consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false)
   }