/*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
import gr.grnet.aquarium.actor._
import gr.grnet.aquarium.Configurator
import gr.grnet.aquarium.processor.actor._
-import gr.grnet.aquarium.logic.accounting.{AccountingException, Policy, Accounting}
import gr.grnet.aquarium.user._
-import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent}
+import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry}
import java.util.Date
-import gr.grnet.aquarium.util.{DateUtils, Loggable}
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResource, DSLComplexResource}
+import gr.grnet.aquarium.util.Loggable
import gr.grnet.aquarium.util.date.TimeHelpers
-import com.ckkloverdos.maybe.{Maybe, Failed, NoVal, Just}
+import com.ckkloverdos.maybe.{Failed, NoVal, Just}
+import gr.grnet.aquarium.logic.accounting.RoleAgreements
+import gr.grnet.aquarium.messaging.AkkaAMQP
/**
*/
class UserActor extends AquariumActor
- with Loggable with Accounting with DateUtils {
+ with AkkaAMQP
+ with ReflectiveAquariumActor
+ with Loggable {
@volatile
private[this] var _userId: String = _
@volatile
@volatile
private[this] var _timestampTheshold: Long = _
+ private[this] lazy val messenger = producer("aquarium") // FIXME: Read this from configuration
+
def role = UserActorRole
private[this] def _configurator: Configurator = Configurator.MasterConfigurator
- private[this] def processCreateUser(event: UserEvent): Unit = {
- val userId = event.userId
- DEBUG("Creating user from state %s", event)
- val usersDB = _configurator.storeProvider.userStateStore
- usersDB.findUserStateByUserId(userId) match {
- case Just(userState) ⇒
- WARN("User already created, state = %s".format(userState))
- case failed @ Failed(e, m) ⇒
- ERROR("[%s][%s] %s", e.getClass.getName, e.getMessage, m)
- case NoVal ⇒
- // OK. Create a default UserState and store it
- val now = TimeHelpers.nowMillis
- val agreementOpt = Policy.policy.findAgreement(DSLAgreement.DefaultAgreementName)
-
- if(agreementOpt.isEmpty) {
- ERROR("No default agreement found. Cannot initialize user state")
- } else {
- this._userState = DefaultUserStateComputations.createFirstUserState(userId, DSLAgreement.DefaultAgreementName)
- saveUserState
- DEBUG("Created and stored %s", this._userState)
- }
- }
- }
-
- private[this] def findRelatedEntries(res: DSLResource, instid: String): List[WalletEntry] = {
- val walletDB = _configurator.storeProvider.walletEntryStore
- walletDB.findPreviousEntry(_userId, res.name, instid, Some(false))
- }
-
-
- private[this] def processModifyUser(event: UserEvent): Unit = {
- val now = TimeHelpers.nowMillis
- val newActive = ActiveStateSnapshot(event.isStateActive, now)
-
- DEBUG("New active status = %s".format(newActive))
-
- this._userState = this._userState.copy( active = newActive )
- }
- /**
- * Use the provided [[gr.grnet.aquarium.logic.events.UserEvent]] to change any user state.
- */
- private[this] def processUserEvent(event: UserEvent): Unit = {
- if(event.isCreateUser) {
- processCreateUser(event)
- } else if(event.isModifyUser) {
- processModifyUser(event)
- }
- }
-
- /**
- * Tries to makes sure that the internal user state exists.
- *
- * May contact the [[gr.grnet.aquarium.store.UserStateStore]] for that.
- *
- */
- private[this] def ensureUserState(): Unit = {
- if (_userState == null)
- rebuildState(0)
- else
- rebuildState(_userState.oldestSnapshotTime, System.currentTimeMillis())
- }
-
- /**
- * Replay the event log for all events that affect the user state, starting
- * from the provided time instant.
- */
- def rebuildState(from: Long): Unit =
- rebuildState(from, oneYearAhead(new Date(), new Date(Long.MaxValue)).getTime)
-
/**
* Replay the event log for all events that affect the user state.
*/
* Create an empty state for a user
*/
def createBlankState = {
- this._userState = DefaultUserStateComputations.createFirstUserState(this._userId, DSLAgreement.DefaultAgreementName)
+ this._userState = DefaultUserStateComputations.createInitialUserState(this._userId, 0L, true, 0.0)
}
/**
*/
def replayUserEvents(initState: UserState, events: List[UserEvent],
from: Long, to: Long): UserState = {
-// var act = initState.active
-// var rol = initState.roles
-// events
-// .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
-// .foreach {
-// e =>
-// act = act.copy(
-// data = e.isStateActive, snapshotTime = e.occurredMillis)
-// // TODO: Implement the following
-// //_userState.agreement = _userState.agreement.copy(
-// // data = e.newAgreement, e.occurredMillis)
-//
-// rol = rol.copy(data = e.roles,
-// snapshotTime = e.occurredMillis)
-// }
-// initState.copy(active = act, roles = rol)
initState
}
*/
def replayWalletEntries(initState: UserState, events: List[WalletEntry],
from: Long, to: Long): UserState = {
-// var cred = initState.credits
-// events
-// .filter(e => e.occurredMillis >= from && e.occurredMillis < to)
-// .foreach {
-// w =>
-// val newVal = cred.creditAmount + w.value
-// cred = cred.copy(data = newVal)
-// }
-// if (!events.isEmpty) {
-// val snapTime = events.map{e => e.occurredMillis}.max
-// cred = cred.copy(snapshotTime = snapTime)
-// }
-// initState.copy(credits = cred)
initState
}
* Persist current user state
*/
private[this] def saveUserState(): Unit = {
- _configurator.storeProvider.userStateStore.deleteUserState(this._userId)
_configurator.storeProvider.userStateStore.storeUserState(this._userState) match {
case Just(record) => record
case NoVal => ERROR("Unknown error saving state")
- case Failed(e, a) =>
- ERROR("Saving state failed: %s error was: %s".format(a,e));
+ case Failed(e) =>
+ ERROR("Saving state failed: %s".format(e));
}
}
- protected def receive: Receive = {
- case m @ AquariumPropertiesLoaded(props) ⇒
- this._timestampTheshold = props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
- INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
-
- case m @ UserActorInitWithUserId(userId) ⇒
- this._userId = userId
- DEBUG("Actor starting, loading state")
- ensureUserState()
-
- case m @ ProcessResourceEvent(resourceEvent) ⇒
- if(resourceEvent.userId != this._userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- } else {
- //ensureUserState()
+ def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
+ this._timestampTheshold = event.props.getLong(Configurator.Keys.user_state_timestamp_threshold).getOr(10000)
+ INFO("Setup my timestampTheshold = %s", this._timestampTheshold)
+ }
+
+ def onUserActorInitWithUserId(event: UserActorInitWithUserId): Unit = {
+ this._userId = event.userId
+ DEBUG("Actor starting, loading state")
+ }
+
+ def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
+ val resourceEvent = event.rce
+ if(resourceEvent.userID != this._userId) {
+ ERROR("Received %s but my userId = %s".format(event, this._userId))
+ } else {
+ //ensureUserState()
// calcWalletEntries()
- //processResourceEvent(resourceEvent, true)
- }
+ //processResourceEvent(resourceEvent, true)
+ }
+ }
+
+ private[this] def processCreateUser(event: UserEvent): Unit = {
+ val userId = event.userID
+ DEBUG("Creating user from state %s", event)
+ val usersDB = _configurator.storeProvider.userStateStore
+ usersDB.findUserStateByUserId(userId) match {
+ case Just(userState) ⇒
+ WARN("User already created, state = %s".format(userState))
+ case failed @ Failed(e) ⇒
+ ERROR("[%s] %s", e.getClass.getName, e.getMessage)
+ case NoVal ⇒
+ val agreement = RoleAgreements.agreementForRole(event.role)
+ DEBUG("User %s assigned agreement %s".format(userId, agreement.name))
+
+ this._userState = DefaultUserStateComputations.createInitialUserState(
+ userId,
+ event.occurredMillis,
+ event.isActive, 0.0, List(event.role), agreement.name)
+ saveUserState
+ DEBUG("Created and stored %s", this._userState)
+ }
+ }
+
+ private[this] def processModifyUser(event: UserEvent): Unit = {
+ val now = TimeHelpers.nowMillis
+ val newActive = ActiveStateSnapshot(event.isStateActive, now)
- case m @ ProcessUserEvent(userEvent) ⇒
- if(userEvent.userId != this._userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- } else {
- ensureUserState()
- processUserEvent(userEvent)
+ DEBUG("New active status = %s".format(newActive))
+
+ this._userState = this._userState.copy(activeStateSnapshot = newActive)
+ }
+
+ def onProcessUserEvent(event: ProcessUserEvent): Unit = {
+ val userEvent = event.ue
+ if(userEvent.userID != this._userId) {
+ ERROR("Received %s but my userId = %s".format(userEvent, this._userId))
+ } else {
+ if(userEvent.isCreateUser) {
+ processCreateUser(userEvent)
+ } else if(userEvent.isModifyUser) {
+ processModifyUser(userEvent)
}
+ }
+ }
+
+ def onRequestUserBalance(event: RequestUserBalance): Unit = {
+ val userId = event.userId
+ val timestamp = event.timestamp
- case m @ RequestUserBalance(userId, timestamp) ⇒
- if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
- {
+ if (System.currentTimeMillis() - _userState.newestSnapshotTime > 60 * 1000)
+ {
// calcWalletEntries()
- }
- self reply UserResponseGetBalance(userId, _userState.credits.creditAmount)
-
- case m @ UserRequestGetState(userId, timestamp) ⇒
- if(this._userId != userId) {
- ERROR("Received %s but my userId = %s".format(m, this._userId))
- // TODO: throw an exception here
- } else {
- // FIXME: implement
- ERROR("FIXME: Should have properly computed the user state")
- ensureUserState()
- self reply UserResponseGetState(userId, this._userState)
- }
+ }
+ self reply UserResponseGetBalance(userId, _userState.creditsSnapshot.creditAmount)
+ }
+
+ def onUserRequestGetState(event: UserRequestGetState): Unit = {
+ val userId = event.userId
+ if(this._userId != userId) {
+ ERROR("Received %s but my userId = %s".format(event, this._userId))
+ // TODO: throw an exception here
+ } else {
+ // FIXME: implement
+ ERROR("FIXME: Should have properly computed the user state")
+// ensureUserState()
+ self reply UserResponseGetState(userId, this._userState)
+ }
+ }
+
+ def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
}
override def postStop {
DEBUG("Stopping, saving state")
- //saveUserState
+ saveUserState
}
+ override def preRestart(reason: Throwable) {
+ DEBUG("Actor failed, restarting")
+ }
+
+ override def postRestart(reason: Throwable) {
+ DEBUG("Actor restarted succesfully")
+ }
+
+ def knownMessageTypes = UserActor.KnownMessageTypes
+
private[this] def DEBUG(fmt: String, args: Any*) =
logger.debug("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
private[this] def ERROR(fmt: String, args: Any*) =
logger.error("UserActor[%s]: %s".format(_userId, fmt.format(args:_*)))
+}
+
+object UserActor {
+ final val KnownMessageTypes = List(
+ classOf[AquariumPropertiesLoaded],
+ classOf[UserActorInitWithUserId],
+ classOf[ProcessResourceEvent],
+ classOf[ProcessUserEvent],
+ classOf[RequestUserBalance],
+ classOf[UserRequestGetState],
+ classOf[ActorProviderConfigured]
+ )
}
\ No newline at end of file