2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.actor
40 import gr.grnet.aquarium.{Real, AquariumInternalError}
41 import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42 import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43 import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44 import gr.grnet.aquarium.actor.message.GetUserBillRequest
45 import gr.grnet.aquarium.actor.message.GetUserBillResponse
46 import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47 import gr.grnet.aquarium.actor.message.GetUserStateRequest
48 import gr.grnet.aquarium.actor.message.GetUserStateResponse
49 import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53 import gr.grnet.aquarium.charging.state.UserStateModel
54 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
55 import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
56 import gr.grnet.aquarium.service.event.BalanceEvent
57 import gr.grnet.aquarium.util.date.TimeHelpers
58 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
59 import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
60 import gr.grnet.aquarium.charging.bill.BillEntryMsg
64 * @author Christos KK Loverdos <loverdos@gmail.com>
// Actor that keeps one user's UserStateModel and hosts the handlers for IM events,
// resource events and bill/balance/state/wallet requests defined further down.
67 class UserActor extends ReflectiveRoleableActor {
// Number of IM events taken into account so far: assigned after replaying the stored
// events and incremented when a live IM event is inserted.
68 private[this] var _imMsgCount = 0
// The user's state model. `= _` leaves the reference null until onSetUserActorUserID
// assigns it; the have* predicates test exactly that.
69 private[this] var _userStateModel: UserStateModel = _
// NOTE(review): the two lines below belong to a definition whose header is not visible in
// this view. The message literal contains "%s" but no format arguments are visible on the
// line, so the placeholder would be emitted verbatim -- confirm and supply the intended value.
73 throw new AquariumInternalError("%s not initialized")
76 this._userStateModel.userID
// Stop hook: logs that the actor has stopped, then reports it to the Akka service
// through notifyUserActorPostStop.
79 override def postStop() {
80 DEBUG("I am finally stopped (in postStop())")
81 aquarium.akkaService.notifyUserActorPostStop(this)
// Asks the Akka service to invalidate this actor instance.
// NOTE(review): part of this method body is not visible in this view -- confirm what else
// happens around the invalidation.
84 private[this] def shutmedown(): Unit = {
86 aquarium.akkaService.invalidateUserActor(this)
// Error hook: logs the whole chain of causes of `t`, then an ERROR line carrying the
// throwable's short class name and its message.
// @param t       the throwable being reported
// @param message not referenced in the visible lines of this method
// NOTE(review): the log text says "Terminating", but the statement that would actually
// terminate the actor is not visible in this view -- confirm.
90 override protected def onThrowable(t: Throwable, message: AnyRef) = {
91 LogHelpers.logChainOfCauses(logger, t)
92 ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
/** The role of this actor; always `UserActorRole`. */
def role = {
  UserActorRole
}
/** Shortcut to the charging service exposed by `aquarium`. */
private[this] def chargingService = {
  aquarium.chargingService
}
// Handler for AquariumPropertiesLoaded.
// NOTE(review): no body statements are visible in this view -- confirm whether this is
// intentionally a no-op.
101 def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
// Returns the state model's `unsafeUserCreationIMEvent`. The model reference is used
// without a null check here.
// Presumably fails when no creation event exists (hence "unsafe") -- TODO confirm against
// UserStateModel.
104 private[this] def unsafeUserCreationIMEventMsg = {
105 this._userStateModel.unsafeUserCreationIMEvent
// True once the state model reference has been assigned (reference comparison with null).
// Note: this is the very same test that haveUserState performs.
108 private[this] def haveAgreements = {
109 (this._userStateModel ne null)
// Whether the state model reports that it has a user-creation event.
// NOTE(review): one line of this body is not visible in this view; the visible line
// dereferences _userStateModel, so a guard may precede it -- confirm.
112 private[this] def haveUserCreationEvent = {
114 this._userStateModel.hasUserCreationEvent
// True once the state model reference has been assigned (reference comparison with null).
// Note: this is the very same test that haveAgreements performs.
117 private[this] def haveUserState = {
118 (this._userStateModel ne null)
/** Whether the current state model reports itself as initial. */
private[this] def isInitial = {
  val model = this._userStateModel
  model.isInitial
}
124 * Creates the agreement history from all the stored IMEvents.
126 * @return (`true` iff there was a user CREATE event, the number of events processed)
// Replays the IM events stored for `userID`, in occurrence order, inserting an agreement
// into the state model for each replayed event.
// @param userID the user whose stored IM events are replayed
// @return the result of the store iteration paired with the replay counter
// NOTE(review): several lines of this body (among them the counter's declaration and its
// update) are not visible in this view.
128 private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
// NOTE(review): the logged name ("...FromStoredIMEvents") differs from this method's name.
129 DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
// The state model must already exist because it is mutated below.
130 assert(haveUserState, "haveUserState")
135 val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
137 DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
// First replayed event that is not a CREATE: it is logged as ignored (the value this
// branch yields is not visible here).
139 if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
140 // The very first event must be a CREATE event. Otherwise we abort initialization.
141 // This will normally happen during devops :)
142 INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
// Occurrence time and role of the event, used for the price-table lookup below.
146 val effectiveFromMillis = imEvent.getOccurredMillis
147 val role = imEvent.getRole
148 // calling unsafe just for the side-effect
// The next two lines are arguments of a call whose opening line is not visible here:
// a non-null check on the price table for this role/time, and a string describing it.
150 aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
151 "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
// Record the agreement derived from this event in the state model.
154 this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
// Remember the final value of the replay counter.
159 this._imMsgCount = _imcounter
161 DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
162 (hadCreateEvent, _imcounter)
// Flags the current user state message as the first one, inserts it into the user state
// store, and replaces the model's message with whatever insertUserState returns.
// @param userID not referenced in the visible lines of this method
165 private[this] def saveFirstUserState(userID: String) {
166 this._userStateModel.userStateMsg.setIsFirst(true)
167 this._userStateModel.updateUserStateMsg(
168 aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
// Flags the current user state message as not-first, inserts it into the user state
// store, and replaces the model's message with whatever insertUserState returns.
172 private[this] def saveSubsequentUserState() {
173 this._userStateModel.userStateMsg.setIsFirst(false)
174 this._userStateModel.updateUserStateMsg(
175 aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
// Looks up the latest stored user state for the model's user. The visible branches either
// save a first state or load the state that was found into the model.
// NOTE(review): the pattern line of the first branch is not visible in this view, and no
// agreement update is visible despite the method's name -- confirm.
179 private[this] def loadLastKnownUserStateAndUpdateAgreements() {
180 val userID = this._userStateModel.userID
181 aquarium.userStateStore.findLatestUserState(userID) match {
183 // First user state ever
184 saveFirstUserState(userID)
// A stored state exists: adopt it as the model's current user state message.
186 case Some(latestUserState) ⇒
187 this._userStateModel.updateUserStateMsg(latestUserState)
// Per the comment below, meant to refresh the user state snapshot with resource events
// that have not been processed yet.
// NOTE(review): no statements are visible in this view -- confirm whether this is implemented.
191 private[this] def processResourceEventsAfterLastKnownUserState() {
192 // Update the user state snapshot with fresh (ie not previously processed) events.
// Brings the user state message up to date in two steps, in this order: load the last
// known user state, then process the resource events that came after it.
195 private[this] def makeUserStateMsgUpToDate() {
196 loadLastKnownUserStateAndUpdateAgreements()
197 processResourceEventsAfterLastKnownUserState()
// Initialization check. In the visible lines it rebuilds the agreement history from the
// stored IM events and brings the user state message up to date.
// @param nextThing callback defaulting to a no-op; its use is not visible in this view
// @return a Boolean; how it is computed is not visible in this view
// NOTE(review): most of this body (guards, branches, result) is not visible here.
200 private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
205 val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID)
208 makeUserStateMsgUpToDate()
217 * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
218 * messaging hub (rabbitmq).
// Handles one incoming IMEventMsg. Visible steps: out-of-sync detection, insertion of an
// agreement derived from the event, a bump of the IM message counter and, for a
// user-creation event, a refresh of the user state message.
// @param imEvent the incoming IM event
// NOTE(review): several lines of this body (including how the out-of-sync flags are
// acted upon) are not visible in this view.
220 def onIMEventMsg(imEvent: IMEventMsg) {
225 // Check for out of sync (regarding IMEvents)
// Out of sync when this event occurred before the latest IM event known to the model.
226 val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
228 // clear all resource state
234 // Check out of sync (regarding ResourceEvents)
235 val isOutOfSyncRC = false // FIXME implement
242 // Make new agreement
243 this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
244 this._imMsgCount += 1
// A CREATE event triggers loading/refreshing of the user state message.
246 if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
247 makeUserStateMsgUpToDate()
250 DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
// Handles one incoming ResourceEventMsg: passes it to the charging service and publishes
// a BalanceEvent when the total credits change sign as a result.
// @param rcEvent the incoming resource event
// NOTE(review): some lines of this body (including arguments of the charging call) are
// not visible in this view.
253 def onResourceEventMsg(rcEvent: ResourceEventMsg) {
// Without a user-creation event the resource event is logged as ignored.
258 if(!haveUserCreationEvent) {
259 DEBUG("No agreements. Ignoring %s", rcEvent)
264 assert(haveUserState, "haveUserState")
// Credits before charging, kept to detect a sign change afterwards.
266 val oldTotalCredits = this._userStateModel.totalCreditsAsReal
268 chargingService.processResourceEvent(
269 rcEvent.getReceivedMillis,
271 this._userStateModel,
272 aquarium.currentResourceMapping,
276 val newTotalCredits = this._userStateModel.totalCreditsAsReal
// The signum product is negative only for strictly opposite signs; a change from or to
// exactly zero does not satisfy it.
// NOTE(review): confirm that transitions involving a zero balance should stay silent.
278 if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
// The second argument is true when the new total is non-negative.
279 aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
282 DEBUG("Updated %s", this._userStateModel)
// Replies to a GetUserBillRequest with the bill entry for the requested timeslot, or with
// a 500 "Internal Server Error [AQU-BILL-0001]" response.
// @param event the request; only its timeslot is read in the visible lines
// NOTE(review): the construct that routes to the error reply is not visible in this view.
285 def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
289 val timeslot = event.timeslot
// Resource types come from the first policy model returned for the timeslot, or an
// empty map when none is returned.
290 val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
291 loadSortedPolicyModelsWithin(timeslot.from.getTime,
292 timeslot.to.getTime).
293 values.headOption match {
294 case None => Map[String,ResourceType]()
295 case Some(policy:PolicyModel) => policy.resourceTypesMap
// The user state message is passed along only when the state model exists.
297 val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None
298 val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
299 //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
300 //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
301 val billData = GetUserBillResponseData(this.userID,billEntryMsg)
302 sender ! GetUserBillResponse(Right(billData))
306 sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
// Replies to a GetUserBalanceRequest, choosing the answer from the pair
// (haveAgreements, haveUserState).
// @param event the request; in the visible lines its userID is used only for the
//              unknown-user message, while successful replies use this.userID
// NOTE(review): haveAgreements and haveUserState are the same null check on
// _userStateModel, so as written only (true, true) and (false, false) can occur; the two
// mixed branches look unreachable -- confirm.
// NOTE(review): some `case` lines and call arguments are not visible in this view.
310 def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
313 val userID = event.userID
315 (haveAgreements, haveUserState) match {
317 // (User CREATEd, with balance state)
// Recalculate the real-time user state before reporting the total credits.
318 val realtimeMillis = TimeHelpers.nowMillis()
319 chargingService.calculateRealtimeUserState(
320 this._userStateModel,
321 aquarium.currentResourceMapping,
325 sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))
328 // (User CREATEd, no balance state)
329 // Return the default initial balance
330 sender ! GetUserBalanceResponse(
332 GetUserBalanceResponseData(
// Initial balance for the role and occurrence time of the user-creation IM event.
334 aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString()
338 // (Not CREATEd, with balance state)
339 // Clearly this is internal error
340 sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
342 case (false, false) ⇒
343 // (Not CREATEd, no balance state)
344 // The user is completely unknown
345 sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
// Replies to a GetUserStateRequest: with the user state message when the state model
// exists, otherwise with a 404 "No state for user" response.
// @param event the request; its userID appears in the 404 message
// NOTE(review): the `case` lines and part of the call arguments are not visible here.
349 def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
352 haveUserState match {
// Recalculate the real-time user state before replying.
354 val realtimeMillis = TimeHelpers.nowMillis()
355 chargingService.calculateRealtimeUserState(
356 this._userStateModel,
357 aquarium.currentResourceMapping,
361 sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
364 sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
// Replies to a GetUserWalletRequest. With a state model: total credits plus the wallet
// entries of the user state message. Otherwise, depending on haveAgreements: the initial
// balance with an empty wallet, or a 404 "No wallet for user" response.
// @param event the request; its userID appears in the 404 message
// NOTE(review): the `case` lines and part of the call arguments are not visible here.
// NOTE(review): haveAgreements is the same null check as haveUserState, so inside the
// no-state branch it looks like it can only be false -- confirm.
368 def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
371 haveUserState match {
373 DEBUG("haveWorkingUserState: %s", event)
// Recalculate the real-time user state before replying.
374 val realtimeMillis = TimeHelpers.nowMillis()
375 chargingService.calculateRealtimeUserState(
376 this._userStateModel,
377 aquarium.currentResourceMapping,
381 sender ! GetUserWalletResponse(
383 GetUserWalletResponseData(
385 this._userStateModel.totalCredits,
386 MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
390 DEBUG("!haveWorkingUserState: %s", event)
391 haveAgreements match {
393 DEBUG("haveAgreements: %s", event)
394 sender ! GetUserWalletResponse(
396 GetUserWalletResponseData(
// Initial balance for the role and occurrence time of the user-creation IM event.
398 Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)),
399 MessageFactory.newWalletEntriesMsg()
403 DEBUG("!haveUserCreationIMEvent: %s", event)
// NOTE(review): the error code below contains a space ("AQU-WAL-00 8"), unlike the other
// codes in this class -- confirm whether that is intended.
404 sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
410 * Initializes the actor's internal state.
// Assigns a freshly built initial state model to this actor and requires that the model
// reports itself as initial.
// @param userID the user this actor is being set up for; its use is not visible in this view
// NOTE(review): most arguments of the factory call are not visible here, and no IM-event
// replay is visible in this method despite the comment below -- confirm.
414 def onSetUserActorUserID(userID: String) {
415 // Create the full agreement history from the original sources (IMEvents)
416 this._userStateModel = ModelFactory.newInitialUserStateModel(
419 TimeHelpers.nowMillis()
422 require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
// User id used as the prefix of log lines: the real user id once the state model exists,
// the placeholder "???" before that.
425 private[this] def D_userID = {
426 if(haveUserState) userID else "???"
/**
 * Logs at `debug` level, prefixing the formatted text with the tag from `D_userID`.
 *
 * @param fmt  a `String.format`-style pattern
 * @param args the values substituted into `fmt`
 */
private[this] def DEBUG(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.debug("[%s] - %s".format(tag, text))
}
/**
 * Logs at `info` level, prefixing the formatted text with the tag from `D_userID`.
 *
 * @param fmt  a `String.format`-style pattern
 * @param args the values substituted into `fmt`
 */
private[this] def INFO(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.info("[%s] - %s".format(tag, text))
}
/**
 * Logs at `warn` level, prefixing the formatted text with the tag from `D_userID`.
 *
 * @param fmt  a `String.format`-style pattern
 * @param args the values substituted into `fmt`
 */
private[this] def WARN(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.warn("[%s] - %s".format(tag, text))
}
/**
 * Logs at `error` level, prefixing the formatted text with the tag from `D_userID`.
 *
 * @param fmt  a `String.format`-style pattern
 * @param args the values substituted into `fmt`
 */
private[this] def ERROR(fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(tag, text))
}
/**
 * Logs at `error` level together with a throwable, prefixing the formatted text with the
 * tag from `D_userID`.
 *
 * @param t    the throwable handed to the logger alongside the message
 * @param fmt  a `String.format`-style pattern
 * @param args the values substituted into `fmt`
 */
private[this] def ERROR(t: Throwable, fmt: String, args: Any*) = {
  val tag = D_userID
  val text = fmt.format(args: _*)
  logger.error("[%s] - %s".format(tag, text), t)
}