Handle unlimited redeliveries of failed messages
authorGeorgios Gousios <gousiosg@gmail.com>
Fri, 16 Dec 2011 14:17:09 +0000 (16:17 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Fri, 16 Dec 2011 14:17:09 +0000 (16:17 +0200)
logic/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala
logic/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala
logic/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala

index 0610de6..a518a95 100644 (file)
@@ -41,10 +41,6 @@ import akka.amqp.AMQP._
 import gr.grnet.aquarium.MasterConf
 import com.rabbitmq.client.Address
 import gr.grnet.aquarium.util.Loggable
-import akka.dispatch.Dispatchers
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
-import akka.routing.Routing._
-import akka.routing.CyclicIterator
 
 /**
  * Functionality for working with queues.
index 9e249d8..3d666dd 100644 (file)
@@ -50,6 +50,7 @@ import akka.dispatch.Dispatchers
 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
 import akka.config.Supervision.SupervisorConfig
 import akka.config.Supervision.OneForOneStrategy
+import java.util.concurrent.ConcurrentSkipListSet
 
 /**
  * An actor that gets events from the queue, stores them persistently
@@ -66,18 +67,33 @@ with Lifecycle {
   case class PersistFailed(ackData: AckData)
   case class Duplicate(ackData: AckData)
 
+  val redeliveries = new ConcurrentSkipListSet[String]()
+
   class QueueReader extends Actor {
 
     def receive = {
       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
         val event = ResourceEvent.fromBytes(payload)
-        PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+        if (isRedeliver) {
+          //Message could not be processed 3 times, just
+          if (redeliveries.contains(event.id)) {
+            logger.warn("Event %s redelivered >2 times. Ignoring".format(event))
+            queue ! Reject(deliveryTag, false)
+            redeliveries.remove(event.id)
+          } else {
+            //Redeliver, but keep track of the message
+            redeliveries.add(event.id)
+            PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+          }
+        } else
+          PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
 
       case PersistOK(ackData) =>
         logger.debug("Stored event:%s".format(ackData.msgId))
         ackData.queue ! Acknowledge(ackData.deliveryTag)
 
       case PersistFailed(ackData) =>
+        //Give the message a chance to be processed by other processors
         logger.debug("Storing event:%s failed".format(ackData.msgId))
         ackData.queue ! Reject(ackData.deliveryTag, true)
 
index 6c39655..ac4cae3 100644 (file)
@@ -130,6 +130,10 @@ class MongoDBStore(
       // Get back to retrieve unique id
       val cursor = events.find(_prepareFieldQuery("id", event.id))
 
+      if (!cursor.hasNext) {
+        logger.error("Failed to store event: %s".format(event))
+        return Failed(new StoreException("Failed to store event: %s".format(event)))
+      }
 
       Just(RecordID(cursor.next.get("_id").toString))
     } catch {