More verbose debug logging, save custom timestamp
author Georgios Gousios <gousiosg@gmail.com>
Fri, 23 Dec 2011 12:20:47 +0000 (14:20 +0200)
committer Georgios Gousios <gousiosg@gmail.com>
Thu, 29 Dec 2011 08:36:56 +0000 (10:36 +0200)
src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala

index c78743e..c2f35d9 100644 (file)
@@ -36,7 +36,6 @@
 package gr.grnet.aquarium.processor.actor
 
 import gr.grnet.aquarium.logic.events.ResourceEvent
-import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
 import gr.grnet.aquarium.{MasterConf}
 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
@@ -51,6 +50,7 @@ import akka.config.Supervision.SupervisorConfig
 import akka.config.Supervision.OneForOneStrategy
 import java.util.concurrent.ConcurrentSkipListSet
 import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
+import akka.amqp._
 
 /**
  * An actor that gets events from the queue, stores them persistently
@@ -77,7 +77,7 @@ with Lifecycle {
         if (isRedeliver) {
           //Message could not be processed 3 times, just
           if (redeliveries.contains(event.id)) {
-            logger.warn("Event %s redelivered >2 times. Ignoring".format(event))
+            logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
             queue ! Reject(deliveryTag, false)
             redeliveries.remove(event.id)
           } else {
@@ -89,20 +89,24 @@ with Lifecycle {
           PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
 
       case PersistOK(ackData) =>
-        logger.debug("Stored event:%s".format(ackData.msgId))
+        logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
         ackData.queue ! Acknowledge(ackData.deliveryTag)
 
       case PersistFailed(ackData) =>
         //Give the message a chance to be processed by other processors
-        logger.debug("Storing event:%s failed".format(ackData.msgId))
+        logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
         ackData.queue ! Reject(ackData.deliveryTag, true)
 
       case Duplicate(ackData) =>
-        logger.debug("Event:%s is duplicate".format(ackData.msgId))
+        logger.debug("Event[%s] msg[%d] is duplicate".format(ackData.msgId, ackData.deliveryTag))
         ackData.queue ! Reject(ackData.deliveryTag, false)
 
       case Acknowledged(deliveryTag) =>
-      //Forward to the dispatcher
+        logger.debug("Msg with tag [%d] acked".format(deliveryTag))
+      //TODO: Forward to the dispatcher
+
+      case Rejected(deliveryTag) =>
+        logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
 
       case _ => logger.warn("Unknown message")
     }
@@ -127,6 +131,7 @@ with Lifecycle {
       !MasterConf.MasterConf.eventStore.findEventById(event.id).isEmpty
 
     def persist(event: ResourceEvent): Boolean = {
+      event.aqTimestamp = System.currentTimeMillis()
       MasterConf.MasterConf.eventStore.storeEvent(event) match {
         case Just(x) => true
         case x: Failed =>