2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.AquariumInternalError
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
54 import gr.grnet.aquarium.computation.BillingMonthInfo
55 import gr.grnet.aquarium.message.avro.gen.{ResourceTypeMsg, UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg, UserStateMsg}
56 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers, AvroHelpers}
57 import gr.grnet.aquarium.service.event.BalanceEvent
58 import gr.grnet.aquarium.util.date.TimeHelpers
59 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
60 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
61 import gr.grnet.aquarium.charging.bill.BillEntryMsg
62 import gr.grnet.aquarium.event.CreditsModel
67 * @author Christos KK Loverdos <loverdos@gmail.com>
70 class UserActor extends ReflectiveRoleableActor {
71 private[this] var _rcMsgCount = 0
72 private[this] var _imMsgCount = 0
73 private[this] var _userID: String = "???"
74 private[this] var _userStateMsg: UserStateMsg = _
75 private[this] var _userAgreementHistoryModel: UserAgreementHistoryModel = _
79 throw new AquariumInternalError("%s not initialized")
85 override def postStop() {
86 DEBUG("I am finally stopped (in postStop())")
87 aquarium.akkaService.notifyUserActorPostStop(this)
90 private[this] def shutmedown(): Unit = {
92 aquarium.akkaService.invalidateUserActor(this)
96 override protected def onThrowable(t: Throwable, message: AnyRef) = {
97 LogHelpers.logChainOfCauses(logger, t)
98 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
103 def role = UserActorRole
105 private[this] def chargingService = aquarium.chargingService
107 private[this] def stdUserStateStoreFunc = (userState: UserStateMsg) ⇒ {
108 aquarium.userStateStore.insertUserState(userState)
111 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
114 private[this] def haveUserID = this._userID ne null
115 private[this] def unsafeUserCreationIMEventMsg = this._userAgreementHistoryModel.unsafeUserCreationIMEvent
116 private[this] def haveAgreements = this._userAgreementHistoryModel ne null
117 private[this] def isUserCreated = haveAgreements && this._userAgreementHistoryModel.hasUserCreationEvent
118 private[this] def haveUserState = this._userStateMsg ne null
120 private[this] def createInitialUserStateMsgFromCreateIMEvent() {
121 assert(haveAgreements, "haveAgreements")
122 assert(isUserCreated, "isUserCreated")
123 assert(this._userAgreementHistoryModel.hasUserCreationEvent, "this._userAgreementHistoryModel.hasUserCreationEvent")
125 val userCreationIMEventMsg = unsafeUserCreationIMEventMsg
126 val userStateBootstrap = aquarium.getUserStateBootstrap(userCreationIMEventMsg)
128 this._userStateMsg = MessageFactory.newInitialUserStateMsg(
130 CreditsModel.from(0.0),
131 TimeHelpers.nowMillis()
136 * Creates the agreement history from all the stored IMEvents.
138 * @return (`true` iff there was a user CREATE event, the number of events processed)
140 private[this] def createUserAgreementHistoryFromStoredIMEvents(): (Boolean, Int) = {
141 DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
142 val historyMsg = MessageFactory.newUserAgreementHistoryMsg(this._userID)
143 this._userAgreementHistoryModel = ModelFactory.newUserAgreementHistoryModel(historyMsg)
147 val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
149 DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
151 if(_imcounter == 1 && !MessageHelpers.isIMEventCreate(imEvent)) {
152 // The very first event must be a CREATE event. Otherwise we abort initialization.
153 // This will normally happen during devops :)
154 INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
158 val effectiveFromMillis = imEvent.getOccurredMillis
159 val role = imEvent.getRole
160 // calling unsafe just for the side-effect
162 aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
163 "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
166 this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
171 DEBUG("Agreements: %s", this._userAgreementHistoryModel)
172 (hadCreateEvent, _imcounter)
176 * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
177 * messaging hub (rabbitmq).
179 def onIMEventMsg(imEvent: IMEventMsg) {
180 if(!isUserCreated && MessageHelpers.isIMEventCreate(imEvent)) {
181 assert(this._imMsgCount == 0, "this._imMsgCount == 0")
182 // Create the full agreement history from the original sources (IMEvents)
183 val (userCreated, imEventsCount) = createUserAgreementHistoryFromStoredIMEvents()
185 this._imMsgCount = imEventsCount
189 // Check for out of sync (regarding IMEvents)
190 val isOutOfSyncIM = imEvent.getOccurredMillis < this._userAgreementHistoryModel.latestIMEventOccurredMillis
192 // clear all resource state
198 // Check out of sync (regarding ResourceEvents)
199 val isOutOfSyncRC = false // FIXME implement
207 assert(!MessageHelpers.isIMEventCreate(imEvent), "!MessageHelpers.isIMEventCreate(imEvent)")
209 // Make new agreement
210 this._userAgreementHistoryModel.insertUserAgreementMsgFromIMEvent(imEvent)
211 this._imMsgCount += 1
212 DEBUG("Agreements: %s", this._userAgreementHistoryModel)
215 def onResourceEventMsg(rcEvent: ResourceEventMsg) {
217 DEBUG("No agreements. Ignoring %s", rcEvent)
222 val now = TimeHelpers.nowMillis()
223 val resourceMapping = aquarium.resourceMappingAtMillis(now)
225 val nowBillingMonthInfo = BillingMonthInfo.fromMillis(now)
226 val nowYear = nowBillingMonthInfo.year
227 val nowMonth = nowBillingMonthInfo.month
229 val eventOccurredMillis = rcEvent.getOccurredMillis
230 val eventBillingMonthInfo = BillingMonthInfo.fromMillis(eventOccurredMillis)
231 val eventYear = eventBillingMonthInfo.year
232 val eventMonth = eventBillingMonthInfo.month
234 def computeBatch(): Unit = {
235 DEBUG("Going for out of sync charging for %s", rcEvent.getOriginalID)
237 this._userStateMsg = chargingService.replayMonthChargingUpTo(
238 this._userAgreementHistoryModel,
240 // Take into account that the event may be out-of-sync.
241 // TODO: Should we use this._latestResourceEventOccurredMillis instead of now?
242 now max eventOccurredMillis,
244 stdUserStateStoreFunc
249 def computeRealtime(): Unit = {
250 DEBUG("Going for in sync charging for %s", rcEvent.getOriginalID)
251 chargingService.processResourceEvent(
253 this._userAgreementHistoryModel,
260 this._rcMsgCount += 1
263 val oldTotalCredits =
264 if(this._userStateMsg!=null)
265 this._userStateMsg.totalCredits
269 if(this._userStateMsg eq null) {
272 else if(nowYear != eventYear || nowMonth != eventMonth) {
274 "nowYear(%s) != eventYear(%s) || nowMonth(%s) != eventMonth(%s)",
280 else if(this._userStateMsg.latestResourceEventOccurredMillis < rcEvent.getOccurredMillis) {
281 DEBUG("this._workingUserState.latestResourceEventOccurredMillis < rcEvent.occurredMillis")
284 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
285 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis)
290 DEBUG("OUT OF ORDER! this._workingUserState.latestResourceEventOccurredMillis=%s and rcEvent.occurredMillis=%s",
291 TimeHelpers.toYYYYMMDDHHMMSSSSS(this._userStateMsg.latestResourceEventOccurredMillis),
292 TimeHelpers.toYYYYMMDDHHMMSSSSS(rcEvent.getOccurredMillis))
296 val newTotalCredits = this._userStateMsg.totalCredits
297 if(oldTotalCredits * newTotalCredits < 0)
298 aquarium.eventBus ! new BalanceEvent(this._userStateMsg.userID,
300 DEBUG("Updated %s", this._userStateMsg)
304 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
306 val timeslot = event.timeslot
307 val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
308 loadSortedPolicyModelsWithin(timeslot.from.getTime,
309 timeslot.to.getTime).
310 values.headOption match {
311 case None => Map[String,ResourceType]()
312 case Some(policy:PolicyModel) => policy.resourceTypesMap
314 val state= if(haveUserState) Some(this._userStateMsg) else None
315 val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this._userID,state,resourceTypes)
316 //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
317 //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
318 val billData = GetUserBillResponseData(this._userID,billEntryMsg)
319 sender ! GetUserBillResponse(Right(billData))
323 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
327 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
328 val userID = event.userID
330 (haveAgreements, haveUserState) match {
332 // (User CREATEd, with balance state)
333 val realtimeMillis = TimeHelpers.nowMillis()
334 chargingService.calculateRealtimeUserState(
335 this._userAgreementHistoryModel,
337 BillingMonthInfo.fromMillis(realtimeMillis),
338 aquarium.resourceMappingAtMillis(realtimeMillis),
342 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this._userID, this._userStateMsg.totalCredits)))
345 // (User CREATEd, no balance state)
346 // Return the default initial balance
347 sender ! GetUserBalanceResponse(
349 GetUserBalanceResponseData(
351 aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)
355 // (Not CREATEd, with balance state)
356 // Clearly this is internal error
357 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
359 case (false, false) ⇒
360 // (Not CREATEd, no balance state)
361 // The user is completely unknown
362 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
366 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
367 haveUserState match {
369 val realtimeMillis = TimeHelpers.nowMillis()
370 chargingService.calculateRealtimeUserState(
371 this._userAgreementHistoryModel,
373 BillingMonthInfo.fromMillis(realtimeMillis),
374 aquarium.resourceMappingAtMillis(realtimeMillis),
378 sender ! GetUserStateResponse(Right(this._userStateMsg))
381 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
385 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
386 haveUserState match {
388 DEBUG("haveWorkingUserState: %s", event)
389 val realtimeMillis = TimeHelpers.nowMillis()
390 chargingService.calculateRealtimeUserState(
391 this._userAgreementHistoryModel,
393 BillingMonthInfo.fromMillis(realtimeMillis),
394 aquarium.resourceMappingAtMillis(realtimeMillis),
398 sender ! GetUserWalletResponse(
400 GetUserWalletResponseData(
402 this._userStateMsg.totalCredits,
403 MessageFactory.newWalletEntriesMsg(this._userStateMsg.getWalletEntries)
407 DEBUG("!haveWorkingUserState: %s", event)
408 haveAgreements match {
410 DEBUG("haveAgreements: %s", event)
411 sender ! GetUserWalletResponse(
413 GetUserWalletResponseData(
415 aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis),
416 MessageFactory.newWalletEntriesMsg()
420 DEBUG("!haveUserCreationIMEvent: %s", event)
421 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
426 def onSetUserActorUserID(userID: String) {
427 this._userID = userID
430 private[this] def D_userID = {
434 private[this] def DEBUG(fmt: String, args: Any*) =
435 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
437 private[this] def INFO(fmt: String, args: Any*) =
438 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
440 private[this] def WARN(fmt: String, args: Any*) =
441 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
443 private[this] def ERROR(fmt: String, args: Any*) =
444 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
446 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
447 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)