Fix API breakage from previous upgrade
[aquarium] / src / main / scala / gr / grnet / aquarium / user / actor / UserActor.scala
index e97eaab..6732e8e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
@@ -38,14 +38,14 @@ package gr.grnet.aquarium.user.actor
 import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.Configurator
 import gr.grnet.aquarium.processor.actor._
-import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
 import gr.grnet.aquarium.user._
-import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
+import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry}
 import java.util.Date
-import gr.grnet.aquarium.util.{DateUtils, Loggable}
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource}
+import gr.grnet.aquarium.util.Loggable
 import gr.grnet.aquarium.util.date.TimeHelpers
-import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
+import com.ckkloverdos.maybe.{Failed, NoVal, Just}
+import gr.grnet.aquarium.logic.accounting.RoleAgreements
+import gr.grnet.aquarium.messaging.AkkaAMQP
 
 
 /**
@@ -54,7 +54,9 @@ import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
  */
 
 class UserActor extends AquariumActor
-  with Loggable with Accounting with DateUtils {
+                   with AkkaAMQP
+                   with ReflectiveAquariumActor
+                   with Loggable {
   @volatile
   private[this] var _userId: String = _
   @volatile
@@ -62,79 +64,12 @@ class UserActor extends AquariumActor
   @volatile
   private[this] var _timestampTheshold: Long = _
 
+  private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration
+
   def role = UserActorRole
 
   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
 
-  private[this] def processCreateUser(event: UserEvent): Unit = {
-    val userId = event.userId
-    DEBUG("Creating user from state %s", event)
-    val usersDB = _configurator.storeProvider.userStateStore
-    usersDB.findUserStateByUserId(userId) match {
-      case Just(userState) ⇒
-        WARN("User already created, state = %s".format(userState))
-      case failed @ Failed(e, m) ⇒
-        ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
-      case NoVal ⇒
-        // OK. Create a default UserState and store it
-        val now = TimeHelpers.nowMillis
-        val agreementOpt = Policy.policy.findAgreement(DSLAgreement.DefaultAgreementName)
-
-        if(agreementOpt.isEmpty) {
-          ERROR("No default agreement found. Cannot initialize user state")
-        } else {
-          this._userState = DefaultUserStateComputations.createFirstUserState(userId, DSLAgreement.DefaultAgreementName)
-          saveUserState
-          DEBUG("Created and stored %s", this._userState)
-        }
-    }
-  }
-
-  private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
-    val walletDB = _configurator.storeProvider.walletEntryStore
-    walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
-  }
-
-
-  private[this] def processModifyUser(event: UserEvent): Unit = {
-    val now = TimeHelpers.nowMillis
-    val newActive = ActiveStateSnapshot(event.isStateActive, now)
-
-    DEBUG("New active status = %s".format(newActive))
-
-    this._userState = this._userState.copy( active = newActive )
-  }
-  /**
-   * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
-   */
-  private[this] def processUserEvent(event: UserEvent): Unit = {
-    if(event.isCreateUser) {
-      processCreateUser(event)
-    } else if(event.isModifyUser) {
-      processModifyUser(event)
-    }
-  }
-
-  /**
-   * Tries to makes sure that the internal user state exists.
-   *
-   * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
-   *
-   */
-  private[this] def ensureUserState(): Unit = {
-    if (_userState == null)
-      rebuildState(0)
-    else
-      rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
-  }
-
-  /**
-   * Replay the event log for all events that affect the user state, starting
-   * from the provided time instant.
-   */
-  def rebuildState(from: Long): Unit =
-    rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
-
   /**
    * Replay the event log for all events that affect the user state.
    */
@@ -172,7 +107,7 @@ class UserActor extends AquariumActor
    * Create an empty state for a user
    */
   def createBlankState = {
-    this._userState = DefaultUserStateComputations.createFirstUserState(this._userId, DSLAgreement.DefaultAgreementName)
+    this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0)
   }
 
   /**
@@ -180,22 +115,6 @@ class UserActor extends AquariumActor
    */
   def replayUserEvents(initState: UserState, events: List[UserEvent],
                        from: Long, to: Long): UserState = {
-//    var act = initState.active
-//    var rol = initState.roles
-//    events
-//      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
-//      .foreach {
-//        e =>
-//          act = act.copy(
-//            data = e.isStateActive, snapshotTime = e.occurredMillis)
-//          // TODO: Implement the following
-//          //_userState.agreement = _userState.agreement.copy(
-//          //  data = e.newAgreement, e.occurredMillis)
-//
-//          rol = rol.copy(data = e.roles,
-//            snapshotTime = e.occurredMillis)
-//    }
-//    initState.copy(active = act, roles = rol)
     initState
   }
 
@@ -205,19 +124,6 @@ class UserActor extends AquariumActor
    */
   def replayWalletEntries(initState: UserState, events: List[WalletEntry],
                           from: Long, to: Long): UserState = {
-//    var cred = initState.credits
-//    events
-//      .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
-//      .foreach {
-//        w =>
-//          val newVal = cred.creditAmount + w.value
-//          cred = cred.copy(data = newVal)
-//    }
-//    if (!events.isEmpty) {
-//      val snapTime = events.map{e => e.occurredMillis}.max
-//      cred = cred.copy(snapshotTime = snapTime)
-//    }
-//    initState.copy(credits = cred)
     initState
   }
 
