package gr.grnet.aquarium.service
-import gr.grnet.aquarium.util.{Lifecycle, Loggable, makeString}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
import akka.actor._
import akka.actor.Actor._
import gr.grnet.aquarium.messaging.AkkaAMQP
import akka.amqp._
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
-import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.events.AquariumEvent
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.{AquariumException, Configurator}
-import gr.grnet.aquarium.store.RecordID
+import gr.grnet.aquarium.actor.ReflectiveActor
+import gr.grnet.aquarium.event.model.ExternalEventModel
/**
* An abstract service that retrieves Aquarium events from a queue,
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with Loggable with Lifecycle {
+abstract class EventProcessorService[E <: ExternalEventModel] extends AkkaAMQP with Loggable with Lifecycle {
/* Messages exchanged between the persister and the queuereader */
case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
protected def _configurator: Configurator = Configurator.MasterConfigurator
- protected def decode(data: Array[Byte]): E
+ protected def parseJsonBytes(data: Array[Byte]): E
protected def forward(event: E): Unit
- protected def exists(event: E): Boolean
+ protected def existsInStore(event: E): Boolean
- protected def persist(event: E, initialPayload: Array[Byte]): Unit
+ protected def storeParsedEvent(event: E, initialPayload: Array[Byte]): Unit
- protected def persistUnparsed(initialPayload: Array[Byte], exception: Throwable): Unit
+ protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit
protected def queueReaderThreads: Int
def receive = {
case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
- val eventM = MaybeEither {
- decode(payload)
- } // either decoded or error
- eventM match {
- case Just(event) ⇒
- inFlightEvents.put(deliveryTag, event)
-
- if(isRedeliver) {
- //Message could not be processed 3 times, just ignore it
- if(redeliveries.contains(event.id)) {
- logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
- queue ! Reject(deliveryTag, false)
- redeliveries.remove(event.id)
- inFlightEvents.remove(deliveryTag)
- } else {
- //Redeliver, but keep track of the message
- redeliveries.add(event.id)
- persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
- }
+ try {
+ val event = parseJsonBytes(payload)
+ inFlightEvents.put(deliveryTag, event)
+
+ if(isRedeliver) {
+ //Message could not be processed 3 times, just ignore it
+ if(redeliveries.contains(event.id)) {
+ logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
+ queue ! Reject(deliveryTag, false)
+ redeliveries.remove(event.id)
+ inFlightEvents.remove(deliveryTag)
} else {
- val eventWithReceivedMillis = event.copyWithReceivedMillis(TimeHelpers.nowMillis).asInstanceOf[E]
- persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
- }
-
- case failed@Failed(e) ⇒
- logger.error("While decoding payload", e)
- logger.error("Offensive payload = \n{}", makeString(payload))
-
- // If we could not create an object from the incoming json, then we just store the message
- // and then ignore it.
- // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
- MaybeEither {
- persistUnparsed(payload, e)
- } match {
- case Just(_) ⇒
- logger.debug("Sending Acknowledge(deliveryTag) = {}", Acknowledge(deliveryTag))
- queue ! Acknowledge(deliveryTag)
- case Failed(e) ⇒
- logger.error("While persisting unparsed event", e)
- logger.debug("Sending Reject(deliveryTag, true) = {}", Reject(deliveryTag, true))
- queue ! Reject(deliveryTag, true)
+ //Redeliver, but keep track of the message
+ redeliveries.add(event.id)
+ persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
}
+ } else {
+ val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
+ persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ }
+
+ } catch { case e: Exception ⇒
+ logger.error("While parsing incoming json bytes payload", e)
+
+ // If we could not create an object from the incoming json, then we just store the message
+ // and then ignore it.
+ // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
+ try {
+ storeUnparsedEvent(payload, e)
+ queue ! Acknowledge(deliveryTag)
+ } catch { case e: Exception ⇒
+ // Aquarium internal error here...
+ logger.error("Could not store unparsed json bytes payload", e)
+ queue ! Reject(deliveryTag, true)
+ }
}
case PersistOK(ackData) =>
self.dispatcher = queueReaderManager.dispatcher
}
- class Persister extends Actor with Loggable {
+ class Persister extends ReflectiveActor {
- def receive = {
- case Persist(event, initialPayload, sender, ackData) ⇒
- if(exists(event))
- sender ! Duplicate(ackData)
- else MaybeEither {
- persist(event, initialPayload)
- } forJust { just ⇒
- sender ! PersistOK(ackData)
- } forFailed { failed ⇒
+ def knownMessageTypes = Set(classOf[Persist])
+
+ override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) {
+ logChainOfCauses(t)
+ servicingMessage match {
+ case Persist(event, initialPayload, sender, ackData) ⇒
sender ! PersistFailed(ackData)
- logger.warn("While persisting {}", event)
- }
+ logger.error("While persisting", t)
+ }
}
- override def preStart = {
- super.preStart
- logStarted(TimeHelpers.nowMillis, TimeHelpers.nowMillis)
+ def onPersist(persist: Persist): Unit = {
+ persist match {
+ case Persist(event, initialPayload, sender, ackData) ⇒
+ if(existsInStore(event)) {
+ sender ! Duplicate(ackData)
+ }
+ else {
+ storeParsedEvent(event, initialPayload)
+ sender ! PersistOK(ackData)
+ }
+ }
}
self.dispatcher = persisterManager.dispatcher