Cleanup some code before the coming changes
authorChristos KK Loverdos <loverdos@gmail.com>
Tue, 17 Jan 2012 14:34:56 +0000 (16:34 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Tue, 17 Jan 2012 14:34:56 +0000 (16:34 +0200)
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala

index 0e2b970..0857dbc 100644 (file)
@@ -65,198 +65,6 @@ class UserActor extends AquariumActor
 
   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
 
-  private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
-    if(checkForOlderEvents) {
-      DEBUG("Checking for events older than %s" format resourceEvent)
-      processOlderResourceEvents(resourceEvent)
-    }
-
-    justProcessTheResourceEvent(resourceEvent, "ACTUAL")
-  }
-
-
-  /**
-   * Given an "onoff" event, we try to locate all unprocessed resource events that precede this one.
-   */
-  def findOlderResourceEventsForOnOff(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
-    Nil
-  }
-
-  def findOlderResourceEventsForOther(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
-    Nil
-  }
-
-  /**
-   * Find resource events that precede the given one and are unprocessed.
-   */
-  private[this] def findOlderResourceEvents(rcEvent: ResourceEvent, policy: DSLPolicy): List[ResourceEvent] = {
-    if(rcEvent.isOnOffEvent(policy)) {
-      findOlderResourceEventsForOnOff(rcEvent, policy)
-    } else {
-      findOlderResourceEventsForOther(rcEvent, policy)
-    }
-  }
-
-  /**
-   * Find and process older resource events.
-   *
-   * Older resource events are found based on the latest credit calculation, that is the latest
-   * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
-   * have been calculated for these resource events and start from there.
-   */
-  private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
-    assert(_userId == resourceEvent.userId)
-    val rceId = resourceEvent.id
-    val userId = resourceEvent.userId
-    val resourceEventStore = _configurator.resourceEventStore
-    val walletEntriesStore = _configurator.walletStore
-
-    // 1. Find latest wallet entry
-    val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
-    latestWalletEntriesM match {
-      case Just(latestWalletEntries) ⇒
-        // The time on which we base the selection of the older events
-        val selectionTime = latestWalletEntries.head.occurredMillis
-
-        // 2. Now find all resource events past the time of the latest wallet entry.
-        //    These events have not been processed, except probably those ones
-        //    that have the same `occurredMillis` with `selectionTime`
-        val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
-
-        // 3. Filter out those resource events for which no wallet entry has actually been
-        //    produced.
-        val rcEventsToProcess = for {
-          oldRCEvent        <- oldRCEvents
-          oldRCEventId      = oldRCEvent.id
-          latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId)
-        } yield {
-          oldRCEvent
-        }
-
-        DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
-
-        for {
-          rcEventToProcess <- rcEventsToProcess
-        } {
-          justProcessTheResourceEvent(rcEventToProcess, "OLDER")
-        }
-      case NoVal ⇒
-        DEBUG("No events to process older than %s".format(resourceEvent))
-      case Failed(e, m) ⇒
-        ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
-    }
-  }
-
-  private[this] def _storeWalletEntries(walletEntries: List[WalletEntry], allowNonFinalized: Boolean): Unit = {
-    val walletEntriesStore = _configurator.walletStore
-    for(walletEntry <- walletEntries) {
-      val allow = walletEntry.finalized || allowNonFinalized
-      if(allow) {
-        walletEntriesStore.storeWalletEntry(walletEntry)
-      }
-    }
-  }
-
-  private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = {
-    val newCredits = for {
-      walletEntry <- walletEntries if(walletEntry.finalized)
-    } yield walletEntry.value.toDouble
-
-    newCredits.sum
-  }
-
-  /**
-   * Process the resource event as if nothing else matters. Just do it.
-   */
-  private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
-    val start = System.currentTimeMillis
-    DEBUG("Processing [%s] %s".format(logLabel, ev))
-
-    // Initially, the user state (regarding resources) is empty.
-    // So we have to compensate for both a totally empty resource state
-    // and the update with new values.
-
-    // 1. Find the resource definition
-    val policy = Policy.policy
-    policy.findResource(ev.resource) match {
-      case Some(resource) ⇒
-        // 2. Get the instance id and value for the resource
-        val instanceIdM = resource match {
-          // 2.1 If it is complex, from the details map, get the field which holds the instanceId
-          case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒
-            ev.details.get(descriminatorField) match {
-              case Some(instanceId) ⇒
-                Just(instanceId)
-              case None ⇒
-                // We should have some value under this key here....
-                Failed(throw new AccountingException("")) //TODO: See what to do here
-            }
-          // 2.2 If it is simple, ...
-          case DSLSimpleResource(name, unit, costPolicy) ⇒
-            // ... by convention, the instanceId of a simple resource is just "1"
-            // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
-            Just("1")
-        }
-
-        // 3. Did we get a valid instanceId?
-        instanceIdM match {
-          // 3.1 Yes, time to get/update the current state
-          case Just(instanceId) ⇒
-            val oldOwnedResources = _userState.ownedResources
-
-            // A. First state diff: the modified resource value
-            val StateChangeMillis = TimeHelpers.nowMillis
-            val (newOwnedResources, oldRCInstanceOpt, newRCInstance) = oldOwnedResources.
-              addOrUpdateResourceSnapshot(resource.name, instanceId, ev.value, ev.occurredMillis)
-            val previousRCUpdateTime = oldRCInstanceOpt.map(_.snapshotTime).getOrElse(newRCInstance.snapshotTime)
-
-            // Calculate the wallet entries generated from this resource event
-            _userState.maybeDSLAgreement match {
-              case Just(agreement) ⇒
-                val walletEntriesM = chargeEvent(ev, agreement, ev.value,
-                  new Date(previousRCUpdateTime),
-                  findRelatedEntries(resource, ev.getInstanceId(policy)))
-                walletEntriesM match {
-                  case Just(walletEntries) ⇒
-                    _storeWalletEntries(walletEntries, true)
-
-                    // B. Second state diff: the new credits
-                    val newCreditsSum = _calcNewCreditSum(walletEntries)
-                    val oldCredits    = _userState.safeCredits.data
-                    val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis)
-
-                    // Finally, the userState change
-                    DEBUG("Credits   = %s".format(this._userId, newCredits))
-                    DEBUG("Resources = %s".format(this._userId, newOwnedResources))
-                    this._userState = this._userState.copy(
-                      credits = newCredits,
-                      ownedResources = newOwnedResources
-                    )
-                  case NoVal ⇒
-                    DEBUG("No wallet entries generated for %s".format(ev))
-                  case failed @ Failed(e, m) ⇒
-                    failed
-                }
-                
-              case NoVal ⇒
-                Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId)))
-              case failed @ Failed(e, m) ⇒
-                failed
-            }
-          // 3.2 No, no luck, this is an error
-          case NoVal ⇒
-            Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId)))
-          case failed @ Failed(e, m) ⇒
-            failed
-        }
-      // No resource definition found, this is an error
-      case None ⇒ // Policy.policy.findResource(ev.resource)
-        Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId)))
-    }
-
-    DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start))
-  }
-
   private[this] def processCreateUser(event: UserEvent): Unit = {
     val userId = event.userId
     DEBUG("Creating user from state %s", event)
@@ -319,36 +127,12 @@ class UserActor extends AquariumActor
   }
 
   /**
-   * Try to load from the DB the latest known info (snapshot data) for the given user.
-   */
-  private[this] def findUserState(userId: String): Maybe[UserState] = {
-    val usersDB = _configurator.storeProvider.userStateStore
-    usersDB.findUserStateByUserId(userId)
-  }
-
-  /**
    * Tries to makes sure that the internal user state exists.
    *
    * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
    *
    */
   private[this] def ensureUserState(): Unit = {
-    /*if(null eq this._userState) {
-      findUserState(this._userId) match {
-        case Just(userState) ⇒
-          DEBUG("Loaded user state %s from DB", userState)
-          //TODO: May be out of sync with the event store, rebuild it here
-          this._userState = userState
-          rebuildState(this._userState.oldestSnapshotTime)
-        case Failed(e, m) ⇒
-          ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
-        case NoVal ⇒
-          //TODO: Rebuild actor state here.
-          rebuildState(0)
-          WARN("Request for unknown (to Aquarium) user")
-      }
-    }*/
-
     if (_userState == null)
       rebuildState(0)
     else
@@ -594,40 +378,9 @@ class UserActor extends AquariumActor
       }
 
     case m @ RequestUserBalance(userId, timestamp) ⇒
