WIP: ResourceEvent-related refactorings
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
index 5740909..c015377 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
 package gr.grnet.aquarium.user
 
 
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
-import gr.grnet.aquarium.util.{ContextualLogger, Loggable, justForSure, failedForSure}
-import gr.grnet.aquarium.logic.accounting.algorithm.SimpleCostPolicyAlgorithmCompiler
-import gr.grnet.aquarium.logic.events.{NewWalletEntry, ResourceEvent}
+import scala.collection.mutable
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLCostPolicy, DSLResourcesMap, DSLPolicy}
-import gr.grnet.aquarium.store.{RecordID, StoreProvider, PolicyStore, UserStateStore, ResourceEventStore}
-import gr.grnet.aquarium.logic.accounting.{Chargeslot, Accounting}
-import collection.mutable.{Buffer, ListBuffer}
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
+import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
+import gr.grnet.aquarium.logic.accounting.Accounting
+import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
+import gr.grnet.aquarium.event.NewWalletEntry
+import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
 
 /**
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 class UserStateComputations extends Loggable {
-  def createInitialUserState(userId: String,
+  def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
+    if(!imEvent.isCreateUser) {
+      throw new AquariumInternalError(
+        "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
+    }
+
+    val userID = imEvent.userID
+    val userCreationMillis = imEvent.occurredMillis
+    val now = TimeHelpers.nowMillis()
+
+    UserState(
+      true,
+      userID,
+      userCreationMillis,
+      0L,
+      false,
+      null,
+      ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+      Nil,
+      Nil,
+      LatestResourceEventsSnapshot(List(), now),
+      0L,
+      0L,
+      IMStateSnapshot(imEvent, now),
+      CreditSnapshot(credits, now),
+      AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+      OwnedResourcesSnapshot(Nil, now),
+      Nil,
+      InitialUserStateSetup
+    )
+  }
+
+  def createInitialUserState(userID: String,
                              userCreationMillis: Long,
                              isActive: Boolean,
                              credits: Double,
@@ -60,7 +94,8 @@ class UserStateComputations extends Loggable {
     val now = userCreationMillis
 
     UserState(
-      userId,
+      true,
+      userID,
       userCreationMillis,
       0L,
       false,
@@ -71,26 +106,29 @@ class UserStateComputations extends Loggable {
       LatestResourceEventsSnapshot(List(), now),
       0L,
       0L,
-      ActiveStateSnapshot(isActive, now),
+      IMStateSnapshot(
+        StdIMEvent(
+          "",
+          now, now, userID,
+          "",
+          isActive, roleNames.headOption.getOrElse("default"),
+          "1.0",
+          IMEventModel.EventTypeNames.create, Map()),
+        now
+      ),
       CreditSnapshot(credits, now),
       AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
-      RolesSnapshot(roleNames, now),
       OwnedResourcesSnapshot(Nil, now),
       Nil,
-      UserStateChangeReasonCodes.InitialCalculationCode,
-      InitialUserStateCalculation
+      InitialUserStateSetup
     )
   }
 
   def createInitialUserStateFrom(us: UserState): UserState = {
     createInitialUserState(
-      us.userId,
-      us.userCreationMillis,
-      us.activeStateSnapshot.isActive,
+      us.imStateSnapshot.imEvent,
       us.creditsSnapshot.creditAmount,
-      us.rolesSnapshot.roles,
-      us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last
-    )
+      us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
   }
 
   def findUserStateAtEndOfBillingMonth(userId: String,
@@ -99,16 +137,17 @@ class UserStateComputations extends Loggable {
                                        currentUserState: UserState,
                                        defaultResourcesMap: DSLResourcesMap,
                                        accounting: Accounting,
+                                       algorithmCompiler: CostPolicyAlgorithmCompiler,
                                        calculationReason: UserStateChangeReason,
-                                       contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = {
+                                       clogOpt: Option[ContextualLogger] = None): UserState = {
 
     val clog = ContextualLogger.fromOther(
-      contextualLogger,
+      clogOpt,
       logger,
       "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
     clog.begin()
 
-    def doCompute: Maybe[UserState] = {
+    def doCompute: UserState = {
       doFullMonthlyBilling(
         userId,
         billingMonthInfo,
@@ -116,8 +155,9 @@ class UserStateComputations extends Loggable {
         currentUserState,
         defaultResourcesMap,
         accounting,
+        algorithmCompiler,
         calculationReason,
-        Just(clog))
+        Some(clog))
     }
 
     val userStateStore = storeProvider.userStateStore
@@ -132,101 +172,80 @@ class UserStateComputations extends Loggable {
       // If the user did not exist for this billing month, piece of cake
       clog.debug("User did not exist before %s", userCreationDateCalc)
 
-      // NOTE: Reason here will be: InitialUserStateCalculation
+      // NOTE: Reason here will be: InitialUserStateSetup$
       val initialUserState0 = createInitialUserStateFrom(currentUserState)
-      val initialUserStateM = userStateStore.storeUserState2(initialUserState0)
+      val initialUserState1 = userStateStore.insertUserState(initialUserState0)
 
-      clog.debug("Returning ZERO state [_idM=%s] %s".format(initialUserStateM.map(_._id), initialUserStateM))
+      clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
       clog.end()
 
-      initialUserStateM
+      initialUserState1
     } else {
       // Ask DB cache for the latest known user state for this billing period
-      val latestUserStateM = userStateStore.findLatestUserStateForEndOfBillingMonth(
+      val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
         userId,
         billingMonthInfo.year,
         billingMonthInfo.month)
 
-      latestUserStateM match {
-        case NoVal ⇒
+      latestUserStateOpt match {
+        case None ⇒
           // Not found, must compute
           clog.debug("No user state found from cache, will have to (re)compute")
           val result = doCompute
           clog.end()
           result
-          
-        case failed @ Failed(_, _) ⇒
-          clog.warn("Failure while quering cache for user state: %s", failed)
-          clog.end()
-          failed
 
-        case Just(latestUserState) ⇒
+        case Some(latestUserState) ⇒
           // Found a "latest" user state but need to see if it is indeed the true and one latest.
           // For this reason, we must count the events again.
          val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
-         val actualOOSEventsCounterM = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
            userId,
            billingMonthStartMillis,
            billingMonthStopMillis)
 
-         actualOOSEventsCounterM match {
-           case NoVal ⇒
-             val errMsg = "No counter computed for out of sync events. Should at least be zero."
-             clog.warn(errMsg)
-             val result = Failed(new Exception(errMsg))
+         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+         counterDiff match {
+           // ZERO, we are OK!
+           case 0 ⇒
+             // NOTE: Keep the caller's calculation reason
+             latestUserState.copyForChangeReason(calculationReason)
+
+           // We had more, so must recompute
+           case n if n > 0 ⇒
+             clog.debug(
+               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+             val result = doCompute
              clog.end()
              result
 
-           case failed @ Failed(_, _) ⇒
-             clog.warn("Failure while querying for out of sync events: %s", failed)
-             clog.end()
-             failed
-
-           case Just(actualOOSEventsCounter) ⇒
-             val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
-             counterDiff match {
-               // ZERO, we are OK!
-               case 0 ⇒
-                 // NOTE: Keep the caller's calculation reason
-                 Just(latestUserState.copyForChangeReason(calculationReason))
-
-               // We had more, so must recompute
-               case n if n > 0 ⇒
-                 clog.debug(
-                   "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
-                 val result = doCompute
-                 clog.end()
-                 result
-
-               // We had less????
-               case n if n < 0 ⇒
-                 val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
-                 clog.warn(errMsg)
-                 val result = Failed(new Exception(errMsg))
-                 clog.end()
-                 result
-             }
+           // We had less????
+           case n if n < 0 ⇒
+             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+             clog.warn(errMsg)
+             throw new AquariumException(errMsg)
          }
       }
     }
   }
 
   //+ Utility methods
-  def rcDebugInfo(rcEvent: ResourceEvent) = {
+  def rcDebugInfo(rcEvent: ResourceEventModel) = {
     rcEvent.toDebugString(false)
   }
   //- Utility methods
 
   def processResourceEvent(startingUserState: UserState,
                            userStateWorker: UserStateWorker,
-                           currentResourceEvent: ResourceEvent,
+                           currentResourceEvent: ResourceEventModel,
                            policyStore: PolicyStore,
                            stateChangeReason: UserStateChangeReason,
                            billingMonthInfo: BillingMonthInfo,
-                           walletEntriesBuffer: Buffer[NewWalletEntry],
-                           clogM: Maybe[ContextualLogger] = NoVal): UserState = {
+                           walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+                           algorithmCompiler: CostPolicyAlgorithmCompiler,
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
 
-    val clog = ContextualLogger.fromOther(clogM, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
 
     var _workingUserState = startingUserState
 
@@ -258,15 +277,15 @@ class UserStateComputations extends Loggable {
           // The resource event is billable
           // Find the previous event.
           // This is (potentially) needed to calculate new credit amount and new resource instance amount
-          val previousResourceEventM = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
-          clog.debug("PreviousM %s", previousResourceEventM.map(rcDebugInfo(_)))
+          val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+          clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
 
-          val havePreviousResourceEvent = previousResourceEventM.isJust
+          val havePreviousResourceEvent = previousResourceEventOpt.isDefined
           val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
           if(needPreviousResourceEvent && !havePreviousResourceEvent) {
             // This must be the first resource event of its kind, ever.
             // TODO: We should normally check the DB to verify the claim (?)
-            clog.info("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
+            clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
             userStateWorker.updateIgnored(currentResourceEvent)
           } else {
             val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
@@ -283,8 +302,8 @@ class UserStateComputations extends Loggable {
             val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
 
             //              clog.debug("Computing full chargeslots")
-            val fullChargeslotsM = accounting.computeFullChargeslots(
-              previousResourceEventM,
+            val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
+              previousResourceEventOpt,
               currentResourceEvent,
               oldCredits,
               oldAmount,
@@ -292,67 +311,62 @@ class UserStateComputations extends Loggable {
               dslResource,
               resourcesMap,
               alltimeAgreements,
-              SimpleCostPolicyAlgorithmCompiler,
+              algorithmCompiler,
               policyStore,
-              Just(clog)
+              Some(clog)
             )
 
             // We have the chargeslots, let's associate them with the current event
-            fullChargeslotsM match {
-              case Just((referenceTimeslot, fullChargeslots)) ⇒
-                if(fullChargeslots.length == 0) {
-                  // At least one chargeslot is required.
-                  throw new Exception("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
-                }
-                clog.debugSeq("fullChargeslots", fullChargeslots, 0)
-
-                // C. Compute new credit amount (based on the charge slots)
-                val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
-                val newCredits = oldCredits - newCreditsDiff
-
-                if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
-                  val newWalletEntry = NewWalletEntry(
-                    userStateWorker.userId,
-                    newCreditsDiff,
-                    oldCredits,
-                    newCredits,
-                    TimeHelpers.nowMillis,
-                    referenceTimeslot,
-                    billingMonthInfo.year,
-                    billingMonthInfo.month,
-                    if(havePreviousResourceEvent)
-                      List(currentResourceEvent, justForSure(previousResourceEventM).get)
-                    else
-                      List(currentResourceEvent),
-                    fullChargeslots,
-                    dslResource
-                  )
-                  clog.debug("New %s", newWalletEntry)
-
-                  walletEntriesBuffer += newWalletEntry
-                } else {
-                  clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
-                }
-
-                _workingUserState = _workingUserState.copy(
-                  creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis),
-                  stateChangeCounter = _workingUserState.stateChangeCounter + 1,
-                  totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
-                )
-
-              case NoVal ⇒
-                // At least one chargeslot is required.
-                throw new Exception("No chargeslots computed")
-
-              case failed@Failed(e, m) ⇒
-                throw new Exception(m, e)
+            if(fullChargeslots.length == 0) {
+              // At least one chargeslot is required.
+              throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
             }
+            clog.debugSeq("fullChargeslots", fullChargeslots, 0)
+
+            // C. Compute new credit amount (based on the charge slots)
+            val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+            val newCredits = oldCredits - newCreditsDiff
+
+            if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
+              val newWalletEntry = NewWalletEntry(
+                userStateWorker.userId,
+                newCreditsDiff,
+                oldCredits,
+                newCredits,
+                TimeHelpers.nowMillis(),
+                referenceTimeslot,
+                billingMonthInfo.year,
+                billingMonthInfo.month,
+                if(havePreviousResourceEvent)
+                  List(currentResourceEvent, previousResourceEventOpt.get)
+                else
+                  List(currentResourceEvent),
+                fullChargeslots,
+                dslResource,
+                currentResourceEvent.isSynthetic
+              )
+              clog.debug("New %s", newWalletEntry)
+
+              walletEntriesBuffer += newWalletEntry
+            } else {
+              clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
+            }
+
+            _workingUserState = _workingUserState.copy(
+              creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
+              stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+              totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+            )
           }
         }
 
         // After processing, all events billable or not update the previous state
         userStateWorker.updatePrevious(currentResourceEvent)
 
+        _workingUserState = _workingUserState.copy(
+          latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
+        )
+
       // We do not have a resource (and thus, no cost policy)
       case None ⇒
         // Now, this is a matter of politics: what do we do if no policy was found?
@@ -364,18 +378,19 @@ class UserStateComputations extends Loggable {
     _workingUserState
   }
 
-  def processResourceEvents(resourceEvents: Traversable[ResourceEvent],
+  def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
                             startingUserState: UserState,
                             userStateWorker: UserStateWorker,
                             policyStore: PolicyStore,
                             stateChangeReason: UserStateChangeReason,
                             billingMonthInfo: BillingMonthInfo,
-                            walletEntriesBuffer: Buffer[NewWalletEntry],
-                            clogM: Maybe[ContextualLogger] = NoVal): UserState = {
+                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+                            algorithmCompiler: CostPolicyAlgorithmCompiler,
+                            clogOpt: Option[ContextualLogger] = None): UserState = {
 
     var _workingUserState = startingUserState
 
-    for(currentResourceEvent <- resourceEvents) {
+    for(currentResourceEvent ← resourceEvents) {
 
       _workingUserState = processResourceEvent(
         _workingUserState,
@@ -385,7 +400,8 @@ class UserStateComputations extends Loggable {
         stateChangeReason,
         billingMonthInfo,
         walletEntriesBuffer,
-        clogM
+        algorithmCompiler,
+        clogOpt
       )
     }
 
@@ -399,35 +415,32 @@ class UserStateComputations extends Loggable {
                            currentUserState: UserState,
                            defaultResourcesMap: DSLResourcesMap,
                            accounting: Accounting,
+                           algorithmCompiler: CostPolicyAlgorithmCompiler,
                            calculationReason: UserStateChangeReason = NoSpecificChangeReason,
-                           contextualLogger: Maybe[ContextualLogger] = NoVal): Maybe[UserState] = Maybe {
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
 
 
     val clog = ContextualLogger.fromOther(
-      contextualLogger,
+      clogOpt,
       logger,
       "doFullMonthlyBilling(%s)", billingMonthInfo)
     clog.begin()
 
-    val previousBillingMonthUserStateM = findUserStateAtEndOfBillingMonth(
+    val clogSome = Some(clog)
+
+    val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
       userId,
       billingMonthInfo.previousMonth,
       storeProvider,
       currentUserState,
       defaultResourcesMap,
       accounting,
+      algorithmCompiler,
       calculationReason.forPreviousBillingMonth,
-      Just(clog)
+      clogSome
     )
-    
-    if(previousBillingMonthUserStateM.isNoVal) {
-      throw new Exception("Could not calculate initial user state for billing %s".format(billingMonthInfo))
-    }
-    if(previousBillingMonthUserStateM.isFailed) {
-      throw failedForSure(previousBillingMonthUserStateM).exception
-    }
 
-    val startingUserState = justForSure(previousBillingMonthUserStateM).get
+    val startingUserState = previousBillingMonthUserState
 
     val userStateStore = storeProvider.userStateStore
     val resourceEventStore = storeProvider.resourceEventStore
@@ -441,22 +454,16 @@ class UserStateComputations extends Loggable {
     // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
     var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
 
-    val userStateWorker = UserStateWorker.fromUserState(startingUserState, accounting, defaultResourcesMap)
+    val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
 
     userStateWorker.debugTheMaps(clog)(rcDebugInfo)
 
-    // Find the actual resource events from DB
+    // First, find and process the actual resource events from DB
     val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
       userId,
       billingMonthStartMillis,
       billingMonthEndMillis)
 
-    if(allResourceEventsForMonth.size > 0) {
-      clog.debug("Found %s resource events, starting processing...", allResourceEventsForMonth.size)
-    } else {
-      clog.debug("Not found any resource events")
-    }
-
     val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
 
     _workingUserState = processResourceEvents(
@@ -467,13 +474,48 @@ class UserStateComputations extends Loggable {
       calculationReason,
       billingMonthInfo,
       newWalletEntries,
-      Just(clog)
+      algorithmCompiler,
+      clogSome
+    )
+
+    // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
+    // ... in order to generate an implicit ON later
+    val (specialEvents, theirImplicitEnds) = userStateWorker.
+      findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
+    if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
+      clog.debug("")
+      clog.debug("Process implicitly issued events")
+      clog.debugSeq("specialEvents", specialEvents, 0)
+      clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
+    }
+
+    // Now, the previous and implicitly started must be our base for the following computation, so we create an
+    // appropriate worker
+    val specialUserStateWorker = UserStateWorker(
+      userStateWorker.userId,
+      LatestResourceEventsWorker.fromList(specialEvents),
+      ImplicitlyIssuedResourceEventsWorker.Empty,
+      IgnoredFirstResourceEventsWorker.Empty,
+      userStateWorker.accounting,
+      userStateWorker.resourcesMap
+    )
+
+    _workingUserState = processResourceEvents(
+      theirImplicitEnds,
+      _workingUserState,
+      specialUserStateWorker,
+      policyStore,
+      calculationReason,
+      billingMonthInfo,
+      newWalletEntries,
+      algorithmCompiler,
+      clogSome
     )
 
-    val lastUpdateTime = TimeHelpers.nowMillis
+    val lastUpdateTime = TimeHelpers.nowMillis()
 
     _workingUserState = _workingUserState.copy(
-      implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedResourceEvents.toImmutableSnapshot(lastUpdateTime),
+      implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
       latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
       stateChangeCounter = _workingUserState.stateChangeCounter + 1,
       parentUserStateId = startingUserState.idOpt,
@@ -483,16 +525,9 @@ class UserStateComputations extends Loggable {
     clog.debug("calculationReason = %s", calculationReason)
 
     if(calculationReason.shouldStoreUserState) {
-      val storedUserStateM = userStateStore.storeUserState2(_workingUserState)
-      storedUserStateM match {
-        case Just(storedUserState) ⇒
-          clog.info("Saved [_id=%s] %s", storedUserState._id, storedUserState)
-          _workingUserState = storedUserState
-        case NoVal ⇒
-          clog.warn("Could not store %s", _workingUserState)
-        case failed @ Failed(e, m) ⇒
-          clog.error(e, "Could not store %s", _workingUserState)
-      }
+      val storedUserState = userStateStore.insertUserState(_workingUserState)
+      clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
+      _workingUserState = storedUserState
     }
 
     clog.debug("RETURN %s", _workingUserState)
@@ -509,8 +544,8 @@ class UserStateComputations extends Loggable {
  *          We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
  *          ones. Will be updated on processing the next resource event.
  *
- * @param implicitlyIssuedResourceEvents
- *          The implicitly issued resource events (from previous billing period).
+ * @param implicitlyIssuedStartEvents
+ *          The implicitly issued resource events at the beginning of the billing period.
  *
  * @param ignoredFirstResourceEvents
  *          The resource events that were first (and unused) of their kind.
@@ -519,7 +554,7 @@ class UserStateComputations extends Loggable {
  */
 case class UserStateWorker(userId: String,
                            previousResourceEvents: LatestResourceEventsWorker,
-                           implicitlyIssuedResourceEvents: ImplicitlyIssuedResourceEventsWorker,
+                           implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
                            ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
                            accounting: Accounting,
                            resourcesMap: DSLResourcesMap) {
@@ -536,39 +571,37 @@ case class UserStateWorker(userId: String,
    * @param instanceId
    * @return
    */
-  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Maybe[ResourceEvent] = {
+  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
     // implicitly issued events are checked first
-    implicitlyIssuedResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
-      case just @ Just(_) ⇒
-        just
-      case NoVal ⇒
+    implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+      case some @ Some(_) ⇒
+        some
+      case None ⇒
         // explicit previous resource events are checked second
         previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
-          case just @ Just(_) ⇒
-            just
-          case noValOrFailed ⇒
-            noValOrFailed
+          case some @ Some(_) ⇒
+            some
+          case _ ⇒
+            None
         }
-      case failed ⇒
-        failed
     }
   }
 
