2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.AquariumInternalError
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
54 import gr.grnet.aquarium.computation.BillingMonthInfo
55 import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg, UserStateMsg}
56 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
57 import gr.grnet.aquarium.service.event.BalanceEvent
58 import gr.grnet.aquarium.util.date.TimeHelpers
59 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
60 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
61 import gr.grnet.aquarium.charging.bill.BillEntryMsg
65 * @author Christos KK Loverdos <loverdos@gmail.com>
68 class UserActor extends ReflectiveRoleableActor {
69 private[this] var _rcMsgCount = 0
70 private[this] var _imMsgCount = 0
71 private[this] var _userID: String = "<?>"
72 private[this] var _userState: UserStateModel = _
73 private[this] var _userCreationIMEvent: IMEventMsg = _
74 private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
75 private[this] var _latestIMEventOriginalID: String = ""
76 private[this] var _latestIMEventOccurredMillis: Long = -1L
77 private[this] var _latestResourceEventOriginalID: String = ""
78 private[this] var _userStateBootstrap: UserStateBootstrap = _
82 throw new AquariumInternalError("%s not initialized")
/**
 * Lifecycle hook overriding the inherited `postStop`.
 *
 * Emits a debug log line and then passes this actor instance to
 * `aquarium.akkaService.notifyUserActorPostStop`.
 */
88 override def postStop() {
89 DEBUG("I am finally stopped (in postStop())")
// Report this (now stopped) actor instance to the akka service.
90 aquarium.akkaService.notifyUserActorPostStop(this)
93 private[this] def shutmedown(): Unit = {
95 aquarium.akkaService.invalidateUserActor(this)
99 override protected def onThrowable(t: Throwable, message: AnyRef) = {
100 LogHelpers.logChainOfCauses(logger, t)
101 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
/**
 * The role of this actor. Always evaluates to `UserActorRole`.
 */
def role = {
  UserActorRole
}
/**
 * Shortcut for `aquarium.chargingService`; looked up again on every call.
 */
private[this] def chargingService = {
  val service = aquarium.chargingService
  service
}
/**
 * Builds a function value that takes a `UserStateMsg` and forwards it to
 * `aquarium.userStateStore.insertUserState`.
 *
 * Because this is a `def`, a new function value is created on each call.
 * It is passed as an argument to `chargingService.replayMonthChargingUpTo`
 * from `onResourceEventMsg`.
 */
110 private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
111 aquarium.userStateStore.insertUserState(userState)
114 @inline private[this] def haveUserID = {
118 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
/**
 * `true` when the agreement history model has been created (is non-null)
 * and reports a `size` greater than zero.
 */
121 @inline private[this] def haveAgreements = {
// The null check comes first so that `size` is never called on a null reference.
122 (this._userAgreementHistoryModel ne null) && this._userAgreementHistoryModel.size > 0
/**
 * `true` when `_userState` is non-null (reference comparison via `ne`).
 */
125 @inline private[this] def haveUserState = {
126 this._userState ne null
/**
 * `true` when `_userStateBootstrap` is non-null (reference comparison via `ne`).
 */
129 @inline private[this] def haveUserStateBootstrap = {
130 this._userStateBootstrap ne null
133 private[this] def createUserAgreementHistoryModel(imEvent: IMEventMsg) {
134 assert(MessageHelpers.isIMEventCreate(imEvent))
135 assert(this._userAgreementHistoryModel eq null)
136 assert(this._userCreationIMEvent eq null)
138 this._userCreationIMEvent = imEvent
139 this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModelFromIMEvent(
141 imEvent.getOriginalID
145 private[this] def updateAgreementHistoryFrom(imEvent: IMEventMsg): Unit = {
146 val isCreateUser = MessageHelpers.isIMEventCreate(imEvent)
149 throw new AquariumInternalError(
150 "Got user creation event (id=%s) but I already have one (id=%s)",
151 this._userCreationIMEvent.getOriginalID,
152 imEvent.getOriginalID
156 createUserAgreementHistoryModel(imEvent) // now we have an agreement history
157 createUserStateBootstrap(imEvent)
160 val effectiveFromMillis = imEvent.getOccurredMillis
161 val role = imEvent.getRole
162 // calling unsafe just for the side-effect
163 assert(null ne aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis))
165 // add to model (will update the underlying messages as well)
166 val newUserAgreementModel = ModelFactory.newUserAgreementModelFromIMEvent(imEvent, imEvent.getOriginalID)
167 this._userAgreementHistoryModel += newUserAgreementModel
169 // We assume that we always call this method with in-sync events
170 assert(imEvent.getOccurredMillis >= this._latestIMEventOccurredMillis)
171 updateLatestIMEventStateFrom(imEvent)
174 // private[this] def updateLatestIMEventIDFrom(imEvent: IMEventMsg): Unit = {
175 // this._latestIMEventOriginalID = imEvent.getOriginalID
/**
 * Records the given IM event as the latest one seen by this actor.
 *
 * Stores the event's original ID and occurred-millis in the corresponding
 * `_latestIMEvent*` fields and increments `_imMsgCount` by one.
 *
 * @param imEvent the IM event whose ID and timestamp are recorded
 */
178 private[this] def updateLatestIMEventStateFrom(imEvent: IMEventMsg) {
179 this._latestIMEventOriginalID = imEvent.getOriginalID
180 this._latestIMEventOccurredMillis = imEvent.getOccurredMillis
181 this._imMsgCount += 1
/**
 * Stores the original ID of the given resource event in
 * `_latestResourceEventOriginalID`.
 *
 * NOTE(review): unlike `updateLatestIMEventStateFrom`, which increments
 * `_imMsgCount`, the visible body does not touch `_rcMsgCount` -- confirm
 * whether that is intended.
 *
 * @param rcEvent the resource event whose original ID is recorded
 */
184 private[this] def updateLatestResourceEventIDFrom(rcEvent: ResourceEventMsg): Unit = {
185 this._latestResourceEventOriginalID = rcEvent.getOriginalID
189 * Creates the initial state that is related to IMEvents.
191 * @return `true` if there was a user CREATE event
193 private[this] def initializeStateOfIMEvents(): Boolean = {
194 DEBUG("initializeStateOfIMEvents()")
196 // NOTE: this._userID is already set up by our caller
199 aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
201 DEBUG("Replaying [%s] %s", _imcounter, imEvent)
203 if(_imcounter == 1 && !MessageHelpers.isIMEventCreate(imEvent)) {
204 // The very first event must be a CREATE event. Otherwise we abort initialization.
205 // This will normally happen during devops :)
206 INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
210 updateAgreementHistoryFrom(imEvent)
216 private[this] def loadUserStateAndUpdateAgreementHistory(): Unit = {
217 assert(this.haveAgreements, "this.haveAgreements")
219 if(!haveUserStateBootstrap) {
220 this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
222 logger.debug("#### this._userStateBootStrap %s".format(this._userStateBootstrap.toString))
223 val now = TimeHelpers.nowMillis()
224 this._userState = chargingService.replayMonthChargingUpTo(
225 BillingMonthInfo.fromMillis(now),
227 this._userStateBootstrap,
228 aquarium.currentResourceTypesMap,
229 aquarium.userStateStore.insertUserState
232 // Final touch: Update agreement history in the working user state.
233 // The assumption is that all agreement changes go via IMEvents, so the
234 // state this._userAgreementHistoryModel is always the authoritative source.
236 this._userState.userAgreementHistoryModel = this._userAgreementHistoryModel
237 DEBUG("Computed working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg))
241 private[this] def initializeStateOfResourceEvents(): Unit = {
242 DEBUG("initializeStateOfResourceEvents()")
243 assert(haveAgreements)
245 // We will also need this functionality when receiving IMEvents, so we place it in a method
246 loadUserStateAndUpdateAgreementHistory()
249 DEBUG("Initial working user state %s", AvroHelpers.jsonStringOfSpecificRecord(this._userState.msg))
255 * Initializes the actor state from DB.
257 def initializeUserActorState(userID: String): Boolean = {
258 this._userID = userID
260 if(initializeStateOfIMEvents()) {
261 initializeStateOfResourceEvents()
262 // Even if we have no resource events, the user is at least CREATEd
/**
 * Computes and stores `_userStateBootstrap` from the user-creation IM event.
 *
 * Asserts that `imEvent` is a CREATE event and that it equals (`==`) the
 * already recorded `_userCreationIMEvent`. In `updateAgreementHistoryFrom` it is
 * invoked right after `createUserAgreementHistoryModel`, which sets that field.
 *
 * @param imEvent the user-creation IM event; used only for the assertions,
 *                the bootstrap itself is built from `_userCreationIMEvent`
 */
270 def createUserStateBootstrap(imEvent: IMEventMsg) {
271 assert(MessageHelpers.isIMEventCreate(imEvent), "MessageHelpers.isIMEventCreate(imEvent)")
272 assert(this._userCreationIMEvent == imEvent, "this._userCreationIMEvent == imEvent")
// Delegate construction of the bootstrap to `aquarium.getUserStateBootstrap`.
274 this._userStateBootstrap = aquarium.getUserStateBootstrap(this._userCreationIMEvent)
278 * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
279 * messaging hub (rabbitmq).
281 def onIMEventMsg(imEvent: IMEventMsg) {
282 if(!haveAgreements) {
283 // If we have no agreements so far, then it does not matter what kind of event
284 // this is. So we replay the log (ehm.. store)
285 initializeUserActorState(imEvent.getUserID)
290 // Check for out of sync (regarding IMEvents)
291 val isOutOfSyncIM = imEvent.getOccurredMillis < this._latestIMEventOccurredMillis
293 // clear all resource state
299 // Check out of sync (regarding ResourceEvents)
300 val isOutOfSyncRC = false // FIXME implement
308 assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
309 updateAgreementHistoryFrom(imEvent)
312 def onResourceEventMsg(rcEvent: ResourceEventMsg) {
313 if(!haveAgreements) {
314 DEBUG("No agreement. Ignoring %s", rcEvent)
319 val now = TimeHelpers.nowMillis()
320 // TODO: Review this and its usage in user state.
321 // TODO: The assumption is that the resource set increases all the time,
322 // TODO: so the current map contains everything ever known (assuming we do not run backwards in time).
323 val currentResourcesMap = aquarium.currentResourceTypesMap
325 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
326 val nowYear = nowBillingMonthInfo.year
327 val nowMonth = nowBillingMonthInfo.month
329 val eventOccurredMillis = rcEvent.getOccurredMillis
330 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
331 val eventYear = eventBillingMonthInfo.year
332 val eventMonth = eventBillingMonthInfo.month
334 def computeBatch(): Unit = {
335 DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID)
336 this._userState = chargingService.replayMonthChargingUpTo(
338 // Take into account that the event may be out-of-sync.
339 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
340 now max eventOccurredMillis,
341 this._userStateBootstrap,
343 stdUserStateStoreFunc
346 updateLatestResourceEventIDFrom(rcEvent)
349 def computeRealtime(): Unit = {
350 DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID)
351 chargingService.processResourceEvent(
358 updateLatestResourceEventIDFrom(rcEvent)
361 val oldTotalCredits =
362 if(this._userState!=null)
363 this._userState.totalCredits
367 if(this._userState eq null) {
370 else if(nowYear != eventYear || nowMonth != eventMonth) {
372 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
378 else if(this._userState.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
379 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
382 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
383 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
388 DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s",
389 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userState.latestResourceEventOccurredMillis),
390 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
394 val newTotalCredits = this._userState.totalCredits
395 if(oldTotalCredits * newTotalCredits < 0)
396 aquarium.eventBus ! new BalanceEvent(this._userState.userID,
398 DEBUG("Updated %s", this._userState)
402 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
404 val timeslot = event.timeslot
405 val resourceTypes = aquarium.policyStore.
406 loadSortedPolicyModelsWithin(timeslot.from.getTime,
407 timeslot.to.getTime).
408 values.headOption match {
409 case None => Map[String,ResourceType]()
410 case Some(policy:PolicyModel) => policy.resourceTypesMap
412 val state= if(haveUserState) Some(this._userState.msg) else None
413 val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes)
414 //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
415 //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
416 val billData = GetUserBillResponseData(this._userID,billEntryMsg)
417 sender ! GetUserBillResponse(Right(billData))
421 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
425 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
426 val userID = event.userID
428 (haveAgreements, haveUserState) match {
430 // (User CREATEd, with balance state)
431 val realtimeMillis = TimeHelpers.nowMillis()
432 chargingService.calculateRealtimeUserState(
434 BillingMonthInfo.fromMillis(realtimeMillis),
438 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userState.totalCredits)))
441 // (User CREATEd, no balance state)
442 // Return the default initial balance
443 sender ! GetUserBalanceResponse(
445 GetUserBalanceResponseData(
447 aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis)
451 // (Not CREATEd, with balance state)
452 // Clearly this is internal error
453 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
455 case (false, false) ⇒
456 // (Not CREATEd, no balance state)
457 // The user is completely unknown
458 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
462 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
463 haveUserState match {
465 val realtimeMillis = TimeHelpers.nowMillis()
466 chargingService.calculateRealtimeUserState(
468 BillingMonthInfo.fromMillis(realtimeMillis),
472 sender ! GetUserStateResponse(Right(this._userState.msg))
475 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
479 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
480 haveUserState match {
482 DEBUG("haveWorkingUserState: %s", event)
483 val realtimeMillis = TimeHelpers.nowMillis()
484 chargingService.calculateRealtimeUserState(
486 BillingMonthInfo.fromMillis(realtimeMillis),
490 sender ! GetUserWalletResponse(
492 GetUserWalletResponseData(
494 this._userState.totalCredits,
495 MessageFactory.newWalletEntriesMsg(this._userState.msg.getWalletEntries)
499 DEBUG("!haveWorkingUserState: %s", event)
500 haveAgreements match {
502 DEBUG("haveAgreements: %s", event)
503 sender ! GetUserWalletResponse(
505 GetUserWalletResponseData(
507 aquarium.initialUserBalance(this._userCreationIMEvent.getRole, this._userCreationIMEvent.getOccurredMillis),
508 MessageFactory.newWalletEntriesMsg()
512 DEBUG("!haveUserCreationIMEvent: %s", event)
513 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
518 private[this] def D_userID = {
/**
 * Logs at debug level.
 *
 * `fmt` is expanded with `args` through `format`, and the result is prefixed
 * with the value of `D_userID` in square brackets.
 *
 * @param fmt  format string
 * @param args values substituted into `fmt`
 */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(tag, text))
}
/**
 * Logs at info level.
 *
 * `fmt` is expanded with `args` through `format`, and the result is prefixed
 * with the value of `D_userID` in square brackets.
 *
 * @param fmt  format string
 * @param args values substituted into `fmt`
 */
private[this] def INFO(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.info("[%s] - %s".format(tag, text))
}
/**
 * Logs at warn level.
 *
 * `fmt` is expanded with `args` through `format`, and the result is prefixed
 * with the value of `D_userID` in square brackets.
 *
 * @param fmt  format string
 * @param args values substituted into `fmt`
 */
private[this] def WARN(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(tag, text))
}
/**
 * Logs at error level, without an attached throwable.
 *
 * `fmt` is expanded with `args` through `format`, and the result is prefixed
 * with the value of `D_userID` in square brackets.
 *
 * @param fmt  format string
 * @param args values substituted into `fmt`
 */
private[this] def ERROR(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(tag, text))
}
/**
 * Logs at error level and hands the throwable to the logger together with the message.
 *
 * `fmt` is expanded with `args` through `format`, and the result is prefixed
 * with the value of `D_userID` in square brackets.
 *
 * @param t    throwable passed to the logger alongside the message
 * @param fmt  format string
 * @param args values substituted into `fmt`
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(tag, text), t)
}