Event refactoring
[aquarium] / src / main / scala / gr / grnet / aquarium / service / EventProcessorService.scala
index 6498256..3f81aee 100644 (file)
@@ -35,7 +35,7 @@
 
 package gr.grnet.aquarium.service
 
-import gr.grnet.aquarium.util.{Lifecycle, Loggable, makeString}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
 
 import akka.actor._
 import akka.actor.Actor._
@@ -48,10 +48,10 @@ import akka.config.Supervision.OneForOneStrategy
 import gr.grnet.aquarium.messaging.AkkaAMQP
 import akka.amqp._
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
-import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.events.AquariumEvent
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.{AquariumException, Configurator}
+import gr.grnet.aquarium.actor.ReflectiveActor
+import gr.grnet.aquarium.event.model.ExternalEventModel
 
 /**
  * An abstract service that retrieves Aquarium events from a queue,
@@ -63,7 +63,7 @@ import gr.grnet.aquarium.{AquariumException, Configurator}
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with Loggable with Lifecycle {
+abstract class EventProcessorService[E <: ExternalEventModel] extends AkkaAMQP with Loggable with Lifecycle {
 
   /* Messages exchanged between the persister and the queuereader */
   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
@@ -76,12 +76,19 @@ abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with L
 
   case class Duplicate(ackData: AckData)
 
-  /* Short term storage for delivery tags to work around AMQP
+  /**
+   * Short term storage for delivery tags to work around AMQP
    * limitation with redelivering rejected messages to same host.
+   *
+   * FIXME: Grow unbounded???
    */
   private val redeliveries = new ConcurrentSkipListSet[String]()
 
-  /* Temporarily keeps track of messages while being processed */
+  /**
+   *  Temporarily keeps track of messages while being processed
+   *
+   *  FIXME: Grow unbounded???
+   */
   private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
 
   /* Supervisor actor for each event processing operation */
@@ -95,15 +102,15 @@ abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with L
 
   protected def _configurator: Configurator = Configurator.MasterConfigurator
 
-  protected def decode(data: Array[Byte]): E
+  protected def parseJsonBytes(data: Array[Byte]): E
 
   protected def forward(event: E): Unit
 
-  protected def exists(event: E): Boolean
+  protected def existsInStore(event: E): Boolean
 
-  protected def persist(event: E, initialPayload: Array[Byte]): Boolean
+  protected def storeParsedEvent(event: E, initialPayload: Array[Byte]): Unit
 
-  protected def persistUnparsed(initialPayload: Array[Byte], exception: Throwable): Unit
+  protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit
 
   protected def queueReaderThreads: Int
 
@@ -146,48 +153,41 @@ abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with L
 
     def receive = {
       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
-        val eventM = MaybeEither {
-          decode(payload)
-        } // either decoded or error
-        eventM match {
-          case Just(event) ⇒
-            inFlightEvents.put(deliveryTag, event)
-
-            if(isRedeliver) {
-              //Message could not be processed 3 times, just ignore it
-              if(redeliveries.contains(event.id)) {
-                logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
-                queue ! Reject(deliveryTag, false)
-                redeliveries.remove(event.id)
-                inFlightEvents.remove(deliveryTag)
-              } else {
-                //Redeliver, but keep track of the message
-                redeliveries.add(event.id)
-                persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
-              }
+        try {
+          val event = parseJsonBytes(payload)
+          inFlightEvents.put(deliveryTag, event)
+
+          if(isRedeliver) {
+            //Message could not be processed 3 times, just ignore it
+            if(redeliveries.contains(event.id)) {
+              logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
+              queue ! Reject(deliveryTag, false)
+              redeliveries.remove(event.id)
+              inFlightEvents.remove(deliveryTag)
             } else {
-              val eventWithReceivedMillis = event.copyWithReceivedMillis(TimeHelpers.nowMillis).asInstanceOf[E]
-              persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
-            }
-
-          case failed@Failed(e) ⇒
-            logger.error("While decoding payload", e)
-            logger.error("Offensive payload = \n{}", makeString(payload))
-
-            // If we could not create an object from the incoming json, then we just store the message
-            // and then ignore it.
-            // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
-            MaybeEither {
-              persistUnparsed(payload, e)
-            } match {
-              case Just(_) ⇒
-                logger.debug("Sending Acknowledge(deliveryTag) = {}", Acknowledge(deliveryTag))
-                queue ! Acknowledge(deliveryTag)
-              case Failed(e) ⇒
-                logger.error("While persisting unparsed event", e)
-                logger.debug("Sending Reject(deliveryTag, true) = {}", Reject(deliveryTag, true))
-                queue ! Reject(deliveryTag, true)
+              //Redeliver, but keep track of the message
+              redeliveries.add(event.id)
+              persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
             }
+          } else {
+            val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
+            persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+          }
+
+        } catch { case e: Exception ⇒
+          logger.error("While parsing incoming json bytes payload", e)
+
+          // If we could not create an object from the incoming json, then we just store the message
+          // and then ignore it.
+          // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
+          try {
+            storeUnparsedEvent(payload, e)
+            queue ! Acknowledge(deliveryTag)
+          } catch { case e: Exception ⇒
+            // Aquarium internal error here...
+            logger.error("Could not store unparsed json bytes payload", e)
+            queue ! Reject(deliveryTag, true)
+          }
         }
 
       case PersistOK(ackData) =>
@@ -223,25 +223,30 @@ abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with L
     self.dispatcher = queueReaderManager.dispatcher
   }
 
-  class Persister extends Actor {
+  class Persister extends ReflectiveActor {
 
-    def receive = {
-      case Persist(event, initialPayload, sender, ackData) ⇒
-        logger.debug("Persister-%s attempting store".format(self.getUuid()))
-        //val time = TimeHelpers.nowMillis
-        if(exists(event))
-          sender ! Duplicate(ackData)
-        else if(persist(event, initialPayload)) {
-          //logger.debug("Persist time: %d ms".format(TimeHelpers.nowMillis - time))
-          sender ! PersistOK(ackData)
-        } else
+    def knownMessageTypes = Set(classOf[Persist])
+
+    override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) {
+      logChainOfCauses(t)
+      servicingMessage match {
+        case Persist(event, initialPayload, sender, ackData) ⇒
           sender ! PersistFailed(ackData)
-      case _ => logger.warn("Unknown message")
+          logger.error("While persisting", t)
+      }
     }
 
-    override def preStart = {
-      logger.debug("Starting actor Persister-%s".format(self.getUuid()))
-      super.preStart
+    def onPersist(persist: Persist): Unit = {
+      persist match {
+        case Persist(event, initialPayload, sender, ackData) ⇒
+          if(existsInStore(event)) {
+            sender ! Duplicate(ackData)
+          }
+          else {
+            storeParsedEvent(event, initialPayload)
+            sender ! PersistOK(ackData)
+          }
+      }
     }
 
     self.dispatcher = persisterManager.dispatcher