-  def updateIgnored(resourceEvent: ResourceEvent): Unit = {
+  def updateIgnored(resourceEvent: ResourceEventModel): Unit = {
     ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
   }
 
-  def updatePrevious(resourceEvent: ResourceEvent): Unit = {
+  def updatePrevious(resourceEvent: ResourceEventModel): Unit = {
     previousResourceEvents.updateResourceEvent(resourceEvent)
   }
 
-  def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEvent ⇒ String): Unit = {
+  def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEventModel ⇒ String): Unit = {
     if(previousResourceEvents.size > 0) {
-      val map = previousResourceEvents.resourceEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+      val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
       clog.debugMap("previousResourceEvents", map, 0)
     }
-    if(implicitlyIssuedResourceEvents.size > 0) {
-      val map = implicitlyIssuedResourceEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+    if(implicitlyIssuedStartEvents.size > 0) {
+      val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
       clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
     }
     if(ignoredFirstResourceEvents.size > 0) {
@@ -577,12 +610,62 @@ case class UserStateWorker(userId: String,
     }
   }
 
+//  private[this]
+//  def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
+//    val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
+//
+//    buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
+//    buffer ++= previousResourceEvents.latestEventsMap
+//
+//    buffer.valuesIterator.toList
+//  }
+
+  /**
+   * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
+   * end events along with those implicitly issued events. Before returning, remove the events that generated the
+   * implicit ends from the internal state of this instance.
+   *
+   * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
+   */
+  def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
+                                                ): (List[ResourceEventModel], List[ResourceEventModel]) = {
+    val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
+    val checkSet = mutable.Set[ResourceEventModel]()
+
+    def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
+      val resourceEvents = map.valuesIterator
+      for {
+        resourceEvent ← resourceEvents
+        dslResource   ← resourcesMap.findResource(resourceEvent.safeResource)
+        costPolicy    =  dslResource.costPolicy
+      } {
+        if(costPolicy.supportsImplicitEvents) {
+          if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
+            val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
+
+            if(!checkSet.contains(resourceEvent)) {
+              checkSet.add(resourceEvent)
+              buffer append ((resourceEvent, implicitEnd))
+            }
+
+            // remove it anyway
+            map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
+          }
+        }
+      }
+    }
+
+    doItFor(previousResourceEvents.latestEventsMap)                // we give priority for previous
+    doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
+
+    (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
+  }
 }
 
 object UserStateWorker {
   def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
     UserStateWorker(
-      userState.userId,
+      userState.userID,
       userState.latestResourceEventsSnapshot.toMutableWorker,
       userState.implicitlyIssuedSnapshot.toMutableWorker,
       IgnoredFirstResourceEventsWorker.Empty,