Fix API breakage from previous upgrade
[aquarium] / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala
index 15e27e6..6732e8e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
@@ -37,14 +37,15 @@ package gr.grnet.aquarium.user.actor
 
 import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.Configurator
-import java.util.Date
 import gr.grnet.aquarium.processor.actor._
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
-import gr.grnet.aquarium.util.{TimeHelpers, Loggable}
 import gr.grnet.aquarium.user._
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLResource, DSLSimpleResource, DSLComplexResource}
-import gr.grnet.aquarium.logic.events.{AquariumEvent, UserEvent, WalletEntry, ResourceEvent}
+import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry}
+import java.util.Date
+import gr.grnet.aquarium.util.Loggable
+import gr.grnet.aquarium.util.date.TimeHelpers
+import com.ckkloverdos.maybe.{Failed, NoVal, Just}
+import gr.grnet.aquarium.logic.accounting.RoleAgreements
+import gr.grnet.aquarium.messaging.AkkaAMQP
 
 
 /**
@@ -52,7 +53,10 @@ import gr.grnet.aquarium.logic.events.{AquariumEvent, UserEvent, WalletEntry, Re
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-class UserActor extends AquariumActor with Loggable with Accounting {
+class UserActor extends AquariumActor
+                   with AkkaAMQP
+                   with ReflectiveAquariumActor
+                   with Loggable {
   @volatile
   private[this] var _userId: String = _
   @volatile
@@ -60,350 +64,188 @@ class UserActor extends AquariumActor with Loggable with Accounting {
   @volatile
   private[this] var _timestampTheshold: Long = _
 
+  private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration
+
   def role = UserActorRole
 
   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
 
-  private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
-    if(checkForOlderEvents) {
-      DEBUG("Checking for events older than %s" format resourceEvent)
-      processOlderResourceEvents(resourceEvent)
-    }
-
-    justProcessTheResourceEvent(resourceEvent, "ACTUAL")
+  /**
+   * Replay the event log for all events that affect the user state.
+   */
+  def rebuildState(from: Long, to: Long): Unit = {
+    val start = System.currentTimeMillis()
+    if (_userState == null)
+      createBlankState
+
+    //Rebuild state from user events
+    val usersDB = _configurator.storeProvider.userEventStore
+    val userEvents = usersDB.findUserEventsByUserId(_userId)
+    val numUserEvents = userEvents.size
+    _userState = replayUserEvents(_userState, userEvents, from, to)
+
+    //Rebuild state from resource events
+    val eventsDB = _configurator.storeProvider.resourceEventStore
+    val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
+    val numResourceEvents = resourceEvents.size
+//    _userState = replayResourceEvents(_userState, resourceEvents, from, to)
+
+    //Rebuild state from wallet entries
+    val wallet = _configurator.storeProvider.walletEntryStore
+    val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
+    val numWalletEntries = walletEnties.size
+    _userState = replayWalletEntries(_userState, walletEnties, from, to)
+
+    INFO(("Rebuilt state from %d events (%d user events, " +
+      "%d resource events, %d wallet entries) in %d msec").format(
+      numUserEvents + numResourceEvents + numWalletEntries,
+      numUserEvents, numResourceEvents, numWalletEntries,
+      System.currentTimeMillis() - start))
   }
 
   /**
-   * Find and process older resource events.
-   *
-   * Older resource events are found based on the latest credit calculation, that is the latest
-   * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
-   * have been calculated for these resource events and start from there.
+   * Create an empty state for a user
    */
-  private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
-    assert(_userId == resourceEvent.userId)
-    val rceId = resourceEvent.id
-    val userId = resourceEvent.userId
-    val resourceEventStore = _configurator.resourceEventStore
-    val walletEntriesStore = _configurator.walletStore
-
-    // 1. Find latest wallet entry
-    val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
-    latestWalletEntriesM match {
-      case Just(latestWalletEntries) ⇒
-        // The time on which we base the selection of the older events
-        val selectionTime = latestWalletEntries.head.occurredMillis
-
-        // 2. Now find all resource events past the time of the latest wallet entry.
-        //    These events have not been processed, except probably those ones
-        //    that have the same `occurredMillis` with `selectionTime`
-        val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
-
-        // 3. Filter out those resource events for which no wallet entry has actually been
-        //    produced.
-        val rcEventsToProcess = for {
-          oldRCEvent        <- oldRCEvents
-          oldRCEventId      = oldRCEvent.id
-          latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId)
-        } yield {
-          oldRCEvent
-        }
-
-        DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
-
-        for {
-          rcEventToProcess <- rcEventsToProcess
-        } {
-          justProcessTheResourceEvent(rcEventToProcess, "OLDER")
-        }
-      case NoVal ⇒
-        DEBUG("No events to process older than %s".format(resourceEvent))
-      case Failed(e, m) ⇒
-        ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
-    }
+  def createBlankState = {
+    this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0)
   }
 
