2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.actor._
42 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
43 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
44 import gr.grnet.aquarium.util.date.TimeHelpers
45 import gr.grnet.aquarium.event.model.im.IMEventModel
46 import gr.grnet.aquarium.actor.message.{GetUserWalletResponseData, GetUserWalletResponse, GetUserWalletRequest, GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
48 import gr.grnet.aquarium.AquariumInternalError
49 import gr.grnet.aquarium.computation.BillingMonthInfo
50 import gr.grnet.aquarium.charging.state.UserStateBootstrap
51 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
52 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
53 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
54 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
58 * @author Christos KK Loverdos <loverdos@gmail.com>
// Actor that keeps the in-memory state of one user: the user ID, the agreement history
// built from IMEvents, and the working user state produced by the charging service.
61 class UserActor extends ReflectiveRoleableActor {
// ID of the user this actor serves; the placeholder "<?>" is overwritten in
// onInitializeUserActorState().
62 private[this] var _userID: String = "<?>"
// Current working user state; stays at its default (null) until the charging service computes it.
63 private[this] var _workingUserState: WorkingUserState = _
// The IMEvent that created the user; stays null until such an event has been recorded.
64 private[this] var _userCreationIMEvent: IMEventModel = _
// Agreement history accumulated from IMEvents (see updateAgreementHistoryFrom()).
65 private[this] val _workingAgreementHistory: WorkingAgreementHistory = new WorkingAgreementHistory
// IDs of the most recently recorded IM event and resource event. Incoming event IDs are
// compared against these so that an event that was already handled is recognised.
66 private[this] var _latestIMEventID: String = ""
67 private[this] var _latestResourceEventID: String = ""
// Bootstrap data handed to the charging service; stays null until first built in
// loadWorkingUserStateAndUpdateAgreementHistory().
68 private[this] var _userStateBootstrap: UserStateBootstrap = _
// NOTE(review): the message contains a "%s" placeholder but no argument is passed at this
// call site (other call sites in this file pass format arguments). Confirm how
// AquariumInternalError formats its message and supply the missing argument if needed.
72 throw new AquariumInternalError("%s not initialized")
/**
 * Lifecycle hook invoked after the actor has stopped.
 *
 * Writes a debug log line and then tells the Akka service that this user actor
 * has reached `postStop`.
 *
 * Declared with an explicit `: Unit =` instead of the deprecated procedure syntax.
 */
override def postStop(): Unit = {
  DEBUG("I am finally stopped (in postStop())")
  aquarium.akkaService.notifyUserActorPostStop(this)
}
/**
 * Asks the Akka service to invalidate this user actor.
 */
83 private[this] def shutmedown(): Unit = {
85 aquarium.akkaService.invalidateUserActor(this)
/**
 * Error hook: logs the chain of causes of `t`, then logs an error line that carries the
 * throwable's short class name and its message.
 *
 * @param t       the throwable being reported
 * @param message the message associated with the failure (not referenced in the lines shown here)
 */
89 override protected def onThrowable(t: Throwable, message: AnyRef) = {
90 LogHelpers.logChainOfCauses(logger, t)
91 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
/** The role of this actor, which is always `UserActorRole`. */
96 def role = UserActorRole
/** Shorthand for `aquarium.chargingService`; looked up again on every call. */
98 private[this] def chargingService = aquarium.chargingService
/**
 * Builds the function that persists a user state: it forwards its single
 * `UserStateModel` argument to `aquarium.userStateStore.insertUserState`.
 * The store is looked up each time the returned function is applied.
 */
private[this] def stdUserStateStoreFunc =
  aquarium.userStateStore.insertUserState(_: UserStateModel)
// NOTE(review): the body of this predicate is not visible in this listing; presumably it
// reports whether `_userID` has been set — confirm against the full source.
104 @inline private[this] def haveUserID = {
/** Tells whether `_userCreationIMEvent` currently holds a non-null reference. */
@inline private[this] def haveUserCreationIMEvent =
  null ne this._userCreationIMEvent
/**
 * Handler for an `AquariumPropertiesLoaded` event.
 * NOTE(review): the body is not visible in this listing.
 *
 * @param event the received event
 */
112 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
/** Tells whether the working agreement history contains at least one entry. */
@inline private[this] def haveAgreements =
  0 < this._workingAgreementHistory.size
/** Tells whether `_workingUserState` currently holds a non-null reference. */
@inline private[this] def haveWorkingUserState =
  null ne this._workingUserState
/** Tells whether `_userStateBootstrap` currently holds a non-null reference. */
@inline private[this] def haveUserStateBootstrap =
  null ne this._userStateBootstrap
/**
 * Updates the actor's agreement-related state from an IMEvent.
 *
 * For a user-creation event, an `AquariumInternalError` is thrown if a creation event is
 * already held; the event is recorded in `_userCreationIMEvent`. A `StdUserAgreement` is
 * built using the event's role and occurrence time and appended to
 * `_workingAgreementHistory`.
 *
 * @param imEvent the IMEvent to apply
 */
127 private[this] def updateAgreementHistoryFrom(imEvent: IMEventModel): Unit = {
128 if(imEvent.isCreateUser) {
129 if(haveUserCreationIMEvent) {
130 throw new AquariumInternalError(
131 "Got user creation event (id=%s) but I already have one (id=%s)",
132 this._userCreationIMEvent.id,
137 this._userCreationIMEvent = imEvent
140 val effectiveFromMillis = imEvent.occurredMillis
141 val role = imEvent.role
142 // calling unsafe just for the side-effect
// NOTE(review): scala.Predef.assert is elidable; if assertions are elided at compile time
// the call below (and the side effect it is made for) is not executed. Confirm that the
// build never elides assertions, or move the call out of the assert.
143 assert(null ne aquarium.unsafePriceTableForRoleAt(role, effectiveFromMillis))
145 val newAgreement = StdUserAgreement(
151 PolicyDefinedFullPriceTableRef
154 this._workingAgreementHistory += newAgreement
/**
 * Remembers the ID of the given IMEvent as the latest IM event ID.
 *
 * @param imEvent the IMEvent whose ID is recorded
 */
private[this] def updateLatestIMEventIDFrom(imEvent: IMEventModel): Unit = {
  val newestID = imEvent.id
  this._latestIMEventID = newestID
}
/**
 * Remembers the ID of the given resource event as the latest resource event ID.
 *
 * @param rcEvent the resource event whose ID is recorded
 */
private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventModel): Unit = {
  val newestID = rcEvent.id
  this._latestResourceEventID = newestID
}
166 * Creates the initial state that is related to IMEvents.
// Replays the IMEvents that imEventStore.foreachIMEventInOccurrenceOrder delivers for this
// user: each one is logged, applied to the agreement history and recorded as the latest
// IM event. The resulting agreement history is logged at the end.
168 private[this] def initializeStateOfIMEvents(): Unit = {
169 // NOTE: this._userID is already set up by onInitializeUserActorState()
170 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
171 DEBUG("Replaying %s", imEvent)
173 updateAgreementHistoryFrom(imEvent)
174 updateLatestIMEventIDFrom(imEvent)
178 DEBUG("Initial agreement history %s", this._workingAgreementHistory.toJsonString)
184 * Resource events are processed only if the user has been created and has agreements.
185 * Otherwise nothing can be computed.
private[this] def shouldProcessResourceEvents: Boolean = {
  // All three pieces of state must be present; they are checked in this order and the
  // first missing one settles the answer.
  if(!haveUserCreationIMEvent) false
  else if(!haveAgreements) false
  else haveUserStateBootstrap
}
/**
 * (Re)computes `_workingUserState` and then syncs its agreement history.
 *
 * Asserts that agreements and the user-creation IMEvent are present. Builds
 * `_userStateBootstrap` from the creation event's role, time and ID if it does not exist
 * yet, asks the charging service to replay charging for the billing month of the current
 * time, and finally, if a working state is present, overwrites that state's agreement
 * history with `_workingAgreementHistory`.
 */
191 private[this] def loadWorkingUserStateAndUpdateAgreementHistory(): Unit = {
192 assert(this.haveAgreements, "this.haveAgreements")
193 assert(this.haveUserCreationIMEvent, "this.haveUserCreationIMEvent")
195 val userCreationMillis = this._userCreationIMEvent.occurredMillis
196 val userCreationRole = this._userCreationIMEvent.role // initial role
197 val userCreationIMEventID = this._userCreationIMEvent.id
// The bootstrap is built only once; later calls reuse the existing instance.
199 if(!haveUserStateBootstrap) {
200 this._userStateBootstrap = UserStateBootstrap(
203 aquarium.initialUserAgreement(userCreationRole, userCreationMillis, Some(userCreationIMEventID)),
204 aquarium.initialUserBalance(userCreationRole, userCreationMillis)
208 val now = TimeHelpers.nowMillis()
209 this._workingUserState = chargingService.replayMonthChargingUpTo(
210 BillingMonthInfo.fromMillis(now),
212 this._userStateBootstrap,
213 aquarium.currentResourceTypesMap,
214 InitialUserActorSetup(),
215 aquarium.userStateStore.insertUserState,
219 // Final touch: Update agreement history in the working user state.
220 // The assumption is that all agreement changes go via IMEvents, so the
221 // state this._workingAgreementHistory is always the authoritative source.
222 if(haveWorkingUserState) {
223 this._workingUserState.workingAgreementHistory.setFrom(this._workingAgreementHistory)
224 DEBUG("Computed working user state %s", this._workingUserState.toJsonString)
/**
 * Sets up the resource-event related state during actor initialization.
 *
 * Logs a debug line when there are no agreements or no user-creation IMEvent
 * (NOTE(review): the rest of those two branches is not visible in this listing —
 * presumably they return early; confirm). Otherwise computes the working user state via
 * loadWorkingUserStateAndUpdateAgreementHistory() and logs it when present.
 *
 * @param event the initialization message; used here only for logging
 */
228 private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
229 if(!this.haveAgreements) {
230 DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
234 if(!this.haveUserCreationIMEvent) {
235 DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
239 // We will also need this functionality when receiving IMEvents, so we place it in a method
240 loadWorkingUserStateAndUpdateAgreementHistory()
242 if(haveWorkingUserState) {
243 DEBUG("Initial working user state %s", this._workingUserState.toJsonString)
/**
 * Handler for the `InitializeUserActorState` message: adopts the user ID carried by the
 * message and then initializes first the IMEvent-related state and afterwards the
 * resource-event-related state.
 *
 * @param event the initialization message carrying the user ID
 */
248 def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
// The user ID must be stored first: initializeStateOfIMEvents() reads this._userID.
249 this._userID = event.userID
250 DEBUG("Got %s", event)
252 initializeStateOfIMEvents()
253 initializeStateOfResourceEvents(event)
257 * Process [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
258 * When this method is called, we assume that all proper checks have been made and it
259 * is OK to proceed with the event processing.
261 def onProcessIMEvent(processEvent: ProcessIMEvent): Unit = {
262 val imEvent = processEvent.imEvent
// Snapshot taken before the event is applied, so that the transition from "no creation
// event" to "creation event present" can be detected further down.
263 val hadUserCreationIMEvent = haveUserCreationIMEvent
265 if(!haveAgreements) {
266 // This is an error. Should have been initialized from somewhere ...
267 throw new AquariumInternalError("No agreements. Cannot process %s", processEvent)
// An event whose ID equals the latest recorded IM event ID has already been applied.
270 if(this._latestIMEventID == imEvent.id) {
271 // This happens when the actor is brought to life, then immediately initialized, and then
272 // sent the first IM event. But from the initialization procedure, this IM event will have
273 // already been loaded from DB!
274 INFO("Ignoring first %s", imEvent.toDebugString)
277 //this._latestIMEventID = imEvent.id
281 updateAgreementHistoryFrom(imEvent)
282 updateLatestIMEventIDFrom(imEvent)
284 // Must also update user state if we know when in history the life of a user begins
285 if(!hadUserCreationIMEvent && haveUserCreationIMEvent) {
286 loadWorkingUserStateAndUpdateAgreementHistory()
/**
 * Handler for a `ProcessResourceEvent` message.
 *
 * Logs and does not charge when shouldProcessResourceEvents is false, and logs an
 * "Ignoring first" line when the event ID equals the latest recorded resource event ID.
 * Otherwise it compares the billing month of the current time with the billing month of
 * the event, and the event's occurrence time with the latest one recorded in the working
 * user state. Two local strategies are defined: computeBatch() replays month charging
 * ("out of sync"), computeRealtime() processes the single event ("in sync").
 * NOTE(review): which branch invokes which strategy is not visible in this listing.
 *
 * @param event the message wrapping the resource event
 */
290 def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
291 val rcEvent = event.rcEvent
293 if(!shouldProcessResourceEvents) {
294 // This means the user has not been created (at least, as far as Aquarium is concerned).
295 // So, we do not process any resource event
296 DEBUG("Not processing %s", rcEvent.toJsonString)
302 // Since the latest resource event per resource is recorded in the user state,
303 // we do not need to query the store. Just query the in-memory state.
304 // Note: This is a similar situation with the first IMEvent received right after the user
306 if(this._latestResourceEventID == rcEvent.id) {
307 INFO("Ignoring first %s", rcEvent.toDebugString)
313 val now = TimeHelpers.nowMillis()
314 val currentResourcesMap = aquarium.currentResourceTypesMap
315 val chargingReason = RealtimeChargingReason(None, now)
// Year and month of the billing month that contains the current time.
317 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
318 val nowYear = nowBillingMonthInfo.year
319 val nowMonth = nowBillingMonthInfo.month
// Year and month of the billing month that contains the event's occurrence time.
321 val eventOccurredMillis = rcEvent.occurredMillis
322 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
323 val eventYear = eventBillingMonthInfo.year
324 val eventMonth = eventBillingMonthInfo.month
// Replaces the working user state with the result of a month replay, then records the
// event ID as the latest resource event ID.
326 def computeBatch(): Unit = {
327 DEBUG("Going for out of sync charging")
328 this._workingUserState = chargingService.replayMonthChargingUpTo(
330 // Take into account that the event may be out-of-sync.
331 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
332 now max eventOccurredMillis,
333 this._userStateBootstrap,
336 stdUserStateStoreFunc,
340 updateLatestResourceEventIDFrom(rcEvent)
// Hands the event and the current working user state to the charging service, then
// records the event ID as the latest resource event ID.
343 def computeRealtime(): Unit = {
344 DEBUG("Going for in sync charging")
345 chargingService.processResourceEvent(
347 this._workingUserState,
354 updateLatestResourceEventIDFrom(rcEvent)
358 if(nowYear != eventYear || nowMonth != eventMonth) {
360 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
366 else if(this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis) {
367 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
370 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._workingUserState.latestResourceEventOccurredMillis),
371 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.occurredMillis)
379 DEBUG("Updated %s", this._workingUserState)
/**
 * Handler for a `GetUserBalanceRequest`: replies to the sender with a
 * `GetUserBalanceResponse` chosen from the pair
 * (haveUserCreationIMEvent, haveWorkingUserState).
 *
 * The visible replies are: the total credits of the working user state; the initial
 * balance for the creation event's role and time; an internal-error reply with status 500;
 * and an unknown-user reply with status 404.
 *
 * @param event the request; its userID is used in the unknown-user message
 */
383 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
384 val userID = event.userID
386 (haveUserCreationIMEvent, haveWorkingUserState) match {
388 // (User CREATEd, with balance state)
389 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._workingUserState.totalCredits)))
392 // (User CREATEd, no balance state)
393 // Return the default initial balance
394 sender ! GetUserBalanceResponse(
396 GetUserBalanceResponseData(
398 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis)
402 // (Not CREATEd, with balance state)
403 // Clearly this is internal error
404 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
406 case (false, false) ⇒
407 // (Not CREATEd, no balance state)
408 // The user is completely unknown
409 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
/**
 * Handler for a `GetUserStateRequest`: replies to the sender with either the current
 * working user state or a "No state for user" error with status 404, depending on
 * haveWorkingUserState.
 *
 * @param event the request; its userID is used in the error message
 */
413 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
414 haveWorkingUserState match {
416 sender ! GetUserStateResponse(Right(this._workingUserState))
419 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
/**
 * Handler for a `GetUserWalletRequest`: replies to the sender with a
 * `GetUserWalletResponse`.
 *
 * With a working user state, the reply carries that state's total credits and wallet
 * entries. Without one, the reply depends on haveUserCreationIMEvent: either data based
 * on the initial balance for the creation event's role and time, or a "No wallet for
 * user" error with status 404.
 *
 * @param event the request; its userID is used in the error message
 */
423 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
424 haveWorkingUserState match {
426 DEBUG("haveWorkingUserState: %s", event)
427 sender ! GetUserWalletResponse(
429 GetUserWalletResponseData(
431 this._workingUserState.totalCredits,
432 this._workingUserState.walletEntries.toList
436 DEBUG("!haveWorkingUserState: %s", event)
437 haveUserCreationIMEvent match {
439 DEBUG("haveUserCreationIMEvent: %s", event)
440 sender ! GetUserWalletResponse(
442 GetUserWalletResponseData(
444 aquarium.initialUserBalance(this._userCreationIMEvent.role, this._userCreationIMEvent.occurredMillis),
449 DEBUG("!haveUserCreationIMEvent: %s", event)
// NOTE(review): the error code below reads "AQU-WAL-00 8" (with a space), unlike the other
// AQU-xxx-NNNN codes in this file; it was probably meant to be AQU-WAL-0008. Confirm that
// no client matches on the current text before correcting it.
450 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
// Value that the logging helpers below put between "[" and "]" at the start of every log line.
// NOTE(review): the body is not visible in this listing; presumably it yields the user ID.
455 private[this] def D_userID = {
/**
 * Logs at debug level. The line has the form `[<D_userID>] - <message>`, where the
 * message is `fmt` formatted with `args`.
 *
 * @param fmt  format string for the message
 * @param args arguments substituted into `fmt`
 */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(userTag, text))
}
/**
 * Logs at info level. The line has the form `[<D_userID>] - <message>`, where the
 * message is `fmt` formatted with `args`.
 *
 * @param fmt  format string for the message
 * @param args arguments substituted into `fmt`
 */
private[this] def INFO(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.info("[%s] - %s".format(userTag, text))
}
/**
 * Logs at warn level. The line has the form `[<D_userID>] - <message>`, where the
 * message is `fmt` formatted with `args`.
 *
 * @param fmt  format string for the message
 * @param args arguments substituted into `fmt`
 */
private[this] def WARN(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(userTag, text))
}
/**
 * Logs at error level. The line has the form `[<D_userID>] - <message>`, where the
 * message is `fmt` formatted with `args`.
 *
 * @param fmt  format string for the message
 * @param args arguments substituted into `fmt`
 */
private[this] def ERROR(fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(userTag, text))
}
/**
 * Logs at error level together with a throwable. The line has the form
 * `[<D_userID>] - <message>`, where the message is `fmt` formatted with `args`.
 *
 * @param t    the throwable passed to the logger alongside the message
 * @param fmt  format string for the message
 * @param args arguments substituted into `fmt`
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val userTag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(userTag, text), t)
}