2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.{Real, AquariumInternalError}
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.UserStateModel
54 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
55 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
56 import gr.grnet.aquarium.service.event.BalanceEvent
57 import gr.grnet.aquarium.util.date.TimeHelpers
58 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
59 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
60 import gr.grnet.aquarium.charging.bill.BillEntryMsg
64 * @author Christos KK Loverdos <loverdos@gmail.com>
67 class UserActor extends ReflectiveRoleableActor {
68 private[this] var _imMsgCount = 0
69 private[this] var _userStateModel: UserStateModel = _
73 throw new AquariumInternalError("%s not initialized")
76 this._userStateModel.userID
79 override def postStop() {
80 DEBUG("I am finally stopped (in postStop())")
81 aquarium.akkaService.notifyUserActorPostStop(this)
84 private[this] def shutmedown(): Unit = {
86 aquarium.akkaService.invalidateUserActor(this)
90 override protected def onThrowable(t: Throwable, message: AnyRef) = {
91 LogHelpers.logChainOfCauses(logger, t)
92 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
97 def role = UserActorRole
99 private[this] def chargingService = aquarium.chargingService
101 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
104 private[this] def unsafeUserCreationIMEventMsg = {
105 this._userStateModel.unsafeUserCreationIMEvent
108 private[this] def haveAgreements = {
109 (this._userStateModel ne null)
112 private[this] def haveUserCreationEvent = {
114 this._userStateModel.hasUserCreationEvent
117 private[this] def haveUserState = {
118 (this._userStateModel ne null)
121 private[this] def isInitial = this._userStateModel.isInitial
124 * Creates the agreement history from all the stored IMEvents.
126 * @return (`true` iff there was a user CREATE event, the number of events processed)
128 private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
129 DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
130 assert(haveUserState, "haveUserState")
135 val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
137 DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
139 if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
140 // The very first event must be a CREATE event. Otherwise we abort initialization.
141 // This will normally happen during devops :)
142 INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
146 val effectiveFromMillis = imEvent.getOccurredMillis
147 val role = imEvent.getRole
148 // calling unsafe just for the side-effect
150 aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
151 "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
154 this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
159 this._imMsgCount = _imcounter
161 DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
162 (hadCreateEvent, _imcounter)
165 private[this] def saveFirstUserState(userID: String) {
166 this._userStateModel.userStateMsg.setIsFirst(true)
167 this._userStateModel.updateUserStateMsg(
168 aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
172 private[this] def saveSubsequentUserState() {
173 this._userStateModel.userStateMsg.setIsFirst(false)
174 this._userStateModel.updateUserStateMsg(
175 aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
179 private[this] def loadLastKnownUserStateAndUpdateAgreements() {
180 val userID = this._userStateModel.userID
181 aquarium.userStateStore.findLatestUserState(userID) match {
183 // First user state ever
184 saveFirstUserState(userID)
186 case Some(latestUserState) ⇒
187 this._userStateModel.updateUserStateMsg(latestUserState)
191 private[this] def processResourceEventsAfterLastKnownUserState() {
192 // Update the user state snapshot with fresh (ie not previously processed) events.
196 private[this] def makeUserStateMsgUpToDate() {
197 loadLastKnownUserStateAndUpdateAgreements()
198 processResourceEventsAfterLastKnownUserState()
201 private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
206 val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID)
209 makeUserStateMsgUpToDate()
218 * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
219 * messaging hub (rabbitmq).
221 def onIMEventMsg(imEvent: IMEventMsg) {
226 // Check for out of sync (regarding IMEvents)
227 val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
229 // clear all resource state
235 // Check out of sync (regarding ResourceEvents)
236 val isOutOfSyncRC = false // FIXME implement
243 // Make new agreement
244 this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
245 this._imMsgCount += 1
247 if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
248 makeUserStateMsgUpToDate()
251 DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
254 def onResourceEventMsg(rcEvent: ResourceEventMsg) {
259 if(!haveUserCreationEvent) {
260 DEBUG("No agreements. Ignoring %s", rcEvent)
265 assert(haveUserState, "haveUserState")
267 val oldTotalCredits = this._userStateModel.totalCreditsAsReal
269 chargingService.processResourceEvent(
270 rcEvent.getReceivedMillis,
272 this._userStateModel,
273 aquarium.currentResourceMapping,
277 val newTotalCredits = this._userStateModel.totalCreditsAsReal
279 if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
280 aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
283 DEBUG("Updated %s", this._userStateModel)
286 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
290 val timeslot = event.timeslot
291 val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
292 loadSortedPolicyModelsWithin(timeslot.from.getTime,
293 timeslot.to.getTime).
294 values.headOption match {
295 case None => Map[String,ResourceType]()
296 case Some(policy:PolicyModel) => policy.resourceTypesMap
298 val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None
299 val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
300 //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
301 //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
302 val billData = GetUserBillResponseData(this.userID,billEntryMsg)
303 sender ! GetUserBillResponse(Right(billData))
307 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
311 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
314 val userID = event.userID
316 (haveAgreements, haveUserState) match {
318 // (User CREATEd, with balance state)
319 val realtimeMillis = TimeHelpers.nowMillis()
320 chargingService.calculateRealtimeUserState(
321 this._userStateModel,
322 aquarium.currentResourceMapping,
326 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))
329 // (User CREATEd, no balance state)
330 // Return the default initial balance
331 sender ! GetUserBalanceResponse(
333 GetUserBalanceResponseData(
335 aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString()
339 // (Not CREATEd, with balance state)
340 // Clearly this is internal error
341 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
343 case (false, false) ⇒
344 // (Not CREATEd, no balance state)
345 // The user is completely unknown
346 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
350 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
353 haveUserState match {
355 val realtimeMillis = TimeHelpers.nowMillis()
356 chargingService.calculateRealtimeUserState(
357 this._userStateModel,
358 aquarium.currentResourceMapping,
362 sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
365 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
369 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
372 haveUserState match {
374 DEBUG("haveWorkingUserState: %s", event)
375 val realtimeMillis = TimeHelpers.nowMillis()
376 chargingService.calculateRealtimeUserState(
377 this._userStateModel,
378 aquarium.currentResourceMapping,
382 sender ! GetUserWalletResponse(
384 GetUserWalletResponseData(
386 this._userStateModel.totalCredits,
387 MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
391 DEBUG("!haveWorkingUserState: %s", event)
392 haveAgreements match {
394 DEBUG("haveAgreements: %s", event)
395 sender ! GetUserWalletResponse(
397 GetUserWalletResponseData(
399 Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)),
400 MessageFactory.newWalletEntriesMsg()
404 DEBUG("!haveUserCreationIMEvent: %s", event)
405 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
411 * Initializes the actor's internal state.
415 def onSetUserActorUserID(userID: String) {
416 // Create the full agreement history from the original sources (IMEvents)
417 this._userStateModel = ModelFactory.newInitialUserStateModel(
420 TimeHelpers.nowMillis()
423 require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
426 private[this] def D_userID = {
427 if(haveUserState) userID else "???"
430 private[this] def DEBUG(fmt: String, args: Any*) =
431 logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
433 private[this] def INFO(fmt: String, args: Any*) =
434 logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
436 private[this] def WARN(fmt: String, args: Any*) =
437 logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
439 private[this] def ERROR(fmt: String, args: Any*) =
440 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
442 private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
443 logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)