-  private[this] def _storeWalletEntries(walletEntries: List[WalletEntry]): Unit = {
-    val walletEntriesStore = _configurator.walletStore
-    for(walletEntry <- walletEntries) {
-      walletEntriesStore.storeWalletEntry(walletEntry)
-    }
+  /**
+   * Replay user events on the provided user state
+   */
+  def replayUserEvents(initState: UserState, events: List[UserEvent],
+                       from: Long, to: Long): UserState = {
+    initState
   }
 
-  private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = {
-    val newCredits = for {
-      walletEntry <- walletEntries if(walletEntry.finalized)
-    } yield walletEntry.value.toDouble
 
-    newCredits.sum
+  /**
+   * Replay wallet entries on the provided user state
+   */
+  def replayWalletEntries(initState: UserState, events: List[WalletEntry],
+                          from: Long, to: Long): UserState = {
+    initState
   }
 
   /**
-   * Process the resource event as if nothing else matters. Just do it.
+   * Persist current user state
    */
-  private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
-    val start = System.currentTimeMillis
-    DEBUG("Processing [%s] %s".format(logLabel, ev))
-
-    // Initially, the user state (regarding resources) is empty.
-    // So we have to compensate for both a totally empty resource state
-    // and the update with new values.
-
-    // 1. Find the resource definition
-    val policy = Policy.policy
-    policy.findResource(ev.resource) match {
-      case Some(resource) ⇒
-        // 2. Get the instance id and value for the resource
-        val instanceIdM = resource match {
-          // 2.1 If it is complex, from the details map, get the field which holds the instanceId
-          case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒
-            ev.details.get(descriminatorField) match {
-              case Some(instanceId) ⇒
-                Just(instanceId)
-              case None ⇒
-                // We should have some value under this key here....
-                Failed(throw new AccountingException("")) //TODO: See what to do here
-            }
-          // 2.2 If it is simple, ...
-          case DSLSimpleResource(name, unit, costPolicy) ⇒
-            // ... by convention, the instanceId of a simple resource is just "1"
-            // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
-            Just("1")
-        }
-
-        // 3. Did we get a valid instanceId?
-        instanceIdM match {
-          // 3.1 Yes, time to get/update the current state
-          case Just(instanceId) ⇒
-            val oldOwnedResources = _userState.ownedResources
-            // Find or create the new resource instance map
-            val oldOwnedResourcesData = oldOwnedResources.data
-            val oldRCInstanceMap = oldOwnedResourcesData.get(resource) match {
-              case Some(resourceMap) ⇒ resourceMap
-              case None              ⇒ Map[String, Float]()
-            }
-            // Update the new value in the resource instance map
-            val newRCInstanceMap: Map[String, Float] = oldRCInstanceMap.updated(instanceId, ev.value)
-
-            val newOwnedResourcesData = oldOwnedResourcesData.updated(resource, newRCInstanceMap)
-
-            // A. First state diff: the modified resource value
-            val StateChangeMillis = TimeHelpers.nowMillis
-            val newOwnedResources = oldOwnedResources.copy(
-              data = newOwnedResourcesData,
-              snapshotTime = StateChangeMillis
-            )
-
-            // Calculate the wallet entries generated from this resource event
-            _userState.maybeDSLAgreement match {
-              case Just(agreement) ⇒
-                // TODO: the snapshot time should be per instanceId?
-                // TODO: Related events
-                val walletEntriesM = chargeEvent(ev, agreement, ev.value,
-                  new Date(oldOwnedResources.snapshotTime),
-                  findRelatedEntries(resource, ev.getInstanceId(policy)))
-                walletEntriesM match {
-                  case Just(walletEntries) ⇒
-                    _storeWalletEntries(walletEntries)
-
-                    // B. Second state diff: the new credits
-                    val newCreditsSum = _calcNewCreditSum(walletEntries)
-                    val oldCredits    = _userState.safeCredits.data
-                    val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis)
-
-                    // Finally, the userState change
-                    DEBUG("Credits   = %s".format(this._userId, newCredits))
-                    DEBUG("Resources = %s".format(this._userId, newOwnedResources))
-                    this._userState = this._userState.copy(
-                      credits = newCredits,
-                      ownedResources = newOwnedResources
-                    )
-                  case NoVal ⇒
-                    DEBUG("No wallet entries generated for %s".format(ev))
-                  case failed @ Failed(e, m) ⇒
-                    failed
-                }
-                
-              case NoVal ⇒
-                Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId)))
-              case failed @ Failed(e, m) ⇒
-                failed
-            }
-          // 3.2 No, no luck, this is an error
-          case NoVal ⇒
-            Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId)))
-          case failed @ Failed(e, m) ⇒
-            failed
-        }
-      // No resource definition found, this is an error
-      case None ⇒ // Policy.policy.findResource(ev.resource)
-        Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId)))
+  private[this] def saveUserState(): Unit = { // stores _userState through the configured userStateStore and logs failures
+    _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
+      case Just(record) => record
+      case NoVal => ERROR("Unknown error saving state")
+      case Failed(e) =>
+        ERROR("Saving state failed: %s", e) // ERROR applies format() to its template; pre-formatting would re-parse any '%' in e's text
     }
