Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / actor / service / user / UserActor.scala @ 060b04bd

History | View | Annotate | Download (14.7 kB)

1
/*
2
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.actor
37
package service
38
package user
39

    
40
import gr.grnet.aquarium.{Real, AquariumInternalError}
41
import gr.grnet.aquarium.actor.message.GetUserBalanceRequest
42
import gr.grnet.aquarium.actor.message.GetUserBalanceResponse
43
import gr.grnet.aquarium.actor.message.GetUserBalanceResponseData
44
import gr.grnet.aquarium.actor.message.GetUserBillRequest
45
import gr.grnet.aquarium.actor.message.GetUserBillResponse
46
import gr.grnet.aquarium.actor.message.GetUserBillResponseData
47
import gr.grnet.aquarium.actor.message.GetUserStateRequest
48
import gr.grnet.aquarium.actor.message.GetUserStateResponse
49
import gr.grnet.aquarium.actor.message.GetUserWalletRequest
50
import gr.grnet.aquarium.actor.message.GetUserWalletResponse
51
import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
52
import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
53
import gr.grnet.aquarium.charging.state.UserStateModel
54
import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, IMEventMsg, ResourceEventMsg}
55
import gr.grnet.aquarium.message.avro.{ModelFactory, MessageFactory, MessageHelpers}
56
import gr.grnet.aquarium.service.event.BalanceEvent
57
import gr.grnet.aquarium.util.date.TimeHelpers
58
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
59
import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
60
import gr.grnet.aquarium.charging.bill.BillEntryMsg
61

    
62
/**
63
 *
64
 * @author Christos KK Loverdos <loverdos@gmail.com>
65
 */
66

    
67
class UserActor extends ReflectiveRoleableActor {
68
  private[this] var _imMsgCount = 0
69
  private[this] var _userStateModel: UserStateModel = _
70

    
71
  def userID = {
72
    if(!haveUserState) {
73
      throw new AquariumInternalError("%s not initialized")
74
    }
75

    
76
    this._userStateModel.userID
77
  }
78

    
79
  override def postStop() {
80
    DEBUG("I am finally stopped (in postStop())")
81
    aquarium.akkaService.notifyUserActorPostStop(this)
82
  }
83

    
84
  private[this] def shutmedown(): Unit = {
85
    if(haveUserState) {
86
      aquarium.akkaService.invalidateUserActor(this)
87
    }
88
  }
89

    
90
  override protected def onThrowable(t: Throwable, message: AnyRef) = {
91
    LogHelpers.logChainOfCauses(logger, t)
92
    ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
93

    
94
    shutmedown()
95
  }
96

    
97
  def role = UserActorRole
98

    
99
  private[this] def chargingService = aquarium.chargingService
100

    
101
  def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
102
  }
103

    
104
  private[this] def unsafeUserCreationIMEventMsg = {
105
    this._userStateModel.unsafeUserCreationIMEvent
106
  }
107

    
108
  private[this] def haveAgreements = {
109
    (this._userStateModel ne null)
110
  }
111

    
112
  private[this] def haveUserCreationEvent = {
113
    haveAgreements &&
114
    this._userStateModel.hasUserCreationEvent
115
  }
116

    
117
  private[this] def haveUserState = {
118
    (this._userStateModel ne null)
119
  }
120

    
121
  private[this] def isInitial = this._userStateModel.isInitial
122

    
123
  /**
124
   * Creates the agreement history from all the stored IMEvents.
125
   *
126
   * @return (`true` iff there was a user CREATE event, the number of events processed)
127
   */
