Alternative way to calculate state, with entry points commented out
author Georgios Gousios <gousiosg@gmail.com>
Thu, 12 Jan 2012 11:38:08 +0000 (13:38 +0200)
committer Georgios Gousios <gousiosg@gmail.com>
Thu, 12 Jan 2012 11:38:08 +0000 (13:38 +0200)
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala

index bb28536..e9beaa6 100644 (file)
@@ -555,6 +555,12 @@ class UserActor extends AquariumActor
           WARN("Request for unknown (to Aquarium) user")
       }
     }
+    /*
+    if (_userState == null)
+      rebuildState(0)
+    else
+      rebuildState(_userState.newestSnapshotTime, System.currentTimeMillis())
+    */
   }
 
   /**
@@ -588,7 +594,7 @@ class UserActor extends AquariumActor
     val wallet = _configurator.storeProvider.walletEntryStore
     val walletEnties = wallet.findUserWalletEntriesFromTo(_userId, new Date(from), new Date(to))
     val numWalletEntries = walletEnties.size
-    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
+    _userState = replayWalletEntries(_userState, walletEnties, from, to)
 
     INFO(("Rebuilt state from %d events (%d user events, " +
       "%d resource events, %d wallet entries) in %d msec").format(
@@ -671,12 +677,70 @@ class UserActor extends AquariumActor
     events
       .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
       .foreach {
-        w => cred = cred.copy(data = w.value, snapshotTime = w.occurredMillis)
+        w =>
+          val newVal = cred.data + w.value
+          cred = cred.copy(data = newVal, snapshotTime = w.occurredMillis)
     }
     initState.copy(credits = cred)
   }
 
   /**
+   * Update wallet entries for all unprocessed events
+   */
+  def calcWalletEntries(): Unit = {
+    ensureUserState
+
+    if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return List()
+    println("%d %d".format(_userState.ownedResources.snapshotTime, _userState.credits.snapshotTime))
+    val eventsDB = _configurator.storeProvider.resourceEventStore
+    val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
+    val policy = Policy.policy
+
+    val walletEntries = resourceEvents.map {
+      ev =>
+        // TODO: Check that agreement exists
+        val agr = policy.findAgreement(_userState.agreement.data).get
+
+        // Since we always process resource updates before producing wallet
+        // entries, it is safe to assume that a resource instance has already
+        // been recorded in the state and avoid further checks. If not, a
+        // bug lurks somewhere, so just let it crash.
+        val prevUpdate = _userState.ownedResources.findResourceSnapshot(
+          ev.resource, ev.getInstanceId(policy)).get.snapshotTime
+        val resource = policy.findResource(ev.resource).get
+
+        val instid = resource.isComplex match {
+          case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
+          case false => None
+        }
+        val resHistory = eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
+
+        var res = OwnedResourcesSnapshot(List(), 0)
+        resHistory.foreach {
+          e => 
+            res = res.addOrUpdateResourceSnapshot(ev.resource, ev.getInstanceId(policy), ev.value, ev.occurredMillis)._1
+        }
+
+        val curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
+        val lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime
+
+        val entries = chargeEvent(ev, agr, curValue, new Date(lastUpdate),
+          findRelatedEntries(resource, ev.getInstanceId(policy)))
+
+        entries match {
+          case Just(x) => x
+          case Failed(e, r) => List()
+          case NoVal => List()
+        }
+    }.flatten
+
+    val walletDB = _configurator.storeProvider.walletEntryStore
+    walletEntries.foreach(w => walletDB.storeWalletEntry(w))
+
+    ensureUserState
+  }
+
+  /**
    * Persist current user state
    */
   private[this] def saveUserState(): Unit = {
@@ -746,6 +810,8 @@ class UserActor extends AquariumActor
           ERROR(errMsg)
           self reply ResponseUserBalance(userId, 0, Some(errMsg))
         }
+        //calcWalletEntries()
+        self reply UserResponseGetBalance(userId, _userState.credits.data)
       }
 
     case m @ UserRequestGetState(userId, timestamp) ⇒