@@ -225,66 +131,121 @@ class UserActor extends AquariumActor
    * Persist current user state
    */
   private[this] def saveUserState(): Unit = {
-    _configurator.storeProvider.userStateStore.deleteUserState(this._userId)
     _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
       case Just(record) => record
       case NoVal => ERROR("Unknown error saving state")
-      case Failed(e, a) =>
-        ERROR("Saving state failed: %s error was: %s".format(a,e));
+      case Failed(e) =>
+        ERROR("Saving state failed: %s".format(e));
     }
   }
 
-  protected def receive: Receive = {
-    case m @ AquariumPropertiesLoaded(props) ⇒
-      this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
-      INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
-
-    case m @ UserActorInitWithUserId(userId) ⇒
-      this._userId = userId
-      DEBUG("Actor starting, loading state")
-      ensureUserState()
-
-    case m @ ProcessResourceEvent(resourceEvent) ⇒
-      if(resourceEvent.userId != this._userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-      } else {
-        //ensureUserState()
+  def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
+    this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
+    INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
+  }
+
+  def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = {
+    this._userId = event.userId
+    DEBUG("Actor starting, loading state")
+  }
+
+  def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+    val resourceEvent = event.rce
+    if(resourceEvent.userID != this._userId) {
+      ERROR("Received %s but my userId = %s".format(event, this._userId))
+    } else {
+      //ensureUserState()
 //        calcWalletEntries()
-        //processResourceEvent(resourceEvent, true)
-      }
+      //processResourceEvent(resourceEvent, true)
+    }
+  }
+
+  private[this] def processCreateUser(event: UserEvent): Unit = {
+    val userId = event.userID
+    DEBUG("Creating user from state %s", event)
+    val usersDB = _configurator.storeProvider.userStateStore
+    usersDB.findUserStateByUserId(userId) match {
+      case Just(userState) ⇒
+        WARN("User already created, state = %s".format(userState))
+      case failed @ Failed(e) ⇒
+        ERROR("[%s] %s", e.getClass.getName, e.getMessage)
+      case NoVal ⇒
+        val agreement = RoleAgreements.agreementForRole(event.role)
+        DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
+
+        this._userState = DefaultUserStateComputations.createInitialUserState(
+          userId,
+          event.occurredMillis,
+          event.isActive, 0.0, List(event.role), agreement.name)
+        saveUserState
+        DEBUG("Created and stored %s", this._userState)
+    }
+  }
+
+  private[this] def processModifyUser(event: UserEvent): Unit = {
+    val now = TimeHelpers.nowMillis
+    val newActive = ActiveStateSnapshot(event.isStateActive, now)
 
-    case m @ ProcessUserEvent(userEvent) ⇒
-      if(userEvent.userId != this._userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-      } else {
-        ensureUserState()
-        processUserEvent(userEvent)
+    DEBUG("New active status = %s".format(newActive))
+
+    this._userState = this._userState.copy(activeStateSnapshot = newActive)
+  }
+
+  def onProcessUserEvent(event: ProcessUserEvent): Unit = {
+    val userEvent = event.ue
+    if(userEvent.userID != this._userId) {
+      ERROR("Received %s but my userId = %s".format(userEvent, this._userId))
+    } else {
+      if(userEvent.isCreateUser) {
+        processCreateUser(userEvent)
+      } else if(userEvent.isModifyUser) {
+        processModifyUser(userEvent)
       }
+    }
+  }
+
+  def onRequestUserBalance(event: RequestUserBalance): Unit = {
+    val userId = event.userId
+    val timestamp = event.timestamp
 
-    case m @ RequestUserBalance(userId, timestamp) ⇒
-      if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
-      {
+    if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+    {
 //        calcWalletEntries()
-      }
-      self reply UserResponseGetBalance(userId, _userState.credits.creditAmount)
-
-    case m @ UserRequestGetState(userId, timestamp) ⇒
-      if(this._userId != userId) {
-        ERROR("Received %s but my userId = %s".format(m, this._userId))
-        // TODO: throw an exception here
-      } else {
-        // FIXME: implement
-        ERROR("FIXME: Should have properly computed the user state")
-        ensureUserState()
-        self reply UserResponseGetState(userId, this._userState)
-      }
+    }
+    self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
+  }
+
+  def onUserRequestGetState(event: UserRequestGetState): Unit = {
+    val userId = event.userId
+    if(this._userId != userId) {
+      ERROR("Received %s but my userId = %s".format(event, this._userId))
+      // TODO: throw an exception here
+    } else {
+      // FIXME: implement
+      ERROR("FIXME: Should have properly computed the user state")
+//      ensureUserState()
+      self reply UserResponseGetState(userId, this._userState)
+    }
+  }
+
+  def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
   }
 
   override def postStop {
     DEBUG("Stopping, saving state")
-    //saveUserState
+    saveUserState
   }
 
+  override def preRestart(reason: Throwable) {
+    DEBUG("Actor failed, restarting")
+  }
+
+  override def postRestart(reason: Throwable) {
+    DEBUG("Actor restarted succesfully")
+  }
+
+  def knownMessageTypes = UserActor.KnownMessageTypes
+
   private[this] def DEBUG(fmt: String, args: Any*) =
     logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
 
@@ -296,4 +257,16 @@ class UserActor extends AquariumActor
 
   private[this] def ERROR(fmt: String, args: Any*) =
     logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
+}
+
+object UserActor {
+  final val KnownMessageTypes = List(
+    classOf[AquariumPropertiesLoaded],
+    classOf[UserActorInitWithUserId],
+    classOf[ProcessResourceEvent],
+    classOf[ProcessUserEvent],
+    classOf[RequestUserBalance],
+    classOf[UserRequestGetState],
+    classOf[ActorProviderConfigured]
+  )
 }
\ No newline at end of file