128
  private[this] def createUserAgreementHistoryFromIMEvents(userID: String): (Boolean, Int) = {
129
    DEBUG("createUserAgreementHistoryFromStoredIMEvents()")
130
    assert(haveUserState, "haveUserState")
131

    
132

    
133
    var _imcounter = 0
134

    
135
    val hadCreateEvent = aquarium.imEventStore.foreachIMEventInOccurrenceOrder(userID) { imEvent ⇒
136
      _imcounter += 1
137
      DEBUG("Replaying [%s/%s] %s", shortClassNameOf(imEvent), _imcounter, imEvent)
138

    
139
      if(_imcounter == 1 && !MessageHelpers.isUserCreationIMEvent(imEvent)) {
140
        // The very first event must be a CREATE event. Otherwise we abort initialization.
141
        // This will normally happen during devops :)
142
        INFO("Ignoring first %s since it is not CREATE", shortClassNameOf(imEvent))
143
        false
144
      }
145
      else {
146
        val effectiveFromMillis = imEvent.getOccurredMillis
147
        val role = imEvent.getRole
148
        // calling unsafe just for the side-effect
149
        assert(
150
          aquarium.unsafeFullPriceTableForRoleAt(role, effectiveFromMillis) ne null,
151
          "aquarium.unsafeFullPriceTableForRoleAt(%s, %s) ne null".format(role, effectiveFromMillis)
152
        )
153

    
154
        this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
155
        true
156
      }
157
    }
158

    
159
    this._imMsgCount = _imcounter
160

    
161
    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
162
    (hadCreateEvent, _imcounter)
163
  }
164

    
165
  private[this] def saveFirstUserState(userID: String) {
166
    this._userStateModel.userStateMsg.setIsFirst(true)
167
    this._userStateModel.updateUserStateMsg(
168
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
169
    )
170
  }
171

    
172
  private[this] def saveSubsequentUserState() {
173
    this._userStateModel.userStateMsg.setIsFirst(false)
174
    this._userStateModel.updateUserStateMsg(
175
      aquarium.userStateStore.insertUserState(this._userStateModel.userStateMsg)
176
    )
177
  }
178

    
179
  private[this] def loadLastKnownUserStateAndUpdateAgreements() {
180
    val userID = this._userStateModel.userID
181
    aquarium.userStateStore.findLatestUserState(userID) match {
182
      case None ⇒
183
        // First user state ever
184
        saveFirstUserState(userID)
185

    
186
      case Some(latestUserState) ⇒
187
        this._userStateModel.updateUserStateMsg(latestUserState)
188
    }
189
  }
190

    
191
  private[this] def processResourceEventsAfterLastKnownUserState() {
192
    // Update the user state snapshot with fresh (ie not previously processed) events.
193
    aquarium.resourceEventStore.foreachResourceEventOccurredInPeriod(
194
      this._userStateModel.userID,
195
      this._userStateModel.latestResourceEventOccurredMillis,
196
      TimeHelpers.nowMillis()
197
    )
198
  }
199

    
200
  private[this] def makeUserStateMsgUpToDate() {
201
    loadLastKnownUserStateAndUpdateAgreements()
202
    processResourceEventsAfterLastKnownUserState()
203
  }
204

    
205
  private[this] def checkInitial(nextThing: () ⇒ Any = () ⇒ {}): Boolean = {
206
    if(!isInitial) {
207
      return false
208
    }
209

    
210
    val (userCreated, imEventsCount) = createUserAgreementHistoryFromIMEvents(userID)
211

    
212
    if(userCreated) {
213
      makeUserStateMsgUpToDate()
214
    }
215

    
216
    nextThing()
217

    
218
    true
219
  }
220

    
221
  /**
222
   * Processes [[gr.grnet.aquarium.message.avro.gen.IMEventMsg]]s that come directly from the
223
   * messaging hub (rabbitmq).
224
   */
