From 1224a1617dbd9bf1869144030b6128debc38c1df Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Wed, 25 Apr 2012 17:05:07 +0300 Subject: [PATCH] WIP: Remodeling UserState store mechanics --- .../aquarium/actor/service/user/UserActor.scala | 82 ++++++++------------ .../gr/grnet/aquarium/store/UserStateStore.scala | 19 ++--- .../gr/grnet/aquarium/store/memory/MemStore.scala | 38 +++++---- .../aquarium/store/mongodb/MongoDBStore.scala | 64 +++++++++------ .../scala/gr/grnet/aquarium/user/UserState.scala | 9 +-- .../aquarium/user/UserStateComputations.scala | 14 +++- .../aquarium/user/UserStateComputationsTest.scala | 2 +- 7 files changed, 120 insertions(+), 108 deletions(-) diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index d7aa0ed..3ab665b 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -43,6 +43,7 @@ import gr.grnet.aquarium.actor._ import gr.grnet.aquarium.Configurator import gr.grnet.aquarium.user._ +import gr.grnet.aquarium.util.shortClassNameOf import gr.grnet.aquarium.util.date.TimeHelpers import gr.grnet.aquarium.logic.accounting.RoleAgreements import gr.grnet.aquarium.actor.message.service.router._ @@ -66,25 +67,6 @@ class UserActor extends ReflectiveAquariumActor { private[this] def _timestampTheshold = _configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000) - /** - * Create an empty state for a user - */ - def createInitialState(userID: String) = { - this._userState = DefaultUserStateComputations.createInitialUserState(userID, 0L, true, 0.0) - } - - - /** - * Persist current user state - */ - private[this] def saveUserState(): Unit = { - _configurator.storeProvider.userStateStore.storeUserState(this._userState) match { - case Just(record) => record - case NoVal => ERROR("Unknown error saving state") - case Failed(e) => - ERROR("Saving state failed: %s".format(e)); - } - } def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = { } @@ -92,26 +74,38 @@ class UserActor extends ReflectiveAquariumActor { def onActorProviderConfigured(event: ActorProviderConfigured): Unit = { } - private[this] def processCreateUser(event: IMEventModel): Unit = { - val userId = event.userID - DEBUG("Creating user from state %s", event) - val usersDB = _configurator.storeProvider.userStateStore - usersDB.findUserStateByUserId(userId) match { - case Just(userState) ⇒ - WARN("User already created, state = %s".format(userState)) - case failed@Failed(e) ⇒ - ERROR("[%s] %s", e.getClass.getName, e.getMessage) - case NoVal ⇒ - val agreement = RoleAgreements.agreementForRole(event.role) - DEBUG("User %s assigned agreement %s".format(userId, agreement.name)) - - this._userState = DefaultUserStateComputations.createInitialUserState( - userId, - event.occurredMillis, - event.isActive, 0.0, List(event.role), agreement.name) - saveUserState - DEBUG("Created and stored %s", this._userState) + private[this] def _computeAgreementForNewUser(imEvent: IMEventModel): String = { + // FIXME: Implement based on the role + "default" + } + + private[this] def processCreateUser(imEvent: IMEventModel): Unit = { + val userID = imEvent.userID + val store = _configurator.storeProvider.userStateStore + // try find user state. normally should ot exist + val latestUserStateOpt = store.findLatestUserStateByUserID(userID) + if(latestUserStateOpt.isDefined) { + logger.error("Got %s(%s, %s) but user already exists. Ingoring".format( + userID, + shortClassNameOf(imEvent), + imEvent.eventType)) + + return } + + val initialAgreementName = _computeAgreementForNewUser(imEvent) + val newUserState = DefaultUserStateComputations.createInitialUserState( + userID, + imEvent.occurredMillis, + imEvent.isActive, + 0.0, + List(imEvent.role), + initialAgreementName) + + this._userState = newUserState + + // FIXME: If this fails, then the actor must be shut down. + store.insertUserState(newUserState) } private[this] def processModifyUser(event: IMEventModel): Unit = { @@ -147,18 +141,6 @@ class UserActor extends ReflectiveAquariumActor { def onProcessResourceEvent(event: ProcessResourceEvent): Unit = { } - override def postStop { - DEBUG("Actor[%s] stopping, saving state", self.uuid) - saveUserState - } - - override def preRestart(reason: Throwable) { - ERROR(reason, "preRestart: Actor[%s]", self.uuid) - } - - override def postRestart(reason: Throwable) { - ERROR(reason, "postRestart: Actor[%s]", self.uuid) - } private[this] def D_userID = { this._userState match { diff --git a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala index 6f4e53d..f008133 100644 --- a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala @@ -49,28 +49,25 @@ import com.ckkloverdos.maybe.Maybe trait UserStateStore { /** - * Store a user state + * Store a user state. */ - def storeUserState(userState: UserState): Maybe[RecordID] + def insertUserState(userState: UserState): UserState - - def storeUserState2(userState: UserState): Maybe[UserState] = { - for { - recordID <- storeUserState(userState) - } yield { - userState.copy(id = recordID.id.toString) - } + def insertUserState2(userState: UserState): Maybe[UserState] = { + Maybe { insertUserState(userState) } } /** * Find a state by user ID */ - def findUserStateByUserId(userId: String): Maybe[UserState] + def findUserStateByUserID(userID: String): Option[UserState] + + def findLatestUserStateByUserID(userID: String): Option[UserState] /** * Find the most up-to-date user state for the particular billing period. */ - def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState] + def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState] /** * Delete a state for a user diff --git a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala index 259b0f2..7da8cd0 100644 --- a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala @@ -49,6 +49,7 @@ import gr.grnet.aquarium.event.{WalletEntry, ResourceEvent, PolicyEntry} import gr.grnet.aquarium.converter.JsonTextFormat import gr.grnet.aquarium.util._ import gr.grnet.aquarium.event.im.{StdIMEvent, IMEventModel} +import org.bson.types.ObjectId /** * An implementation of various stores that persists data in memory. @@ -105,25 +106,34 @@ class MemStore extends UserStateStore //+ UserStateStore - def storeUserState(userState: UserState): Maybe[RecordID] = { - _userStates = userState.copy(id = idGen.nextUID()) :: _userStates - Just(RecordID(_userStates.head._id)) + def insertUserState(userState: UserState): UserState = { + _userStates = userState.copy(_id = new ObjectId()) :: _userStates + userState } - def findUserStateByUserId(userId: String) = { - _userStates.find(_.userID == userId) match { - case Some(userState) ⇒ - Just(userState) - case None ⇒ - NoVal + def findUserStateByUserID(userID: String) = { + _userStates.find(_.userID == userID) + } + + def findLatestUserStateByUserID(userID: String) = { + val goodOnes = _userStates.filter(_.userID == userID) + + goodOnes.sortWith { + case (us1, us2) ⇒ + us1.oldestSnapshotTime > us2.oldestSnapshotTime + } match { + case head :: _ ⇒ + Some(head) + case _ ⇒ + None } } - def findLatestUserStateForEndOfBillingMonth(userId: String, + def findLatestUserStateForEndOfBillingMonth(userID: String, yearOfBillingMonth: Int, - billingMonth: Int): Maybe[UserState] = { + billingMonth: Int): Option[UserState] = { val goodOnes = _userStates.filter { userState ⇒ - val f1 = userState.userID == userId + val f1 = userState.userID == userID val f2 = userState.isFullBillingMonthState val bm = userState.theFullBillingMonth val f3 = (bm ne null) && { @@ -138,9 +148,9 @@ class MemStore extends UserStateStore us1.oldestSnapshotTime > us2.oldestSnapshotTime } match { case head :: _ ⇒ - Just(head) + Some(head) case _ ⇒ - NoVal + None } } diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index b1f1421..e238883 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -202,34 +202,36 @@ class MongoDBStore( //-ResourceEventStore //+ UserStateStore - def storeUserState(userState: UserState): Maybe[RecordID] = { - MongoDBStore.storeUserState(userState, userStates) + def insertUserState(userState: UserState) = { + MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject) } - def findUserStateByUserId(userId: String): Maybe[UserState] = { - Maybe { - val query = new BasicDBObject(UserStateJsonNames.userId, userId) - val cursor = userStates find query + def findUserStateByUserID(userID: String): Option[UserState] = { + val query = new BasicDBObject(UserStateJsonNames.userID, userID) + val cursor = userStates find query - try { - if(cursor.hasNext) - MongoDBStore.dbObjectToUserState(cursor.next()) - else - null - } finally { - cursor.close() - } + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) + Some(MongoDBStore.dbObjectToUserState(cursor.next())) + else + None } } + + def findLatestUserStateByUserID(userID: String) = { + // FIXME: implement + null + } + def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, - billingMonth: Int): Maybe[UserState] = { - NoVal // FIXME: implement + billingMonth: Int): Option[UserState] = { + None // FIXME: implement } def deleteUserState(userId: String) = { - val query = new BasicDBObject(UserStateJsonNames.userId, userId) + val query = new BasicDBObject(UserStateJsonNames.userID, userId) userStates.findAndRemove(query) } //- UserStateStore @@ -335,10 +337,6 @@ class MongoDBStore( MongoDBStore.createIMEventFromOther(event) } -// def storeUnparsed(json: String): Maybe[RecordID] = { -// MongoDBStore.storeJustJson(json, unparsedIMEvents) -// } - def insertIMEvent(event: IMEventModel): IMEvent = { val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId()) MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject) @@ -496,8 +494,8 @@ object MongoDBStore { } } - def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = { - Maybe(storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userID, MongoDBStore.jsonSupportToDBObject)) + def storeUserState(userState: UserState, collection: DBCollection) = { + storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userID, MongoDBStore.jsonSupportToDBObject) } def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = { @@ -519,6 +517,26 @@ object MongoDBStore { RecordID(dbObject.get("_id").toString) } + // FIXME: consolidate + def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = { + val dbObject = serializer apply obj + val objectId = obj._id match { + case null ⇒ + val _id = new ObjectId() + dbObject.put("_id", _id) + _id + + case _id ⇒ + _id + } + + dbObject.put(JsonNames._id, objectId) + + collection.insert(dbObject, WriteConcern.JOURNAL_SAFE) + + obj + } + def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : ObjectId = { val dbObject = serializer apply obj val objectId = obj._id match { diff --git a/src/main/scala/gr/grnet/aquarium/user/UserState.scala b/src/main/scala/gr/grnet/aquarium/user/UserState.scala index 7e0aece..a382e0f 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserState.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserState.scala @@ -43,6 +43,7 @@ import gr.grnet.aquarium.event.{NewWalletEntry, WalletEntry} import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters} import gr.grnet.aquarium.AquariumException import gr.grnet.aquarium.event.im.IMEventModel +import org.bson.types.ObjectId /** @@ -171,7 +172,7 @@ case class UserState( // The user state we used to compute this one. Normally the (cached) // state at the beginning of the billing period. parentUserStateId: Option[String] = None, - id: String = "" + _id: ObjectId = new ObjectId() ) extends JsonSupport { private[this] def _allSnapshots: List[Long] = { @@ -188,11 +189,9 @@ case class UserState( def newestSnapshotTime: Long = _allSnapshots max - def _id = id def idOpt: Option[String] = _id match { case null ⇒ None - case "" ⇒ None - case _id ⇒ Some(_id) + case _id ⇒ Some(_id.toString) } // def userCreationDate = new Date(userCreationMillis) @@ -250,7 +249,7 @@ object UserState { object JsonNames { final val _id = "_id" - final val userId = "userId" + final val userID = "userID" } } diff --git a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala index 71918cb..cb505cb 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala @@ -137,7 +137,7 @@ class UserStateComputations extends Loggable { // NOTE: Reason here will be: InitialUserStateCalculation val initialUserState0 = createInitialUserStateFrom(currentUserState) - val initialUserStateM = userStateStore.storeUserState2(initialUserState0) + val initialUserStateM = userStateStore.insertUserState2(initialUserState0) clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM)) clog.end() @@ -145,10 +145,16 @@ class UserStateComputations extends Loggable { initialUserStateM } else { // Ask DB cache for the latest known user state for this billing period - val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth( + val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth( userId, billingMonthInfo.year, - billingMonthInfo.month) + billingMonthInfo.month) match { + + case Some(latestUserState) ⇒ + latestUserState + case None ⇒ + null + }} latestUserStateM match { case NoVal ⇒ @@ -527,7 +533,7 @@ class UserStateComputations extends Loggable { clog.debug("calculationReason = %s", calculationReason) if(calculationReason.shouldStoreUserState) { - val storedUserStateM = userStateStore.storeUserState2(_workingUserState) + val storedUserStateM = userStateStore.insertUserState2(_workingUserState) storedUserStateM match { case Just(storedUserState) ⇒ clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState) diff --git a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala index 3b60369..b81cc6d 100644 --- a/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala +++ b/src/test/scala/gr/grnet/aquarium/user/UserStateComputationsTest.scala @@ -258,7 +258,7 @@ aquariumpolicy: private[this] def showUserState(clog: ContextualLogger, userState: UserState) { - val id = userState.id + val id = userState._id val parentId = userState.parentUserStateId val credits = userState.creditsSnapshot.creditAmount val newWalletEntries = userState.newWalletEntries.map(_.toDebugString) -- 1.7.10.4