WIP: Remodeling UserState store mechanics
author: Christos KK Loverdos <loverdos@gmail.com>
Wed, 25 Apr 2012 14:05:07 +0000 (17:05 +0300)
committer: Christos KK Loverdos <loverdos@gmail.com>
Wed, 25 Apr 2012 14:05:07 +0000 (17:05 +0300)
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/user/UserState.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala

index d7aa0ed..3ab665b 100644 (file)
@@ -43,6 +43,7 @@ import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.Configurator
 import gr.grnet.aquarium.user._
 
+import gr.grnet.aquarium.util.shortClassNameOf
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.logic.accounting.RoleAgreements
 import gr.grnet.aquarium.actor.message.service.router._
@@ -66,25 +67,6 @@ class UserActor extends ReflectiveAquariumActor {
   private[this] def _timestampTheshold =
     _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
 
-  /**
-   * Create an empty state for a user
-   */
-  def createInitialState(userID: String) = {
-    this._userState = DefaultUserStateComputations.createInitialUserState(userID, 0L, true, 0.0)
-  }
-
-
-  /**
-   * Persist current user state
-   */
-  private[this] def saveUserState(): Unit = {
-    _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
-      case Just(record) => record
-      case NoVal => ERROR("Unknown error saving state")
-      case Failed(e) =>
-        ERROR("Saving state failed: %s".format(e));
-    }
-  }
 
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
@@ -92,26 +74,38 @@ class UserActor extends ReflectiveAquariumActor {
   def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
   }
 
-  private[this] def processCreateUser(event: IMEventModel): Unit = {
-    val userId = event.userID
-    DEBUG("Creating user from state %s", event)
-    val usersDB = _configurator.storeProvider.userStateStore
-    usersDB.findUserStateByUserId(userId) match {
-      case Just(userState) ⇒
-        WARN("User already created, state = %s".format(userState))
-      case failed@Failed(e) ⇒
-        ERROR("[%s] %s", e.getClass.getName, e.getMessage)
-      case NoVal ⇒
-        val agreement = RoleAgreements.agreementForRole(event.role)
-        DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
-
-        this._userState = DefaultUserStateComputations.createInitialUserState(
-          userId,
-          event.occurredMillis,
-          event.isActive, 0.0, List(event.role), agreement.name)
-        saveUserState
-        DEBUG("Created and stored %s", this._userState)
+  private[this] def _computeAgreementForNewUser(imEvent: IMEventModel): String = {
+    // FIXME: Implement based on the role
+    "default"
+  }
+
+  private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
+    val userID = imEvent.userID
+    val store = _configurator.storeProvider.userStateStore
+    // Try to find the user state; normally it should not exist yet.
+    val latestUserStateOpt = store.findLatestUserStateByUserID(userID)
+    if(latestUserStateOpt.isDefined) {
+      logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
+        userID,
+        shortClassNameOf(imEvent),
+        imEvent.eventType))
+
+      return
     }