225
  def onIMEventMsg(imEvent: IMEventMsg) {
226
    if(checkInitial()) {
227
      return
228
    }
229

    
230
    // Check for out of sync (regarding IMEvents)
231
    val isOutOfSyncIM = imEvent.getOccurredMillis < this._userStateModel.latestIMEventOccurredMillis
232
    if(isOutOfSyncIM) {
233
      // clear all resource state
234
      // FIXME implement
235

    
236
      return
237
    }
238

    
239
    // Check out of sync (regarding ResourceEvents)
240
    val isOutOfSyncRC = false // FIXME implement
241
    if(isOutOfSyncRC) {
242
      // TODO
243

    
244
      return
245
    }
246

    
247
    // Make new agreement
248
    this._userStateModel.insertUserAgreementMsgFromIMEvent(imEvent)
249
    this._imMsgCount += 1
250

    
251
    if(MessageHelpers.isUserCreationIMEvent(imEvent)) {
252
      makeUserStateMsgUpToDate()
253
    }
254

    
255
    DEBUG("Agreements: %s", this._userStateModel.userAgreementHistoryMsg)
256
  }
257

    
258
  def onResourceEventMsg(rcEvent: ResourceEventMsg) {
259
    if(checkInitial()) {
260
      return
261
    }
262

    
263
    if(!haveUserCreationEvent) {
264
      DEBUG("No agreements. Ignoring %s", rcEvent)
265

    
266
      return
267
    }
268

    
269
    assert(haveUserState, "haveUserState")
270

    
271
    val oldTotalCredits = this._userStateModel.totalCreditsAsReal
272

    
273
    chargingService.processResourceEvent(
274
      rcEvent.getReceivedMillis,
275
      rcEvent,
276
      this._userStateModel,
277
      aquarium.currentResourceMapping,
278
      true
279
    )
280

    
281
    val newTotalCredits = this._userStateModel.totalCreditsAsReal
282

    
283
    if(oldTotalCredits.signum * newTotalCredits.signum < 0) {
284
      aquarium.eventBus ! new BalanceEvent(userID, newTotalCredits >= 0)
285
    }
286

    
287
    DEBUG("Updated %s", this._userStateModel)
288
  }
289

    
290
  def onGetUserBillRequest(event: GetUserBillRequest): Unit = {
291
    checkInitial()
292

    
293
    try{
294
      val timeslot = event.timeslot
295
      val resourceTypes: Map[String, ResourceType] = aquarium.policyStore.
296
                          loadSortedPolicyModelsWithin(timeslot.from.getTime,
297
                                                       timeslot.to.getTime).
298
                          values.headOption match {
299
          case None => Map[String,ResourceType]()
300
          case Some(policy:PolicyModel) => policy.resourceTypesMap
301
      }
302
      val state= if(haveUserState) Some(this._userStateModel.userStateMsg) else None
303
      val billEntryMsg = BillEntryMsg.fromWorkingUserState(timeslot,this.userID,state,resourceTypes)
304
      //val billEntryMsg = MessageFactory.createBillEntryMsg(billEntry)
305
      //logger.debug("BILL ENTRY MSG: " + billEntryMsg.toString)
306
      val billData = GetUserBillResponseData(this.userID,billEntryMsg)
307
      sender ! GetUserBillResponse(Right(billData))
308
    } catch {
309
      case e:Exception =>
310
        e.printStackTrace()
311
        sender ! GetUserBillResponse(Left("Internal Server Error [AQU-BILL-0001]"), 500)
312
    }
313
  }
314

    
315
  def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
316
    checkInitial()
317

    
318
    val userID = event.userID
319

    
320
    (haveAgreements, haveUserState) match {
321
      case (true, true) ⇒
322
        // (User CREATEd, with balance state)
323
        val realtimeMillis = TimeHelpers.nowMillis()
324
        chargingService.calculateRealtimeUserState(
325
          this._userStateModel,
326
          aquarium.currentResourceMapping,
327
          realtimeMillis
328
        )
329

    
330
        sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(this.userID, this._userStateModel.totalCredits)))
331

    
332
      case (true, false) ⇒
333
        // (User CREATEd, no balance state)
334
        // Return the default initial balance
335
        sender ! GetUserBalanceResponse(
336
          Right(
337
            GetUserBalanceResponseData(
338
              this.userID,
339
              aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis).toString()
340
            )))
