Merge stuff from benchmark branch
author Christos KK Loverdos <loverdos@gmail.com>
Mon, 16 Jan 2012 13:04:17 +0000 (15:04 +0200)
committer Christos KK Loverdos <loverdos@gmail.com>
Mon, 16 Jan 2012 13:04:48 +0000 (15:04 +0200)
src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala
src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala
src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala
src/main/scala/gr/grnet/aquarium/store/WalletEntryStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActorsLRU.scala

index 33e067c..171b570 100644 (file)
@@ -165,4 +165,6 @@ object ResourceEvent {
     final val vmId = "vmId"
     final val action = "action" // "on", "off"
   }
+
+  def emtpy = ResourceEvent("", 0, 0, "", "1", "", "", 0, Map())
 }
\ No newline at end of file
index c4818dd..78c1fee 100644 (file)
@@ -25,7 +25,7 @@ case class WalletEntry(
   extends AquariumEvent(id, occurredMillis, receivedMillis) {
 
   assert(occurredMillis > 0)
-  assert(value > 0F)
+  assert(value >= 0F)
   assert(!userId.isEmpty)
 
   def validate = true
index 4b2d009..17dbe00 100644 (file)
@@ -21,7 +21,7 @@ final class ResourceEventProcessorService extends EventProcessorService[Resource
 
   override def forward(event: ResourceEvent): Unit = {
     val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
-    //businessLogicDispacther ! ProcessResourceEvent(event)
+    businessLogicDispacther ! ProcessResourceEvent(event)
   }
 
   override def exists(event: ResourceEvent): Boolean =
index ba45bfd..77a825c 100644 (file)
@@ -31,4 +31,6 @@ trait WalletEntryStore {
    */
   def findPreviousEntry(userId: String, resource: String,
                         instanceId: String, finalized: Option[Boolean]): List[WalletEntry]
+
+  def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry]
 }
\ No newline at end of file
index 8e84d1f..950621d 100644 (file)
@@ -120,7 +120,7 @@ class MongoDBStore(
       while(cursor.hasNext) {
         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
       }
-      buffer.toList
+      buffer.toList.sortWith(_sortByTimestampAsc)
     } finally {
       cursor.close()
     }
@@ -130,7 +130,7 @@ class MongoDBStore(
                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
     val query = new BasicDBObject()
     query.put(ResourceJsonNames.userId, userId)
-    query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lte", upTo))
+    query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo))
     query.put(ResourceJsonNames.resource, resName)
 
     instid match {
@@ -151,7 +151,7 @@ class MongoDBStore(
       while(cursor.hasNext) {
         buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
       }
-      buffer.toList
+      buffer.toList.sortWith(_sortByTimestampAsc)
     } finally {
       cursor.close()
     }
@@ -207,6 +207,14 @@ class MongoDBStore(
     MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
   }
 
+  def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = {
+    val q = new BasicDBObject()
+    q.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime))
+    q.put(ResourceJsonNames.userId, userId)
+
+    MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
+  }
+
   def findLatestUserWalletEntries(userId: String) = {
     Maybe {
       val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, -1) // -1 is descending order
index 3bc4b47..f9b22ee 100644 (file)
@@ -37,6 +37,8 @@ package gr.grnet.aquarium
 package user
 
 import util.json.JsonSupport
+import logic.accounting.Policy
+import logic.accounting.dsl.{DiscreteCostPolicy, ContinuousCostPolicy, OnOffCostPolicy}
 
 /**
  * Snapshot of data that are user-related.
@@ -106,7 +108,12 @@ case class OwnedResourcesSnapshot(data: List[ResourceInstanceSnapshot], snapshot
     val newData = oldRCInstanceOpt match {
       case Some(currentRCInstance) ⇒
         // Need to delete the old one and add the new one
-        val newValue = newRCInstance.data + currentRCInstance.data
+        val newValue = Policy.policy.findResource(name).get.costpolicy match {
+          case OnOffCostPolicy => newRCInstance.data
+          case ContinuousCostPolicy => newRCInstance.data + currentRCInstance.data
+          case DiscreteCostPolicy => newRCInstance.data
+        }
+
         newRCInstance.copy(data = newValue) :: (data.filterNot(_.isResource(name, instanceId)))
       case None ⇒
         // Resource not found, so this is the first time and we just add the new snapshot
index e9beaa6..354a53d 100644 (file)
@@ -540,7 +540,7 @@ class UserActor extends AquariumActor
    *
    */
   private[this] def ensureUserState(): Unit = {
-    if(null eq this._userState) {
+    /*if(null eq this._userState) {
       findUserState(this._userId) match {
         case Just(userState) ⇒
           DEBUG("Loaded user state %s from DB", userState)
@@ -554,13 +554,12 @@ class UserActor extends AquariumActor
           rebuildState(0)
           WARN("Request for unknown (to Aquarium) user")
       }
-    }
-    /*
+    }*/
+
     if (_userState == null)
       rebuildState(0)
     else
-      rebuildState(_userState.newestSnapshotTime, System.currentTimeMillis())
-    */
+      rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
   }
 
   /**
@@ -592,7 +591,7 @@ class UserActor extends AquariumActor
 
     //Rebuild state from wallet entries
     val wallet = _configurator.storeProvider.walletEntryStore
-    val walletEnties = wallet.findUserWalletEntriesFromTo(_userId, new Date(from), new Date(to))
+    val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
     val numWalletEntries = walletEnties.size
     _userState = replayWalletEntries(_userState, walletEnties, from, to)
 
@@ -607,7 +606,7 @@ class UserActor extends AquariumActor
    * Create an empty state for a user
    */
   def createBlankState = {
-    val now = 0
+    val now = System.currentTimeMillis()
     val agreement = Policy.policy.findAgreement("default")
 
     this._userState = UserState(
@@ -665,6 +664,10 @@ class UserActor extends AquariumActor
           res = res.addOrUpdateResourceSnapshot(name,
             instanceId, e.value, e.occurredMillis)._1
     }
+    if (!events.isEmpty) {
+      val snapTime = events.map{e => e.occurredMillis}.max
+      res = res.copy(snapshotTime = snapTime)
+    }
     initState.copy(ownedResources = res)
   }
 
@@ -679,7 +682,11 @@ class UserActor extends AquariumActor
       .foreach {
         w =>
           val newVal = cred.data + w.value
-          cred = cred.copy(data = newVal, snapshotTime = w.occurredMillis)
+          cred = cred.copy(data = newVal)
+    }
+    if (!events.isEmpty) {
+      val snapTime = events.map{e => e.occurredMillis}.max
+      cred = cred.copy(snapshotTime = snapTime)
     }
     initState.copy(credits = cred)
   }
@@ -690,8 +697,7 @@ class UserActor extends AquariumActor
   def calcWalletEntries(): Unit = {
     ensureUserState
 
-    if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return List()
-    println("%d %d".format(_userState.ownedResources.snapshotTime, _userState.credits.snapshotTime))
+    if (_userState.ownedResources.snapshotTime < _userState.credits.snapshotTime) return
     val eventsDB = _configurator.storeProvider.resourceEventStore
     val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, _userState.credits.snapshotTime)
     val policy = Policy.policy
@@ -701,32 +707,45 @@ class UserActor extends AquariumActor
         // TODO: Check that agreement exists
         val agr = policy.findAgreement(_userState.agreement.data).get
 
-        // Since we always process resource updates before producing wallet
-        // entries, it is safe to assume that a resource instance has already
-        // been recorded in the state and avoid further checks. If not, a
-        // bug lurks somewhere, so just let it crash.
-        val prevUpdate = _userState.ownedResources.findResourceSnapshot(
-          ev.resource, ev.getInstanceId(policy)).get.snapshotTime
-        val resource = policy.findResource(ev.resource).get
+        val resource = policy.findResource(ev.resource) match {
+          case Some(x) => x
+          case None =>
+            val errMsg = "Cannot find resource: %s".format(ev.resource)
+            ERROR(errMsg)
+            throw new AccountingException(errMsg) // FIXME: to throw or not to throw?
+        }
 
         val instid = resource.isComplex match {
           case true => ev.details.get(resource.asInstanceOf[DSLComplexResource].descriminatorField)
           case false => None
         }
-        val resHistory = eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
 
-        var res = OwnedResourcesSnapshot(List(), 0)
-        resHistory.foreach {
-          e => 
-            res = res.addOrUpdateResourceSnapshot(ev.resource, ev.getInstanceId(policy), ev.value, ev.occurredMillis)._1
+        var curValue = 0F
+        var lastUpdate = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)) match {
+          case Some(x) => x.snapshotTime
+          case None => Long.MaxValue //To trigger recalculation
         }
 
