import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
import akka.config.Supervision.SupervisorConfig
import akka.config.Supervision.OneForOneStrategy
+import java.util.concurrent.ConcurrentSkipListSet
/**
* An actor that gets events from the queue, stores them persistently
// Message signalling that storing an event failed. The QueueReader handler
// visible in this file logs the failure and answers the originating queue
// with Reject(ackData.deliveryTag, true).
case class PersistFailed(ackData: AckData)
// Message carrying the ack data of an event.
// NOTE(review): presumably signals that the event was already stored; no
// handler for it is visible in this section -- confirm against the receiver.
case class Duplicate(ackData: AckData)
// Ids of events that QueueReader has already seen once flagged as a redelivery.
// An id is added the first time an event arrives as a redelivery and removed
// when the same event arrives as a redelivery again and is rejected.
// NOTE(review): no removal after a successful persist is visible in this
// section -- confirm that entries cannot accumulate indefinitely.
+ val redeliveries = new ConcurrentSkipListSet[String]()
+
class QueueReader extends Actor {
def receive = {
case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
val event = ResourceEvent.fromBytes(payload)
- PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ if (isRedeliver) {
+ //Event was already tracked as a redelivery once: reject it and stop tracking it
+ if (redeliveries.contains(event.id)) {
+ logger.warn("Event %s redelivered >2 times. Ignoring".format(event))
+ queue ! Reject(deliveryTag, false)
+ redeliveries.remove(event.id)
+ } else {
+ //Redeliver, but keep track of the message
+ redeliveries.add(event.id)
+ PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ }
+ } else
+ PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
case PersistOK(ackData) =>
logger.debug("Stored event:%s".format(ackData.msgId))
ackData.queue ! Acknowledge(ackData.deliveryTag)
case PersistFailed(ackData) =>
+ //Give the message a chance to be processed by other processors
logger.debug("Storing event:%s failed".format(ackData.msgId))
ackData.queue ! Reject(ackData.deliveryTag, true)