+  }
 
-    DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start))
+  def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
+    this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
+    INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
+  }
+
+  def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = {
+    this._userId = event.userId
+    DEBUG("Actor starting, loading state")
+  }
+
+  def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { // only validates the target user for now; processing is disabled below
+    val resourceEvent = event.rce
+    if(resourceEvent.userID != this._userId) {
+      ERROR("Received %s but my userId = %s", event, this._userId) // pass values as args: ERROR formats the template itself, so a '%' in the event text must not be re-parsed
+    } else {
+      //ensureUserState()
+//        calcWalletEntries()
+      //processResourceEvent(resourceEvent, true)
+    }
   }
 
   private[this] def processCreateUser(event: UserEvent): Unit = {
-    val userId = event.userId
+    val userId = event.userID
     DEBUG("Creating user from state %s", event)
     val usersDB = _configurator.storeProvider.userStateStore
     usersDB.findUserStateByUserId(userId) match {
       case Just(userState) ⇒
         WARN("User already created, state = %s".format(userState))
-      case failed @ Failed(e, m) ⇒
-        ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
+      case failed @ Failed(e) ⇒
+        ERROR("[%s] %s", e.getClass.getName, e.getMessage)
       case NoVal ⇒
-        // OK. Create a default UserState and store it
-        val now = TimeHelpers.nowMillis
-        val agreementOpt = Policy.policy.findAgreement("default")
-        
-        if(agreementOpt.isEmpty) {
-          ERROR("No default agreement found. Cannot initialize user state")
-        } else {
-          this._userState = UserState(
-            userId,
-            ActiveSuspendedSnapshot(event.isStateActive, now),
-            CreditSnapshot(0, now),
-            AgreementSnapshot(agreementOpt.get.name, now),
-            RolesSnapshot(event.roles, now),
-            PaymentOrdersSnapshot(Nil, now),
-            OwnedGroupsSnapshot(Nil, now),
-            GroupMembershipsSnapshot(Nil, now),
-            OwnedResourcesSnapshot(Map(), now)
-          )
-
-          usersDB.storeUserState(this._userState)
-          DEBUG("Created and stored %s", this._userState)
-        }
+        val agreement = RoleAgreements.agreementForRole(event.role)
+        DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
+
+        this._userState = DefaultUserStateComputations.createInitialUserState(
+          userId,
+          event.occurredMillis,
+          event.isActive, 0.0, List(event.role), agreement.name)
+        saveUserState
+        DEBUG("Created and stored %s", this._userState)
     }
   }
 
-  private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
-    val walletDB = _configurator.storeProvider.walletEntryStore
-    walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
-  }
-
-
   private[this] def processModifyUser(event: UserEvent): Unit = {
     val now = TimeHelpers.nowMillis
-    val newActive = ActiveSuspendedSnapshot(event.isStateActive, now)
+    val newActive = ActiveStateSnapshot(event.isStateActive, now)
 
     DEBUG("New active status = %s".format(newActive))
 
-    this._userState = this._userState.copy( active = newActive )
+    this._userState = this._userState.copy(activeStateSnapshot = newActive)
   }