341

    
342
      case (false, true) ⇒
343
        // (Not CREATEd, with balance state)
344
        // Clearly this is internal error
345
        sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
346

    
347
      case (false, false) ⇒
348
        // (Not CREATEd, no balance state)
349
        // The user is completely unknown
350
        sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
351
    }
352
  }
353

    
354
  def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
355
    checkInitial()
356

    
357
    haveUserState match {
358
      case true ⇒
359
        val realtimeMillis = TimeHelpers.nowMillis()
360
        chargingService.calculateRealtimeUserState(
361
          this._userStateModel,
362
          aquarium.currentResourceMapping,
363
          realtimeMillis
364
        )
365

    
366
        sender ! GetUserStateResponse(Right(this._userStateModel.userStateMsg))
367

    
368
      case false ⇒
369
        sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
370
    }
371
  }
372

    
373
  def onGetUserWalletRequest(event: GetUserWalletRequest): Unit = {
374
    checkInitial()
375

    
376
    haveUserState match {
377
      case true ⇒
378
        DEBUG("haveWorkingUserState: %s", event)
379
        val realtimeMillis = TimeHelpers.nowMillis()
380
        chargingService.calculateRealtimeUserState(
381
          this._userStateModel,
382
          aquarium.currentResourceMapping,
383
          realtimeMillis
384
        )
385

    
386
        sender ! GetUserWalletResponse(
387
          Right(
388
            GetUserWalletResponseData(
389
              this.userID,
390
              this._userStateModel.totalCredits,
391
              MessageFactory.newWalletEntriesMsg(this._userStateModel.userStateMsg.getWalletEntries)
392
            )))
393

    
394
      case false ⇒
395
        DEBUG("!haveWorkingUserState: %s", event)
396
        haveAgreements match {
397
          case true ⇒
398
            DEBUG("haveAgreements: %s", event)
399
            sender ! GetUserWalletResponse(
400
              Right(
401
                GetUserWalletResponseData(
402
                  this.userID,
403
                  Real.toMsgField(aquarium.initialUserBalance(this.unsafeUserCreationIMEventMsg.getRole, this.unsafeUserCreationIMEventMsg.getOccurredMillis)),
404
                  MessageFactory.newWalletEntriesMsg()
405
                )))
406

    
407
          case false ⇒
408
            DEBUG("!haveUserCreationIMEvent: %s", event)
409
            sender ! GetUserWalletResponse(Left("No wallet for user %s [AQU-WAL-00 8]".format(event.userID)), 404)
410
        }
411
    }
412
  }
413

    
414
  /**
415
   * Initializes the actor's internal state.
416
   *
417
   * @param userID
418
   */
419
  def onSetUserActorUserID(userID: String) {
420
    // Create the full agreement history from the original sources (IMEvents)
421
    this._userStateModel = ModelFactory.newInitialUserStateModel(
422
      userID,
423
      Real(0),
424
      TimeHelpers.nowMillis()
425
    )
426

    
427
    require(this._userStateModel.isInitial, "this._userStateModel.isInitial")
428
  }
429

    
430
  private[this] def D_userID = {
431
    if(haveUserState) userID else "???"
432
  }
433

    
434
  private[this] def DEBUG(fmt: String, args: Any*) =
435
    logger.debug("[%s] - %s".format(D_userID, fmt.format(args: _*)))
436

    
437
  private[this] def INFO(fmt: String, args: Any*) =
438
    logger.info("[%s] - %s".format(D_userID, fmt.format(args: _*)))
439

    
440
  private[this] def WARN(fmt: String, args: Any*) =
441
    logger.warn("[%s] - %s".format(D_userID, fmt.format(args: _*)))
442

    
443
  private[this] def ERROR(fmt: String, args: Any*) =
444
    logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)))
445

    
446
  private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
447
    logger.error("[%s] - %s".format(D_userID, fmt.format(args: _*)), t)
448
}