/*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
import gr.grnet.aquarium.actor._
import gr.grnet.aquarium.Configurator
-import java.util.Date
import gr.grnet.aquarium.processor.actor._
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
-import gr.grnet.aquarium.util.{TimeHelpers, Loggable}
import gr.grnet.aquarium.user._
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLResource, DSLSimpleResource, DSLComplexResource}
-import gr.grnet.aquarium.logic.events.{AquariumEvent, UserEvent, WalletEntry, ResourceEvent}
+import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry}
+import java.util.Date
+import gr.grnet.aquarium.util.Loggable
+import gr.grnet.aquarium.util.date.TimeHelpers
+import com.ckkloverdos.maybe.{Failed, NoVal, Just}
+import gr.grnet.aquarium.logic.accounting.RoleAgreements
+import gr.grnet.aquarium.messaging.AkkaAMQP
/**
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class UserActor extends AquariumActor with Loggable with Accounting {
+class UserActor extends AquariumActor
+ with AkkaAMQP
+ with ReflectiveAquariumActor
+ with Loggable {
@volatile
private[this] var _userId: String = _
@volatile
@volatile
private[this] var _timestampTheshold: Long = _
+ private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration
+
def role = UserActorRole
private[this] def _configurator: Configurator = Configurator.MasterConfigurator
- private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
- if(checkForOlderEvents) {
- DEBUG("Checking for events older than %s" format resourceEvent)
- processOlderResourceEvents(resourceEvent)
- }
-
- justProcessTheResourceEvent(resourceEvent, "ACTUAL")
+ /**
+ * Replay the event log for all events that affect the user state.
+ */
+ def rebuildState(from: Long, to: Long): Unit = {
+ val start = System.currentTimeMillis()
+ if (_userState == null)
+ createBlankState
+
+ //Rebuild state from user events
+ val usersDB = _configurator.storeProvider.userEventStore
+ val userEvents = usersDB.findUserEventsByUserId(_userId)
+ val numUserEvents = userEvents.size
+ _userState = replayUserEvents(_userState, userEvents, from, to)
+
+ //Rebuild state from resource events
+ val eventsDB = _configurator.storeProvider.resourceEventStore
+ val resourceEvents = eventsDB.findResourceEventsByUserIdAfterTimestamp(_userId, from)
+ val numResourceEvents = resourceEvents.size
+// _userState = replayResourceEvents(_userState, resourceEvents, from, to)
+
+ //Rebuild state from wallet entries
+ val wallet = _configurator.storeProvider.walletEntryStore
+ val walletEnties = wallet.findWalletEntriesAfter(_userId, new Date(from))
+ val numWalletEntries = walletEnties.size
+ _userState = replayWalletEntries(_userState, walletEnties, from, to)
+
+ INFO(("Rebuilt state from %d events (%d user events, " +
+ "%d resource events, %d wallet entries) in %d msec").format(
+ numUserEvents + numResourceEvents + numWalletEntries,
+ numUserEvents, numResourceEvents, numWalletEntries,
+ System.currentTimeMillis() - start))
}
/**
- * Find and process older resource events.
- *
- * Older resource events are found based on the latest credit calculation, that is the latest
- * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
- * have been calculated for these resource events and start from there.
+ * Create an empty state for a user
*/
- private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
- assert(_userId == resourceEvent.userId)
- val rceId = resourceEvent.id
- val userId = resourceEvent.userId
- val resourceEventStore = _configurator.resourceEventStore
- val walletEntriesStore = _configurator.walletStore
-
- // 1. Find latest wallet entry
- val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
- latestWalletEntriesM match {
- case Just(latestWalletEntries) ⇒
- // The time on which we base the selection of the older events
- val selectionTime = latestWalletEntries.head.occurredMillis
-
- // 2. Now find all resource events past the time of the latest wallet entry.
- // These events have not been processed, except probably those ones
- // that have the same `occurredMillis` with `selectionTime`
- val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
-
- // 3. Filter out those resource events for which no wallet entry has actually been
- // produced.
- val rcEventsToProcess = for {
- oldRCEvent <- oldRCEvents
- oldRCEventId = oldRCEvent.id
- latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId)
- } yield {
- oldRCEvent
- }
-
- DEBUG("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
-
- for {
- rcEventToProcess <- rcEventsToProcess
- } {
- justProcessTheResourceEvent(rcEventToProcess, "OLDER")
- }
- case NoVal ⇒
- DEBUG("No events to process older than %s".format(resourceEvent))
- case Failed(e, m) ⇒
- ERROR("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
- }
+ def createBlankState = {
+ this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0)
}
- private[this] def _storeWalletEntries(walletEntries: List[WalletEntry]): Unit = {
- val walletEntriesStore = _configurator.walletStore
- for(walletEntry <- walletEntries) {
- walletEntriesStore.storeWalletEntry(walletEntry)
- }
+ /**
+ * Replay user events on the provided user state
+ */
+ def replayUserEvents(initState: UserState, events: List[UserEvent],
+ from: Long, to: Long): UserState = {
+ initState
}
- private[this] def _calcNewCreditSum(walletEntries: List[WalletEntry]): Double = {
- val newCredits = for {
- walletEntry <- walletEntries if(walletEntry.finalized)
- } yield walletEntry.value.toDouble
- newCredits.sum
+ /**
+ * Replay wallet entries on the provided user state
+ */
+ def replayWalletEntries(initState: UserState, events: List[WalletEntry],
+ from: Long, to: Long): UserState = {
+ initState
}
/**
- * Process the resource event as if nothing else matters. Just do it.
+ * Persist current user state
*/
- private[this] def justProcessTheResourceEvent(ev: ResourceEvent, logLabel: String): Unit = {
- val start = System.currentTimeMillis
- DEBUG("Processing [%s] %s".format(logLabel, ev))
-
- // Initially, the user state (regarding resources) is empty.
- // So we have to compensate for both a totally empty resource state
- // and the update with new values.
-
- // 1. Find the resource definition
- val policy = Policy.policy
- policy.findResource(ev.resource) match {
- case Some(resource) ⇒
- // 2. Get the instance id and value for the resource
- val instanceIdM = resource match {
- // 2.1 If it is complex, from the details map, get the field which holds the instanceId
- case DSLComplexResource(name, unit, costPolicy, descriminatorField) ⇒
- ev.details.get(descriminatorField) match {
- case Some(instanceId) ⇒
- Just(instanceId)
- case None ⇒
- // We should have some value under this key here....
- Failed(throw new AccountingException("")) //TODO: See what to do here
- }
- // 2.2 If it is simple, ...
- case DSLSimpleResource(name, unit, costPolicy) ⇒
- // ... by convention, the instanceId of a simple resource is just "1"
- // @see [[gr.grnet.aquarium.user.OwnedResourcesSnapshot]]
- Just("1")
- }
-
- // 3. Did we get a valid instanceId?
- instanceIdM match {
- // 3.1 Yes, time to get/update the current state
- case Just(instanceId) ⇒
- val oldOwnedResources = _userState.ownedResources
- // Find or create the new resource instance map
- val oldOwnedResourcesData = oldOwnedResources.data
- val oldRCInstanceMap = oldOwnedResourcesData.get(resource) match {
- case Some(resourceMap) ⇒ resourceMap
- case None ⇒ Map[String, Float]()
- }
- // Update the new value in the resource instance map
- val newRCInstanceMap: Map[String, Float] = oldRCInstanceMap.updated(instanceId, ev.value)
-
- val newOwnedResourcesData = oldOwnedResourcesData.updated(resource, newRCInstanceMap)
-
- // A. First state diff: the modified resource value
- val StateChangeMillis = TimeHelpers.nowMillis
- val newOwnedResources = oldOwnedResources.copy(
- data = newOwnedResourcesData,
- snapshotTime = StateChangeMillis
- )
-
- // Calculate the wallet entries generated from this resource event
- _userState.maybeDSLAgreement match {
- case Just(agreement) ⇒
- // TODO: the snapshot time should be per instanceId?
- // TODO: Related events
- val walletEntriesM = chargeEvent(ev, agreement, ev.value,
- new Date(oldOwnedResources.snapshotTime),
- findRelatedEntries(resource, ev.getInstanceId(policy)))
- walletEntriesM match {
- case Just(walletEntries) ⇒
- _storeWalletEntries(walletEntries)
-
- // B. Second state diff: the new credits
- val newCreditsSum = _calcNewCreditSum(walletEntries)
- val oldCredits = _userState.safeCredits.data
- val newCredits = CreditSnapshot(oldCredits + newCreditsSum, StateChangeMillis)
-
- // Finally, the userState change
- DEBUG("Credits = %s".format(this._userId, newCredits))
- DEBUG("Resources = %s".format(this._userId, newOwnedResources))
- this._userState = this._userState.copy(
- credits = newCredits,
- ownedResources = newOwnedResources
- )
- case NoVal ⇒
- DEBUG("No wallet entries generated for %s".format(ev))
- case failed @ Failed(e, m) ⇒
- failed
- }
-
- case NoVal ⇒
- Failed(new UserDataSnapshotException("No agreement snapshot found for user %s".format(this._userId)))
- case failed @ Failed(e, m) ⇒
- failed
- }
- // 3.2 No, no luck, this is an error
- case NoVal ⇒
- Failed(new UserDataSnapshotException("No instanceId for resource %s of user %s".format(resource, this._userId)))
- case failed @ Failed(e, m) ⇒
- failed
- }
- // No resource definition found, this is an error
- case None ⇒ // Policy.policy.findResource(ev.resource)
- Failed(new UserDataSnapshotException("No resource %s found for user %s".format(ev.resource, this._userId)))
+ private[this] def saveUserState(): Unit = {
+ _configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
+ case Just(record) => record
+ case NoVal => ERROR("Unknown error saving state")
+ case Failed(e) =>
+ ERROR("Saving state failed: %s".format(e));
}
+ }
- DEBUG("Finished %s time: %d ms".format(ev.id, System.currentTimeMillis - start))
+ def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
+ this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
+ INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
+ }
+
+ def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = {
+ this._userId = event.userId
+ DEBUG("Actor starting, loading state")
+ }
+
+ def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+ val resourceEvent = event.rce
+ if(resourceEvent.userID != this._userId) {
+ ERROR("Received %s but my userId = %s".format(event, this._userId))
+ } else {
+ //ensureUserState()
+// calcWalletEntries()
+ //processResourceEvent(resourceEvent, true)
+ }
}
private[this] def processCreateUser(event: UserEvent): Unit = {
- val userId = event.userId
+ val userId = event.userID
DEBUG("Creating user from state %s", event)
val usersDB = _configurator.storeProvider.userStateStore
usersDB.findUserStateByUserId(userId) match {
case Just(userState) ⇒
WARN("User already created, state = %s".format(userState))
- case failed @ Failed(e, m) ⇒
- ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
+ case failed @ Failed(e) ⇒
+ ERROR("[%s] %s", e.getClass.getName, e.getMessage)
case NoVal ⇒
- // OK. Create a default UserState and store it
- val now = TimeHelpers.nowMillis
- val agreementOpt = Policy.policy.findAgreement("default")
-
- if(agreementOpt.isEmpty) {
- ERROR("No default agreement found. Cannot initialize user state")
- } else {
- this._userState = UserState(
- userId,
- ActiveSuspendedSnapshot(event.isStateActive, now),
- CreditSnapshot(0, now),
- AgreementSnapshot(agreementOpt.get.name, now),
- RolesSnapshot(event.roles, now),
- PaymentOrdersSnapshot(Nil, now),
- OwnedGroupsSnapshot(Nil, now),
- GroupMembershipsSnapshot(Nil, now),
- OwnedResourcesSnapshot(Map(), now)
- )
-
- usersDB.storeUserState(this._userState)
- DEBUG("Created and stored %s", this._userState)
- }
+ val agreement = RoleAgreements.agreementForRole(event.role)
+ DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
+
+ this._userState = DefaultUserStateComputations.createInitialUserState(
+ userId,
+ event.occurredMillis,
+ event.isActive, 0.0, List(event.role), agreement.name)
+ saveUserState
+ DEBUG("Created and stored %s", this._userState)
}
}
- private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
- val walletDB = _configurator.storeProvider.walletEntryStore
- walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
- }
-
-
private[this] def processModifyUser(event: UserEvent): Unit = {
val now = TimeHelpers.nowMillis
- val newActive = ActiveSuspendedSnapshot(event.isStateActive, now)
+ val newActive = ActiveStateSnapshot(event.isStateActive, now)
DEBUG("New active status = %s".format(newActive))
- this._userState = this._userState.copy( active = newActive )
+ this._userState = this._userState.copy(activeStateSnapshot = newActive)
}
- /**
- * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
- */
- private[this] def processUserEvent(event: UserEvent): Unit = {
- if(event.isCreateUser) {
- processCreateUser(event)
- } else if(event.isModifyUser) {
- processModifyUser(event)
+
+ def onProcessUserEvent(event: ProcessUserEvent): Unit = {
+ val userEvent = event.ue
+ if(userEvent.userID != this._userId) {
+ ERROR("Received %s but my userId = %s".format(userEvent, this._userId))
+ } else {
+ if(userEvent.isCreateUser) {
+ processCreateUser(userEvent)
+ } else if(userEvent.isModifyUser) {
+ processModifyUser(userEvent)
+ }
}
}
- /**
- * Try to load from the DB the latest known info (snapshot data) for the given user.
- */
- private[this] def findUserState(userId: String): Maybe[UserState] = {
- val usersDB = _configurator.storeProvider.userStateStore
- usersDB.findUserStateByUserId(userId)
+ def onRequestUserBalance(event: RequestUserBalance): Unit = {
+ val userId = event.userId
+ val timestamp = event.timestamp
+
+ if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+ {
+// calcWalletEntries()
+ }
+ self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
}
- /**
- * Tries to makes sure that the internal user state exists.
- *
- * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
- *
- */
- private[this] def ensureUserState(): Unit = {
- if(null eq this._userState) {
- findUserState(this._userId) match {
- case Just(userState) ⇒
- DEBUG("Loaded user state %s from DB", userState)
- this._userState = userState
- case Failed(e, m) ⇒
- ERROR("While loading user state from DB: [%s][%s] %s", e.getClass.getName, e.getMessage, m)
- case NoVal ⇒
- WARN("Request for unknown (to Aquarium) user")
- }
+ def onUserRequestGetState(event: UserRequestGetState): Unit = {
+ val userId = event.userId
+ if(this._userId != userId) {
+ ERROR("Received %s but my userId = %s".format(event, this._userId))
+ // TODO: throw an exception here
+ } else {
+ // FIXME: implement
+ ERROR("FIXME: Should have properly computed the user state")
+// ensureUserState()
+ self reply UserResponseGetState(userId, this._userState)
}
}
- protected def receive: Receive = {
- case UserActorStop ⇒
- // TODO: Check if this is OK with the rest of the semantics
- self.stop()
-
- case m @ AquariumPropertiesLoaded(props) ⇒
- this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
- INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
-
- case m @ UserActorInitWithUserId(userId) ⇒
- this._userId = userId
- ensureUserState()
-
- case m @ ProcessResourceEvent(resourceEvent) ⇒
- if(resourceEvent.userId != this._userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- } else {
- ensureUserState()
- processResourceEvent(resourceEvent, true)
- }
+ def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
+ }
- case m @ ProcessUserEvent(userEvent) ⇒
- if(userEvent.userId != this._userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- } else {
- ensureUserState()
- processUserEvent(userEvent)
- }
+ override def postStop {
+ DEBUG("Stopping, saving state")
+ saveUserState
+ }
- case m @ RequestUserBalance(userId, timestamp) ⇒
- if(this._userId != userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- // TODO: throw an exception here
- } else {
- // This is the big party.
- // Get the user state, if it exists and make sure it is not stale.
- ensureUserState()
-
- // Do we have a user state?
- if(_userState ne null) {
- // Yep, we do. See what there is inside it.
- val credits = _userState.credits
- val creditsTimestamp = credits.snapshotTime
-
- // Check if data is stale
- if(creditsTimestamp + _timestampTheshold > timestamp) {
- // No, it's OK
- self reply UserResponseGetBalance(userId, credits.data)
- } else {
- // Yep, data is stale and must recompute balance
- // FIXME: implement
- ERROR("FIXME: Should have computed a new value for %s".format(credits))
- self reply UserResponseGetBalance(userId, credits.data)
- }
- } else {
- // No user state exists. This is an error.
- val errMsg = "Could not load user state for %s".format(m)
- ERROR(errMsg)
- self reply ResponseUserBalance(userId, 0, Some(errMsg))
- }
- }
+ override def preRestart(reason: Throwable) {
+ DEBUG("Actor failed, restarting")
+ }
- case m @ UserRequestGetState(userId, timestamp) ⇒
- if(this._userId != userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- // TODO: throw an exception here
- } else {
- // FIXME: implement
- ERROR("FIXME: Should have properly computed the user state")
- self reply UserResponseGetState(userId, this._userState)
- }
+ override def postRestart(reason: Throwable) {
+ DEBUG("Actor restarted succesfully")
}
+ def knownMessageTypes = UserActor.KnownMessageTypes
+
private[this] def DEBUG(fmt: String, args: Any*) =
logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
private[this] def ERROR(fmt: String, args: Any*) =
logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
+}
+
+object UserActor {
+ final val KnownMessageTypes = List(
+ classOf[AquariumPropertiesLoaded],
+ classOf[UserActorInitWithUserId],
+ classOf[ProcessResourceEvent],
+ classOf[ProcessUserEvent],
+ classOf[RequestUserBalance],
+ classOf[UserRequestGetState],
+ classOf[ActorProviderConfigured]
+ )
}
\ No newline at end of file