+
+    val initialAgreementName = _computeAgreementForNewUser(imEvent)
+    val newUserState    = DefaultUserStateComputations.createInitialUserState(
+      userID,
+      imEvent.occurredMillis,
+      imEvent.isActive,
+      0.0,
+      List(imEvent.role),
+      initialAgreementName)
+
+    this._userState = newUserState
+
+    // FIXME: If this fails, then the actor must be shut down.
+    store.insertUserState(newUserState)
   }
 
   private[this] def processModifyUser(event: IMEventModel): Unit = {
@@ -147,18 +141,6 @@ class UserActor extends ReflectiveAquariumActor {
   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
   }
 
-  override def postStop {
-    DEBUG("Actor[%s] stopping, saving state", self.uuid)
-    saveUserState
-  }
-
-  override def preRestart(reason: Throwable) {
-    ERROR(reason, "preRestart: Actor[%s]", self.uuid)
-  }
-
-  override def postRestart(reason: Throwable) {
-    ERROR(reason, "postRestart: Actor[%s]", self.uuid)
-  }
 
   private[this] def D_userID = {
     this._userState match {
index 6f4e53d..f008133 100644 (file)
@@ -49,28 +49,25 @@ import com.ckkloverdos.maybe.Maybe
 trait UserStateStore {
 
   /**
-   * Store a user state
+   * Store a user state.
    */
-  def storeUserState(userState: UserState): Maybe[RecordID]
+  def insertUserState(userState: UserState): UserState
 
-
-  def storeUserState2(userState: UserState): Maybe[UserState] = {
-    for {
-      recordID <- storeUserState(userState)
-    } yield {
-      userState.copy(id = recordID.id.toString)
-    }
+  def insertUserState2(userState: UserState): Maybe[UserState] = {
+    Maybe { insertUserState(userState) }
   }
 
   /**
    * Find a state by user ID
    */
-  def findUserStateByUserId(userId: String): Maybe[UserState]
+  def findUserStateByUserID(userID: String): Option[UserState]
+
+  def findLatestUserStateByUserID(userID: String): Option[UserState]
 
   /**
    * Find the most up-to-date user state for the particular billing period.
    */
-  def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState]
+  def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState]
 
   /**
    * Delete a state for a user
index 259b0f2..7da8cd0 100644 (file)
@@ -49,6 +49,7 @@ import gr.grnet.aquarium.event.{WalletEntry, ResourceEvent, PolicyEntry}
 import gr.grnet.aquarium.converter.JsonTextFormat
 import gr.grnet.aquarium.util._
 import gr.grnet.aquarium.event.im.{StdIMEvent, IMEventModel}
+import org.bson.types.ObjectId
 
 /**
  * An implementation of various stores that persists data in memory.
@@ -105,25 +106,34 @@ class MemStore extends UserStateStore
 
 
   //+ UserStateStore
-  def storeUserState(userState: UserState): Maybe[RecordID] = {
-    _userStates = userState.copy(id = idGen.nextUID()) :: _userStates
-    Just(RecordID(_userStates.head._id))
+  def insertUserState(userState: UserState): UserState = {
+    _userStates = userState.copy(_id = new ObjectId()) :: _userStates // store a copy carrying a freshly generated _id
+    _userStates.head // return the stored copy, so the caller sees the _id that was actually persisted
   }
 
-  def findUserStateByUserId(userId: String) = {
-    _userStates.find(_.userID == userId) match {
-      case Some(userState) ⇒
-        Just(userState)
-      case None ⇒
-        NoVal
+  def findUserStateByUserID(userID: String) = {
+    _userStates.find(_.userID == userID)
+  }
+
+  def findLatestUserStateByUserID(userID: String) = {
+    val goodOnes = _userStates.filter(_.userID == userID)
+
+    goodOnes.sortWith {
+      case (us1, us2) ⇒
+        us1.oldestSnapshotTime > us2.oldestSnapshotTime
+    } match {
+      case head :: _ ⇒
+        Some(head)
+      case _ ⇒
+        None
     }
   }
 
-  def findLatestUserStateForEndOfBillingMonth(userId: String,
+  def findLatestUserStateForEndOfBillingMonth(userID: String,
                                               yearOfBillingMonth: Int,
-                                              billingMonth: Int): Maybe[UserState] = {
+                                              billingMonth: Int): Option[UserState] = {
     val goodOnes = _userStates.filter { userState ⇒
-        val f1 = userState.userID == userId
+        val f1 = userState.userID == userID
         val f2 = userState.isFullBillingMonthState
         val bm = userState.theFullBillingMonth
         val f3 = (bm ne null) && {
@@ -138,9 +148,9 @@ class MemStore extends UserStateStore
         us1.oldestSnapshotTime > us2.oldestSnapshotTime
     } match {
       case head :: _ ⇒
-        Just(head)
+        Some(head)
       case _ ⇒
-        NoVal
+        None
     }
   }
 
index b1f1421..e238883 100644 (file)
@@ -202,34 +202,36 @@ class MongoDBStore(
   //-ResourceEventStore
 
   //+ UserStateStore
-  def storeUserState(userState: UserState): Maybe[RecordID] = {
-    MongoDBStore.storeUserState(userState, userStates)
+  def insertUserState(userState: UserState) = {
+    MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject)
   }
 
-  def findUserStateByUserId(userId: String): Maybe[UserState] = {
-    Maybe {
-      val query = new BasicDBObject(UserStateJsonNames.userId, userId)
-      val cursor = userStates find query
+  def findUserStateByUserID(userID: String): Option[UserState] = {
+    val query = new BasicDBObject(UserStateJsonNames.userID, userID)
+    val cursor = userStates find query
 
-      try {
-        if(cursor.hasNext)
-          MongoDBStore.dbObjectToUserState(cursor.next())
-        else
-          null
-      } finally {
-        cursor.close()
-      }
+    withCloseable(cursor) { cursor ⇒
+      if(cursor.hasNext)
+        Some(MongoDBStore.dbObjectToUserState(cursor.next()))
+      else
+        None
     }
   }
 
+
+  def findLatestUserStateByUserID(userID: String): Option[UserState] = {
+    // FIXME: implement. Until then report "nothing found" as None; returning null breaks callers that use the Option.
+    None
+  }
+
   def findLatestUserStateForEndOfBillingMonth(userId: String,
                                               yearOfBillingMonth: Int,
-                                              billingMonth: Int): Maybe[UserState] = {
-    NoVal // FIXME: implement
+                                              billingMonth: Int): Option[UserState] = {
+    None // FIXME: implement
   }
 
   def deleteUserState(userId: String) = {
-    val query = new BasicDBObject(UserStateJsonNames.userId, userId)
+    val query = new BasicDBObject(UserStateJsonNames.userID, userId)
     userStates.findAndRemove(query)
   }
   //- UserStateStore
@@ -335,10 +337,6 @@ class MongoDBStore(
     MongoDBStore.createIMEventFromOther(event)
   }
 
-//  def storeUnparsed(json: String): Maybe[RecordID] = {
-//    MongoDBStore.storeJustJson(json, unparsedIMEvents)
-//  }
-
   def insertIMEvent(event: IMEventModel): IMEvent = {
     val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId())
     MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
@@ -496,8 +494,8 @@ object MongoDBStore {
     }
   }
 
-  def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
-    Maybe(storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userID, MongoDBStore.jsonSupportToDBObject))
+  def storeUserState(userState: UserState, collection: DBCollection) = {
+    storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userID, MongoDBStore.jsonSupportToDBObject)
   }
   
   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
@@ -519,6 +517,26 @@ object MongoDBStore {
     RecordID(dbObject.get("_id").toString)
   }
 
+  // FIXME: consolidate
+  def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
+    val dbObject = serializer apply obj
+    val objectId = obj._id  match {
+      case null ⇒
+        val _id = new ObjectId()
+        dbObject.put("_id", _id)
+        _id
+
+      case _id ⇒
+        _id
+    }
+
+    dbObject.put(JsonNames._id, objectId)
+
+    collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
+
+    obj
+  }
+
   def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : ObjectId = {
     val dbObject = serializer apply obj
     val objectId = obj._id  match {
index 7e0aece..a382e0f 100644 (file)
@@ -43,6 +43,7 @@ import gr.grnet.aquarium.event.{NewWalletEntry, WalletEntry}
 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
 import gr.grnet.aquarium.AquariumException
 import gr.grnet.aquarium.event.im.IMEventModel
+import org.bson.types.ObjectId
 
 
 /**
@@ -171,7 +172,7 @@ case class UserState(
     // The user state we used to compute this one. Normally the (cached)
     // state at the beginning of the billing period.
     parentUserStateId: Option[String] = None,
-    id: String = ""
+    _id: ObjectId = new ObjectId()
 ) extends JsonSupport {
 
   private[this] def _allSnapshots: List[Long] = {
@@ -188,11 +189,9 @@ case class UserState(
 
   def newestSnapshotTime: Long  = _allSnapshots max
 
-  def _id = id
   def idOpt: Option[String] = _id match {
     case null ⇒ None
-    case ""   ⇒ None
-    case _id  ⇒ Some(_id)
+    case _id  ⇒ Some(_id.toString)
   }
 
 //  def userCreationDate = new Date(userCreationMillis)
@@ -250,7 +249,7 @@ object UserState {
 
   object JsonNames {
     final val _id = "_id"
-    final val userId = "userId"
+    final val userID = "userID"
   }
 }
 
index 71918cb..cb505cb 100644 (file)
@@ -137,7 +137,7 @@ class UserStateComputations extends Loggable {
 
       // NOTE: Reason here will be: InitialUserStateCalculation
       val initialUserState0 = createInitialUserStateFrom(currentUserState)
-      val initialUserStateM = userStateStore.storeUserState2(initialUserState0)
+      val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
 
       clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
       clog.end()
@@ -145,10 +145,16 @@ class UserStateComputations extends Loggable {
       initialUserStateM
     } else {
       // Ask DB cache for the latest known user state for this billing period
-      val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
+      val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth(
         userId,
         billingMonthInfo.year,
-        billingMonthInfo.month)
+        billingMonthInfo.month) match {
+
+        case Some(latestUserState) ⇒
+          latestUserState
+        case None ⇒
+          null
+      }}
 
       latestUserStateM match {
         case NoVal ⇒
@@ -527,7 +533,7 @@ class UserStateComputations extends Loggable {
     clog.debug("calculationReason = %s", calculationReason)
 
     if(calculationReason.shouldStoreUserState) {
-      val storedUserStateM = userStateStore.storeUserState2(_workingUserState)
+      val storedUserStateM = userStateStore.insertUserState2(_workingUserState)
       storedUserStateM match {
         case Just(storedUserState) ⇒
           clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
index 3b60369..b81cc6d 100644 (file)
@@ -258,7 +258,7 @@ aquariumpolicy:
 
   private[this]
   def showUserState(clog: ContextualLogger, userState: UserState) {
-    val id = userState.id
+    val id = userState._id
     val parentId = userState.parentUserStateId
     val credits = userState.creditsSnapshot.creditAmount
     val newWalletEntries = userState.newWalletEntries.map(_.toDebugString)