Events are now replayed correctly
authorGeorgios Gousios <gousiosg@gmail.com>
Wed, 11 Jan 2012 20:06:25 +0000 (22:06 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Wed, 11 Jan 2012 20:06:25 +0000 (22:06 +0200)
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala

index 2bebab7..3bc4b47 100644 (file)
 package gr.grnet.aquarium
 package user
 
-import logic.accounting.dsl.{DSLResource, DSLAgreement}
-import collection.mutable
-import logic.events.WalletEntry
 import util.json.JsonSupport
 
-
 /**
  * Snapshot of data that are user-related.
  *
@@ -95,14 +91,11 @@ case class ResourceInstanceSnapshot(
     this.instanceId == instanceId
 }
 case class OwnedResourcesSnapshot(data: List[ResourceInstanceSnapshot], snapshotTime: Long)
-  extends UserDataSnapshot[List[ResourceInstanceSnapshot]] {
+  extends UserDataSnapshot[List[ResourceInstanceSnapshot]] with JsonSupport {
+
+  def findResourceSnapshot(name: String, instanceId: String): Option[ResourceInstanceSnapshot] =
+    data.find { x => name.equals(x.name) && instanceId.equals(x.instanceId) }
 
-  def findResourceSnapshot(name: String, instanceId: String): Option[ResourceInstanceSnapshot] = {
-    data find {
-      case ResourceInstanceSnapshot(name, instanceId, _, _) ⇒ true
-      case _ ⇒ false
-    }
-  }
   
   def addOrUpdateResourceSnapshot(name: String,
                                   instanceId: String,
@@ -113,7 +106,8 @@ case class OwnedResourcesSnapshot(data: List[ResourceInstanceSnapshot], snapshot
     val newData = oldRCInstanceOpt match {
       case Some(currentRCInstance) ⇒
         // Need to delete the old one and add the new one
-        newRCInstance :: (data.filterNot(_.isResource(name, instanceId)))
+        val newValue = newRCInstance.data + currentRCInstance.data
+        newRCInstance.copy(data = newValue) :: (data.filterNot(_.isResource(name, instanceId)))
       case None ⇒
         // Resource not found, so this is the first time and we just add the new snapshot
         newRCInstance :: data
index 9d75908..bb28536 100644 (file)
@@ -40,11 +40,11 @@ import gr.grnet.aquarium.Configurator
 import gr.grnet.aquarium.processor.actor._
 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
 import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
-import gr.grnet.aquarium.util.{TimeHelpers, Loggable}
 import gr.grnet.aquarium.user._
 import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
 import gr.grnet.aquarium.logic.accounting.dsl.{DSLPolicy, DSLResource, DSLSimpleResource, DSLComplexResource}
 import java.util.Date
+import gr.grnet.aquarium.util.{DateUtils, TimeHelpers, Loggable}
 
 
 /**
@@ -52,7 +52,8 @@ import java.util.Date
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-class UserActor extends AquariumActor with Loggable with Accounting {
+class UserActor extends AquariumActor
+  with Loggable with Accounting with DateUtils {
   @volatile
   private[this] var _userId: String = _
   @volatile
@@ -544,13 +545,13 @@ class UserActor extends AquariumActor with Loggable with Accounting {
         case Just(userState) ⇒
           DEBUG("Loaded user state %s from DB", userState)
           //TODO: May be out of sync with the event store, rebuild it here
-          //rebuildState(this._userState.oldestSnapshotTime)
           this._userState = userState
+          rebuildState(this._userState.oldestSnapshotTime)
         case Failed(e, m) ⇒
           ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
         case NoVal ⇒
           //TODO: Rebuild actor state here.
-          //rebuildState(0)
+          rebuildState(0)
           WARN("Request for unknown (to Aquarium) user")
       }
     }
@@ -560,7 +561,8 @@ class UserActor extends AquariumActor with Loggable with Accounting {
    * Replay the event log for all events that affect the user state, starting
    * from the provided time instant.
    */
-  def rebuildState(from: Long): Unit = rebuildState(from, Integer.MAX_VALUE)
+  def rebuildState(from: Long): Unit =
+    rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
 
   /**
    * Replay the event log for all events that affect the user state.
@@ -625,15 +627,15 @@ class UserActor extends AquariumActor with Loggable with Accounting {
     events
       .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
       .foreach {
-      e =>
-        act = act.copy(
-          data = e.isStateActive, snapshotTime = e.occurredMillis)
-        // TODO: Implement the following
-        //_userState.agreement = _userState.agreement.copy(
-        //  data = e.newAgreement, e.occurredMillis)
-
-        rol = rol.copy(data = e.roles,
-          snapshotTime = e.occurredMillis)
+        e =>
+          act = act.copy(
+            data = e.isStateActive, snapshotTime = e.occurredMillis)
+          // TODO: Implement the following
+          //_userState.agreement = _userState.agreement.copy(
+          //  data = e.newAgreement, e.occurredMillis)
+
+          rol = rol.copy(data = e.roles,
+            snapshotTime = e.occurredMillis)
     }
     initState.copy(active = act, roles = rol)
   }
@@ -654,7 +656,6 @@ class UserActor extends AquariumActor with Loggable with Accounting {
           }
 
           val instanceId = e.getInstanceId(Policy.policy)
-
           res = res.addOrUpdateResourceSnapshot(name,
             instanceId, e.value, e.occurredMillis)._1
     }
@@ -754,6 +755,7 @@ class UserActor extends AquariumActor with Loggable with Accounting {
       } else {
         // FIXME: implement
         ERROR("FIXME: Should have properly computed the user state")
+        ensureUserState()
         self reply UserResponseGetState(userId, this._userState)
       }
   }