-      /*if(this._userId != userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-        // TODO: throw an exception here
-      } else {
-        // This is the big party.
-        // Get the user state, if it exists and make sure it is not stale.
-        ensureUserState()
-
-        // Do we have a user state?
-        if(_userState ne null) {
-          // Yep, we do. See what there is inside it.
-          val credits = _userState.credits
-          val creditsTimestamp = credits.snapshotTime
-
-          // Check if data is stale
-          if(creditsTimestamp + _timestampTheshold > timestamp) {
-            // No, it's OK
-            self reply UserResponseGetBalance(userId, credits.data)
-          } else {
-            // Yep, data is stale and must recompute balance
-            // FIXME: implement
-            ERROR("FIXME: Should have computed a new value for %s".format(credits))
-            self reply UserResponseGetBalance(userId, credits.data)
-          }
-        } else {
-          // No user state exists. This is an error.
-          val errMsg = "Could not load user state for %s".format(m)
-          ERROR(errMsg)
-          self reply ResponseUserBalance(userId, 0, Some(errMsg))
-        }*/
-        if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
-          calcWalletEntries()
-        self reply UserResponseGetBalance(userId, _userState.credits.data)
-      //}
+      if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+        calcWalletEntries()
+      self reply UserResponseGetBalance(userId, _userState.credits.data)
 
     case m @ UserRequestGetState(userId, timestamp) ⇒
       if(this._userId != userId) {