Merge branch 'master'
authorChristos KK Loverdos <loverdos@gmail.com>
Wed, 19 Sep 2012 07:55:11 +0000 (10:55 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Wed, 19 Sep 2012 07:55:35 +0000 (10:55 +0300)
src/main/scala/gr/grnet/aquarium/actor/ActorRole.scala
src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserActorState.scala [deleted file]
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/charging/state/UserAgreementHistoryModel.scala
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala

index 7989914..0acff42 100644 (file)
@@ -36,7 +36,7 @@ package gr.grnet.aquarium.actor
 
 import service.user.UserActor
 import message.{GetUserBillRequest, GetUserWalletRequest, GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded, ActorConfigurationMessage}
+import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorConfigurationMessage}
 import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg}
 
 /**
@@ -92,5 +92,4 @@ case object UserActorRole
                           classOf[GetUserBalanceRequest],
                           classOf[GetUserBillRequest],
                           classOf[GetUserStateRequest]),
-                      Set(classOf[InitializeUserActorState],
-                          classOf[AquariumPropertiesLoaded]))
+                      Set(classOf[AquariumPropertiesLoaded]))
diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserActorState.scala b/src/main/scala/gr/grnet/aquarium/actor/message/config/InitializeUserActorState.scala
deleted file mode 100644 (file)
index 69c0400..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message.config
-
-import gr.grnet.aquarium.actor.message.ActorMessage
-import gr.grnet.aquarium.util.shortClassNameOf
-import gr.grnet.aquarium.util.date.MutableDateCalc
-
-
-/**
- * This is sent to a [[gr.grnet.aquarium.actor.service.user.UserActor]] in order to initialize its internal state.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class InitializeUserActorState(userID: String, referenceTimeMillis: Long)
-extends ActorMessage
-   with ActorConfigurationMessage {
-
-  override def toString = {
-    "%s(%s, %s)".format(shortClassNameOf(this), userID, new MutableDateCalc(referenceTimeMillis).toString)
-  }
-}
index e84402b..7d339a4 100644 (file)
@@ -50,7 +50,6 @@ import gr.grnet.aquarium.actor.message.GetUserWalletRequest
 import gr.grnet.aquarium.actor.message.GetUserWalletResponse
 import gr.grnet.aquarium.actor.message.GetUserWalletResponseData
 import gr.grnet.aquarium.actor.message.config.AquariumPropertiesLoaded
-import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
 import gr.grnet.aquarium.charging.bill.AbstractBillEntry
 import gr.grnet.aquarium.charging.state.{UserStateModel, UserAgreementHistoryModel, UserStateBootstrap}
 import gr.grnet.aquarium.computation.BillingMonthInfo
@@ -67,6 +66,9 @@ import gr.grnet.aquarium.policy.{ResourceType, PolicyModel}
  */
 
 class UserActor extends ReflectiveRoleableActor {
+  private[this] var _isFirstMessage = true
+  private[this] var _rcMsgCount = 0
+  private[this] var _imMsgCount = 0
   private[this] var _userID: String = "<?>"
   private[this] var _userState: UserStateModel = _
   private[this] var _userCreationIMEvent: IMEventMsg = _
@@ -172,18 +174,13 @@ class UserActor extends ReflectiveRoleableActor {
    * Creates the initial state that is related to IMEvents.
    */
   private[this] def initializeStateOfIMEvents(): Unit = {
-    // NOTE: this._userID is already set up by onInitializeUserActorState()
+    // NOTE: this._userID is already set up our caller
     aquarium.imEventStore.foreachIMEventInOccurrenceOrder(this._userID) { imEvent ⇒
       DEBUG("Replaying %s", imEvent)
 
       updateAgreementHistoryFrom(imEvent)
       updateLatestIMEventIDFrom(imEvent)
     }
-
-    if(haveAgreements) {
-      DEBUG("Initial agreement history %s", this._userAgreementHistoryModel.toJsonString)
-      logSeparator()
-    }
   }
 
   /**
@@ -220,14 +217,14 @@ class UserActor extends ReflectiveRoleableActor {
     }
   }
 
-  private[this] def initializeStateOfResourceEvents(event: InitializeUserActorState): Unit = {
+  private[this] def initializeStateOfResourceEvents(): Unit = {
     if(!this.haveAgreements) {
-      DEBUG("Cannot initializeResourceEventsState() from %s. There are no agreements", event)
+      DEBUG("No agreements to initialize resources state")
       return
     }
 
     if(!this.haveUserCreationIMEvent) {
-      DEBUG("Cannot initializeResourceEventsState() from %s. I never got a CREATE IMEvent", event)
+      DEBUG("No CREATE IMEvent to initialize resources state")
       return
     }
 
@@ -240,12 +237,16 @@ class UserActor extends ReflectiveRoleableActor {
     }
   }
 
-  def onInitializeUserActorState(event: InitializeUserActorState): Unit = {
-    this._userID = event.userID
-    DEBUG("Got %s", event)
+  /**
+   * Initializes the actor state from DB.
+   */
+  def initializeUserActorState(userID: String): Unit = {
+    this._userID = userID
+
+    DEBUG("Initializing user actor state")
 
     initializeStateOfIMEvents()
-    initializeStateOfResourceEvents(event)
+    initializeStateOfResourceEvents()
   }
 
   /**
@@ -253,26 +254,21 @@ class UserActor extends ReflectiveRoleableActor {
    * When this method is called, we assume that all proper checks have been made and it
    * is OK to proceed with the event processing.
    */
-  def onIMEventMsg(imEvent: IMEventMsg): Unit = {
+  def onIMEventMsg(imEvent: IMEventMsg) {
+    if(this._isFirstMessage) {
+      initializeUserActorState(imEvent.getUserID)
+      // we ignore this event, since it is already saved in the store and all messages in
+      // the store have been consulted by initializeUserActorState()
+      this._isFirstMessage = false
+      return
+    }
+
     val hadUserCreationIMEvent = haveUserCreationIMEvent
 
     if(!haveAgreements) {
-      // This IMEvent has arrived after any ResourceEvents
-      INFO("Arrived after any ResourceEvent: %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
       initializeStateOfIMEvents()
     }
     else {
-      if(this._latestIMEventOriginalID == imEvent.getOriginalID) {
-        // This happens when the actor is brought to life, then immediately initialized, and then
-        // sent the first IM event. But from the initialization procedure, this IM event will have
-        // already been loaded from DB!
-        INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(imEvent))
-        logSeparator()
-
-        //this._latestIMEventID = imEvent.id
-        return
-      }
-
       updateAgreementHistoryFrom(imEvent)
       updateLatestIMEventIDFrom(imEvent)
     }
@@ -286,7 +282,15 @@ class UserActor extends ReflectiveRoleableActor {
     logSeparator()
   }
 
-  def onResourceEventMsg(rcEvent: ResourceEventMsg): Unit = {
+  def onResourceEventMsg(rcEvent: ResourceEventMsg) {
+    if(this._isFirstMessage) {
+      initializeUserActorState(rcEvent.getUserID)
+      // we ignore this event, since it is already saved in the store and all messages in
+      // the store have been consulted by initializeUserActorState()
+      this._isFirstMessage = false
+      return
+    }
+
     if(!shouldProcessResourceEvents) {
       // This means the user has not been created (at least, as far as Aquarium is concerned).
       // So, we do not process any resource event
@@ -296,17 +300,6 @@ class UserActor extends ReflectiveRoleableActor {
       return
     }
 
-    // Since the latest resource event per resource is recorded in the user state,
-    // we do not need to query the store. Just query the in-memory state.
-    // Note: This is a similar situation with the first IMEvent received right after the user
-    //       actor is created.
-    if(this._latestResourceEventOriginalID == rcEvent.getOriginalID) {
-      INFO("Ignoring first %s", AvroHelpers.jsonStringOfSpecificRecord(rcEvent))
-      logSeparator()
-
-      return
-    }
-
     val now = TimeHelpers.nowMillis()
     // TODO: Review this and its usage in user state.
     // TODO: The assumption is that the resource set increases all the time,
index f516e57..d570eef 100644 (file)
@@ -49,7 +49,7 @@ import gr.grnet.aquarium.message.avro.ModelFactory
 
 final case class UserAgreementHistoryModel(
     msg: UserAgreementHistoryMsg
-) extends JsonSupport {
+) {
 
   private var _userAgreementModels: immutable.SortedSet[UserAgreementModel] = {
     var userAgreementModels = immutable.SortedSet[UserAgreementModel]()
@@ -108,6 +108,8 @@ final case class UserAgreementHistoryModel(
     _userAgreementModels.lastOption
   }
 
+  def toJsonString = msg.toString
+
 //  def agreementInEffectWhen(whenMillis: Long): Option[UserAgreementModel] = {
 //    agreements.to(
 //      UserAgreementModel("", None, whenMillis, Long.MaxValue, "", PolicyDefinedFullPriceTableRef())
index 52aecc3..3eb7610 100644 (file)
@@ -44,8 +44,6 @@ import java.util.concurrent.atomic.AtomicBoolean
 import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache}
 import com.ckkloverdos.props.{Props ⇒ KKProps}
 import gr.grnet.aquarium.actor.service.user.UserActor
-import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-import gr.grnet.aquarium.actor.message.config.InitializeUserActorState
 import gr.grnet.aquarium.util.date.TimeHelpers
 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Callable}
 import akka.dispatch.{Await, Future}
@@ -233,9 +231,6 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
         // Cache it for subsequent calls
         _userActorCache.put(userID, actorRef)
 
-        // Send the initialization message
-        actorRef ! InitializeUserActorState(userID, TimeHelpers.nowMillis())
-
         actorRef
       }
     })