import gr.grnet.aquarium.Configurator
import gr.grnet.aquarium.user._
+import gr.grnet.aquarium.util.shortClassNameOf
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.logic.accounting.RoleAgreements
import gr.grnet.aquarium.actor.message.service.router._
private[this] def _timestampTheshold =
_configurator.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
- /**
- * Create an empty state for a user
- */
- def createInitialState(userID: String) = {
- this._userState = DefaultUserStateComputations.createInitialUserState(userID, 0L, true, 0.0)
- }
-
-
- /**
- * Persist current user state
- */
- private[this] def saveUserState(): Unit = {
- _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
- case Just(record) => record
- case NoVal => ERROR("Unknown error saving state")
- case Failed(e) =>
- ERROR("Saving state failed: %s".format(e));
- }
- }
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
}
def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
}
- private[this] def processCreateUser(event: IMEventModel): Unit = {
- val userId = event.userID
- DEBUG("Creating user from state %s", event)
- val usersDB = _configurator.storeProvider.userStateStore
- usersDB.findUserStateByUserId(userId) match {
- case Just(userState) ⇒
- WARN("User already created, state = %s".format(userState))
- case failed@Failed(e) ⇒
- ERROR("[%s] %s", e.getClass.getName, e.getMessage)
- case NoVal ⇒
- val agreement = RoleAgreements.agreementForRole(event.role)
- DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
-
- this._userState = DefaultUserStateComputations.createInitialUserState(
- userId,
- event.occurredMillis,
- event.isActive, 0.0, List(event.role), agreement.name)
- saveUserState
- DEBUG("Created and stored %s", this._userState)
+ private[this] def _computeAgreementForNewUser(imEvent: IMEventModel): String = {
+ // FIXME: Implement based on the role
+ "default"
+ }
+
+ private[this] def processCreateUser(imEvent: IMEventModel): Unit = {
+ val userID = imEvent.userID
+ val store = _configurator.storeProvider.userStateStore
+ // try find user state. normally should ot exist
+ val latestUserStateOpt = store.findLatestUserStateByUserID(userID)
+ if(latestUserStateOpt.isDefined) {
+ logger.error("Got %s(%s, %s) but user already exists. Ingoring".format(
+ userID,
+ shortClassNameOf(imEvent),
+ imEvent.eventType))
+
+ return
}
+
+ val initialAgreementName = _computeAgreementForNewUser(imEvent)
+ val newUserState = DefaultUserStateComputations.createInitialUserState(
+ userID,
+ imEvent.occurredMillis,
+ imEvent.isActive,
+ 0.0,
+ List(imEvent.role),
+ initialAgreementName)
+
+ this._userState = newUserState
+
+ // FIXME: If this fails, then the actor must be shut down.
+ store.insertUserState(newUserState)
}
private[this] def processModifyUser(event: IMEventModel): Unit = {
def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
}
- override def postStop {
- DEBUG("Actor[%s] stopping, saving state", self.uuid)
- saveUserState
- }
-
- override def preRestart(reason: Throwable) {
- ERROR(reason, "preRestart: Actor[%s]", self.uuid)
- }
-
- override def postRestart(reason: Throwable) {
- ERROR(reason, "postRestart: Actor[%s]", self.uuid)
- }
private[this] def D_userID = {
this._userState match {
trait UserStateStore {
/**
- * Store a user state
+ * Store a user state.
*/
- def storeUserState(userState: UserState): Maybe[RecordID]
+ def insertUserState(userState: UserState): UserState
-
- def storeUserState2(userState: UserState): Maybe[UserState] = {
- for {
- recordID <- storeUserState(userState)
- } yield {
- userState.copy(id = recordID.id.toString)
- }
+ def insertUserState2(userState: UserState): Maybe[UserState] = {
+ Maybe { insertUserState(userState) }
}
/**
* Find a state by user ID
*/
- def findUserStateByUserId(userId: String): Maybe[UserState]
+ def findUserStateByUserID(userID: String): Option[UserState]
+
+ def findLatestUserStateByUserID(userID: String): Option[UserState]
/**
* Find the most up-to-date user state for the particular billing period.
*/
- def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState]
+ def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState]
/**
* Delete a state for a user
import gr.grnet.aquarium.converter.JsonTextFormat
import gr.grnet.aquarium.util._
import gr.grnet.aquarium.event.im.{StdIMEvent, IMEventModel}
+import org.bson.types.ObjectId
/**
* An implementation of various stores that persists data in memory.
//+ UserStateStore
- def storeUserState(userState: UserState): Maybe[RecordID] = {
- _userStates = userState.copy(id = idGen.nextUID()) :: _userStates
- Just(RecordID(_userStates.head._id))
+ def insertUserState(userState: UserState): UserState = {
+ _userStates = userState.copy(_id = new ObjectId()) :: _userStates
+ userState
}
- def findUserStateByUserId(userId: String) = {
- _userStates.find(_.userID == userId) match {
- case Some(userState) ⇒
- Just(userState)
- case None ⇒
- NoVal
+ def findUserStateByUserID(userID: String) = {
+ _userStates.find(_.userID == userID)
+ }
+
+ def findLatestUserStateByUserID(userID: String) = {
+ val goodOnes = _userStates.filter(_.userID == userID)
+
+ goodOnes.sortWith {
+ case (us1, us2) ⇒
+ us1.oldestSnapshotTime > us2.oldestSnapshotTime
+ } match {
+ case head :: _ ⇒
+ Some(head)
+ case _ ⇒
+ None
}
}
- def findLatestUserStateForEndOfBillingMonth(userId: String,
+ def findLatestUserStateForEndOfBillingMonth(userID: String,
yearOfBillingMonth: Int,
- billingMonth: Int): Maybe[UserState] = {
+ billingMonth: Int): Option[UserState] = {
val goodOnes = _userStates.filter { userState ⇒
- val f1 = userState.userID == userId
+ val f1 = userState.userID == userID
val f2 = userState.isFullBillingMonthState
val bm = userState.theFullBillingMonth
val f3 = (bm ne null) && {
us1.oldestSnapshotTime > us2.oldestSnapshotTime
} match {
case head :: _ ⇒
- Just(head)
+ Some(head)
case _ ⇒
- NoVal
+ None
}
}
//-ResourceEventStore
//+ UserStateStore
- def storeUserState(userState: UserState): Maybe[RecordID] = {
- MongoDBStore.storeUserState(userState, userStates)
+ def insertUserState(userState: UserState) = {
+ MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject)
}
- def findUserStateByUserId(userId: String): Maybe[UserState] = {
- Maybe {
- val query = new BasicDBObject(UserStateJsonNames.userId, userId)
- val cursor = userStates find query
+ def findUserStateByUserID(userID: String): Option[UserState] = {
+ val query = new BasicDBObject(UserStateJsonNames.userID, userID)
+ val cursor = userStates find query
- try {
- if(cursor.hasNext)
- MongoDBStore.dbObjectToUserState(cursor.next())
- else
- null
- } finally {
- cursor.close()
- }
+ withCloseable(cursor) { cursor ⇒
+ if(cursor.hasNext)
+ Some(MongoDBStore.dbObjectToUserState(cursor.next()))
+ else
+ None
}
}
+
+ def findLatestUserStateByUserID(userID: String) = {
+ // FIXME: implement
+ null
+ }
+
def findLatestUserStateForEndOfBillingMonth(userId: String,
yearOfBillingMonth: Int,
- billingMonth: Int): Maybe[UserState] = {
- NoVal // FIXME: implement
+ billingMonth: Int): Option[UserState] = {
+ None // FIXME: implement
}
def deleteUserState(userId: String) = {
- val query = new BasicDBObject(UserStateJsonNames.userId, userId)
+ val query = new BasicDBObject(UserStateJsonNames.userID, userId)
userStates.findAndRemove(query)
}
//- UserStateStore
MongoDBStore.createIMEventFromOther(event)
}
-// def storeUnparsed(json: String): Maybe[RecordID] = {
-// MongoDBStore.storeJustJson(json, unparsedIMEvents)
-// }
-
def insertIMEvent(event: IMEventModel): IMEvent = {
val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId())
MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
}
}
- def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
- Maybe(storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userID, MongoDBStore.jsonSupportToDBObject))
+ def storeUserState(userState: UserState, collection: DBCollection) = {
+ storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userID, MongoDBStore.jsonSupportToDBObject)
}
def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
RecordID(dbObject.get("_id").toString)
}
+ // FIXME: consolidate
+ def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
+ val dbObject = serializer apply obj
+ val objectId = obj._id match {
+ case null ⇒
+ val _id = new ObjectId()
+ dbObject.put("_id", _id)
+ _id
+
+ case _id ⇒
+ _id
+ }
+
+ dbObject.put(JsonNames._id, objectId)
+
+ collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
+
+ obj
+ }
+
def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : ObjectId = {
val dbObject = serializer apply obj
val objectId = obj._id match {
import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
import gr.grnet.aquarium.AquariumException
import gr.grnet.aquarium.event.im.IMEventModel
+import org.bson.types.ObjectId
/**
// The user state we used to compute this one. Normally the (cached)
// state at the beginning of the billing period.
parentUserStateId: Option[String] = None,
- id: String = ""
+ _id: ObjectId = new ObjectId()
) extends JsonSupport {
private[this] def _allSnapshots: List[Long] = {
def newestSnapshotTime: Long = _allSnapshots max
- def _id = id
def idOpt: Option[String] = _id match {
case null ⇒ None
- case "" ⇒ None
- case _id ⇒ Some(_id)
+ case _id ⇒ Some(_id.toString)
}
// def userCreationDate = new Date(userCreationMillis)
object JsonNames {
final val _id = "_id"
- final val userId = "userId"
+ final val userID = "userID"
}
}
// NOTE: Reason here will be: InitialUserStateCalculation
val initialUserState0 = createInitialUserStateFrom(currentUserState)
- val initialUserStateM = userStateStore.storeUserState2(initialUserState0)
+ val initialUserStateM = userStateStore.insertUserState2(initialUserState0)
clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
clog.end()
initialUserStateM
} else {
// Ask DB cache for the latest known user state for this billing period
- val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
+ val latestUserStateM = Maybe { userStateStore.findLatestUserStateForEndOfBillingMonth(
userId,
billingMonthInfo.year,
- billingMonthInfo.month)
+ billingMonthInfo.month) match {
+
+ case Some(latestUserState) ⇒
+ latestUserState
+ case None ⇒
+ null
+ }}
latestUserStateM match {
case NoVal ⇒
clog.debug("calculationReason = %s", calculationReason)
if(calculationReason.shouldStoreUserState) {
- val storedUserStateM = userStateStore.storeUserState2(_workingUserState)
+ val storedUserStateM = userStateStore.insertUserState2(_workingUserState)
storedUserStateM match {
case Just(storedUserState) ⇒
clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
private[this]
def showUserState(clog: ContextualLogger, userState: UserState) {
- val id = userState.id
+ val id = userState._id
val parentId = userState.parentUserStateId
val credits = userState.creditsSnapshot.creditAmount
val newWalletEntries = userState.newWalletEntries.map(_.toDebugString)