package gr.grnet.aquarium.processor.actor
import gr.grnet.aquarium.logic.events.ResourceEvent
-import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
import com.ckkloverdos.maybe.{NoVal, Failed, Just}
import gr.grnet.aquarium.{MasterConf}
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
import akka.config.Supervision.OneForOneStrategy
import java.util.concurrent.ConcurrentSkipListSet
import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
+import akka.amqp._
/**
* An actor that gets events from the queue, stores them persistently
if (isRedeliver) {
//Message could not be processed 3 times, just
if (redeliveries.contains(event.id)) {
- logger.warn("Event %s redelivered >2 times. Ignoring".format(event))
+ logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
queue ! Reject(deliveryTag, false)
redeliveries.remove(event.id)
} else {
PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
case PersistOK(ackData) =>
- logger.debug("Stored event:%s".format(ackData.msgId))
+ logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
ackData.queue ! Acknowledge(ackData.deliveryTag)
case PersistFailed(ackData) =>
//Give the message a chance to be processed by other processors
- logger.debug("Storing event:%s failed".format(ackData.msgId))
+ logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
ackData.queue ! Reject(ackData.deliveryTag, true)
case Duplicate(ackData) =>
- logger.debug("Event:%s is duplicate".format(ackData.msgId))
+ logger.debug("Event[%s] msg[%d] is duplicate".format(ackData.msgId, ackData.deliveryTag))
ackData.queue ! Reject(ackData.deliveryTag, false)
case Acknowledged(deliveryTag) =>
- //Forward to the dispatcher
+ logger.debug("Msg with tag [%d] acked".format(deliveryTag))
+ //TODO: Forward to the dispatcher
+
+ case Rejected(deliveryTag) =>
+ logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
case _ => logger.warn("Unknown message")
}
!MasterConf.MasterConf.eventStore.findEventById(event.id).isEmpty
def persist(event: ResourceEvent): Boolean = {
+ event.aqTimestamp = System.currentTimeMillis()
MasterConf.MasterConf.eventStore.storeEvent(event) match {
case Just(x) => true
case x: Failed =>