-  /**
-   * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
-   */
-  private[this] def processUserEvent(event: UserEvent): Unit = {
-    if(event.isCreateUser) {
-      processCreateUser(event)
-    } else if(event.isModifyUser) {
-      processModifyUser(event)
+
+  def onProcessUserEvent(event: ProcessUserEvent): Unit = { // dispatches a user event for this actor's user to create/modify handling
+    val userEvent = event.ue
+    if(userEvent.userID != this._userId) {
+      ERROR("Received %s but my userId = %s", userEvent, this._userId) // pass values as args: ERROR formats the template itself, so a '%' in the event text must not be re-parsed
+    } else {
+      if(userEvent.isCreateUser) {
+        processCreateUser(userEvent)
+      } else if(userEvent.isModifyUser) {
+        processModifyUser(userEvent)
+      }
     }
   }
 
-  /**
-   * Try to load from the DB the latest known info (snapshot data) for the given user.
-   */
-  private[this] def findUserState(userId: String): Maybe[UserState] = {
-    val usersDB = _configurator.storeProvider.userStateStore
-    usersDB.findUserStateByUserId(userId)
+  def onRequestUserBalance(event: RequestUserBalance): Unit = {
+    val userId = event.userId
+    val timestamp = event.timestamp
+
+    if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+    {
+//        calcWalletEntries()
+    }
+    self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
   }
 
-  /**
-   * Tries to makes sure that the internal user state exists.
-   *
-   * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
-   *
-   */
-  private[this] def ensureUserState(): Unit = {
-    if(null eq this._userState) {
-      findUserState(this._userId) match {
-        case Just(userState) ⇒
-          DEBUG("Loaded user state %s from DB", userState)
-          this._userState = userState
-        case Failed(e, m) ⇒
-          ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
-        case NoVal ⇒
-          WARN("Request for unknown (to Aquarium) user")
-      }
+  def onUserRequestGetState(event: UserRequestGetState): Unit = { // replies with the current in-memory state when the request targets this actor's user
+    val userId = event.userId
+    if(this._userId != userId) {
+      ERROR("Received %s but my userId = %s", event, this._userId) // pass values as args: ERROR formats the template itself, so a '%' in the event text must not be re-parsed
+      // TODO: throw an exception here
+    } else {
+      // FIXME: implement
+      ERROR("FIXME: Should have properly computed the user state")
+//      ensureUserState()
+      self reply UserResponseGetState(userId, this._userState)
     }
   }
 
-  protected def receive: Receive = {
-    case UserActorStop ⇒
-      // TODO: Check if this is OK with the rest of the semantics
-      self.stop()
-
-    case m @ AquariumPropertiesLoaded(props) ⇒
-      this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
-      INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
-
-    case m @ UserActorInitWithUserId(userId) ⇒
-      this._userId = userId
-      ensureUserState()
-
-    case m @ ProcessResourceEvent(resourceEvent) ⇒
-      if(resourceEvent.userId != this._userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-      } else {
-        ensureUserState()
-        processResourceEvent(resourceEvent, true)
-      }
+  def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
+  }
 
-    case m @ ProcessUserEvent(userEvent) ⇒
-      if(userEvent.userId != this._userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-      } else {
-        ensureUserState()
-        processUserEvent(userEvent)
-      }
+  override def postStop {
+    DEBUG("Stopping, saving state")
+    saveUserState
+  }
 
-    case m @ RequestUserBalance(userId, timestamp) ⇒
-      if(this._userId != userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-        // TODO: throw an exception here
-      } else {
-        // This is the big party.
-        // Get the user state, if it exists and make sure it is not stale.
-        ensureUserState()
-
-        // Do we have a user state?
-        if(_userState ne null) {
-          // Yep, we do. See what there is inside it.
-          val credits = _userState.credits
-          val creditsTimestamp = credits.snapshotTime
-
-          // Check if data is stale
-          if(creditsTimestamp + _timestampTheshold > timestamp) {
-            // No, it's OK
-            self reply UserResponseGetBalance(userId, credits.data)
-          } else {
-            // Yep, data is stale and must recompute balance
-            // FIXME: implement
-            ERROR("FIXME: Should have computed a new value for %s".format(credits))
-            self reply UserResponseGetBalance(userId, credits.data)
-          }
-        } else {
-          // No user state exists. This is an error.
-          val errMsg = "Could not load user state for %s".format(m)
-          ERROR(errMsg)
-          self reply ResponseUserBalance(userId, 0, Some(errMsg))
-        }
-      }
+  override def preRestart(reason: Throwable) {
+    DEBUG("Actor failed, restarting")
+  }
 
-    case m @ UserRequestGetState(userId, timestamp) ⇒
-      if(this._userId != userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-        // TODO: throw an exception here
-      } else {
-        // FIXME: implement
-        ERROR("FIXME: Should have properly computed the user state")
-        self reply UserResponseGetState(userId, this._userState)
-      }
+  override def postRestart(reason: Throwable) {
+    DEBUG("Actor restarted succesfully")
   }
 
+  def knownMessageTypes = UserActor.KnownMessageTypes
+
   private[this] def DEBUG(fmt: String, args: Any*) =
     logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
 
@@ -415,4 +257,16 @@ class UserActor extends AquariumActor with Loggable with Accounting {
 
   private[this] def ERROR(fmt: String, args: Any*) =
     logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
+}
+
+object UserActor {
+  final val KnownMessageTypes = List(
+    classOf[AquariumPropertiesLoaded],
+    classOf[UserActorInitWithUserId],
+    classOf[ProcessResourceEvent],
+    classOf[ProcessUserEvent],
+    classOf[RequestUserBalance],
+    classOf[UserRequestGetState],
+    classOf[ActorProviderConfigured]
+  )
 }
\ No newline at end of file