New logic on what to do with incoming im event
author: Christos KK Loverdos <loverdos@gmail.com>
Thu, 24 May 2012 12:22:27 +0000 (15:22 +0300)
committer: Christos KK Loverdos <loverdos@gmail.com>
Thu, 24 May 2012 12:22:27 +0000 (15:22 +0300)
src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala

index cf3c19a..8fa0639 100644 (file)
@@ -43,6 +43,7 @@ import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore}
 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
 import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
+import gr.grnet.aquarium.util.date.MutableDateCalc
 
 /**
  * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
@@ -91,27 +92,64 @@ class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
         // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
         //    It is a requirement that this ID is unique.
         val store = aquarium.imEventStore
+
+        val imEventDebugString = "%s(ID=%s)".format(className, id)
+
         store.findIMEventById(id) match {
          case Some(_) ⇒
            // Reject the duplicate
-           Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
+           logger.debug("Rejecting duplicate ID for %s".format(imEventDebugString))
+           Some(HandlerResultReject("Duplicate ID for %s".format(imEventDebugString)))
 
          case None ⇒
            // 2. Check that the new event is not older than our most recent event in DB.
            //    Sorry. We cannot tolerate out-of-order events here, since they really mess with the
            //    agreements selection and thus with the charging procedure.
            //
-           // TODO: We really need to store these events anyway but somewhere else (BadEventsStore?)
+           // 2.1 The only exception is the very first activation ever. We allow late arrival, since
+           //     the rest of Aquarium does nothing (but accumulate events) if the user has never
+           //     been activated.
+           //
+           // TODO: We really need to store these bad events anyway but somewhere else (BadEventsStore?)
            val userID = imEvent.userID
 
            store.findLatestIMEventByUserID(userID) match {
              case Some(latestStoredEvent) ⇒
                val occurredMillis = imEvent.occurredMillis
                val latestOccurredMillis = latestStoredEvent.occurredMillis
+
                if(occurredMillis < latestOccurredMillis) {
-                 Some(HandlerResultReject(
-                   "Out of order %s (%s < %s)".format(className, occurredMillis, latestOccurredMillis)))
+                 // OK this is older than our most recent event. Essentially a glimpse in the past.
+                 // Logs the rejection at debug level and yields the reject result; being a `def`, it only runs where invoked.
+                 def rejectMessage = {
+                   logger.debug("Rejecting older %s. [%s] is older than [%s]".format(
+                     className,
+                     new MutableDateCalc(occurredMillis).toYYYYMMDDHHMMSSSSS,
+                     new MutableDateCalc(latestOccurredMillis).toYYYYMMDDHHMMSSSSS
+                   ))
+
+                   Some(HandlerResultReject(
+                     "Out of order %s (%s < %s)".format(
+                       className, occurredMillis, latestOccurredMillis)))
+                 }
+
+                 // Has the user been activated before?
+                 store.findFirstIsActiveIMEventByUserID(userID) match {
+                   case Some(_) ⇒
+                     // Yes, so the new event must be rejected
+                     rejectMessage
+
+                   case None ⇒
+                     // No. Process the new event only if it is an activation.
+                     if(imEvent.isActive) {
+                       logger.info("First activation for userID=%s in %s".format(userID, imEventDebugString))
+                       None
+                     } else {
+                       rejectMessage
+                     }
+                 }
                } else {
+                 // We accept every event that is not older than the latest stored one
                  None
                }