-        val curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
-        val lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime
+        if (lastUpdate > ev.occurredMillis) {
+          //Event is older than current state. Rebuild state up to event timestamp
+          val resHistory =
+            ResourceEvent("", 0, 0, _userId, "1", ev.resource, ev.eventVersion, 0, ev.details) ::
+            eventsDB.findResourceEventHistory(_userId, ev.resource, instid, ev.occurredMillis)
+          INFO("%d older entries for resource %s, user %s up to %d".format(resHistory.size, ev.resource, _userId, ev.occurredMillis));
+          var res = OwnedResourcesSnapshot(List(), 0)
+          resHistory.foreach {
+            e =>
+              res = res.addOrUpdateResourceSnapshot(e.resource, e.getInstanceId(policy), e.value, e.occurredMillis)._1
+          }
+          lastUpdate = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.snapshotTime
+          curValue = res.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
+        } else {
+          curValue = _userState.ownedResources.findResourceSnapshot(ev.resource, ev.getInstanceId(policy)).get.data
+        }
 
         val entries = chargeEvent(ev, agr, curValue, new Date(lastUpdate),
           findRelatedEntries(resource, ev.getInstanceId(policy)))
-
+        INFO("PERF: CHARGE %s %d".format(ev.id, System.currentTimeMillis))
         entries match {
           case Just(x) => x
           case Failed(e, r) => List()
@@ -767,8 +786,9 @@ class UserActor extends AquariumActor
       if(resourceEvent.userId != this._userId) {
         ERROR("Received %s but my userId = %s".format(m, this._userId))
       } else {
-        ensureUserState()
-        processResourceEvent(resourceEvent, true)
+        //ensureUserState()
+        calcWalletEntries()
+        //processResourceEvent(resourceEvent, true)
       }
 
     case m @ ProcessUserEvent(userEvent) ⇒
@@ -780,7 +800,7 @@ class UserActor extends AquariumActor
       }
 
     case m @ RequestUserBalance(userId, timestamp) ⇒
-      if(this._userId != userId) {
+      /*if(this._userId != userId) {
         ERROR("Received %s but my userId = %s".format(m, this._userId))
         // TODO: throw an exception here
       } else {
@@ -809,10 +829,11 @@ class UserActor extends AquariumActor
           val errMsg = "Could not load user state for %s".format(m)
           ERROR(errMsg)
           self reply ResponseUserBalance(userId, 0, Some(errMsg))
-        }
-        //calcWalletEntries()
+        }*/
+        if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+          calcWalletEntries()
         self reply UserResponseGetBalance(userId, _userState.credits.data)
-      }
+      //}
 
     case m @ UserRequestGetState(userId, timestamp) ⇒
       if(this._userId != userId) {
@@ -828,7 +849,7 @@ class UserActor extends AquariumActor
 
   override def postStop {
     DEBUG("Stopping, saving state")
-    saveUserState
+    //saveUserState
   }
 
   private[this] def DEBUG(fmt: String, args: Any*) =
index 67dced0..cd2aeab 100644 (file)
@@ -75,7 +75,7 @@ class UserActorsLRU(val upperWaterMark: Int, val lowerWatermark: Int) extends Li
     val accessed  = mapAsScalaMap(_cache.getLatestAccessedItems(_cache.size()))
 
     //Send the poison pill and make sure that all futures have been returned
-    val futures = accessed.keys.map{x => _cache.get(x).stop()}
+    val futures = accessed.keysIterator.map{x => _cache.get(x).stop()}
   }
 
   def size: Int   = _cache.size()