WIP: ResourceEvent-related refactorings
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
index 4a5bebf..c015377 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or
  * without modification, are permitted provided that the following
 
 package gr.grnet.aquarium.user
 
-import scala.collection.mutable
 
-import gr.grnet.aquarium.store.ResourceEventStore
-import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
+import scala.collection.mutable
+import gr.grnet.aquarium.util.{ContextualLogger, Loggable}
+import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
+import gr.grnet.aquarium.logic.accounting.dsl.{DSLAgreement, DSLResourcesMap}
+import gr.grnet.aquarium.store.{StoreProvider, PolicyStore}
 import gr.grnet.aquarium.logic.accounting.Accounting
-import gr.grnet.aquarium.util.date.{TimeHelpers, DateCalculator}
-import gr.grnet.aquarium.logic.accounting.dsl.{DSLResourcesMap, DSLCostPolicy, DSLPolicy, DSLAgreement}
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
-
-sealed abstract class CalculationType(_name: String) {
-  def name = _name
-}
-
-/**
- * Normal calculations that are part of the bill generation procedure
- */
-case object PeriodicCalculation extends CalculationType("periodic")
-
-/**
- * Adhoc calculations, e.g. when computing the state in realtime.
- */
-case object AdhocCalculation extends CalculationType("adhoc")
-
-trait UserPolicyFinder {
-  def findUserPolicyAt(userId: String, whenMillis: Long): DSLPolicy
-}
-
-trait FullStateFinder {
-  def findFullState(userId: String, whenMillis: Long): Any
-}
-
-trait UserStateCache {
-  def findUserStateAtEndOfPeriod(userId: String, year: Int, month: Int): Maybe[UserState]
-
-  /**
-   * Find the most up-to-date user state for the particular billing period.
-   */
-  def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState]
-}
+import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
+import gr.grnet.aquarium.event.NewWalletEntry
+import gr.grnet.aquarium.event.resource.ResourceEventModel
+import gr.grnet.aquarium.event.im.{IMEventModel, StdIMEvent}
+import gr.grnet.aquarium.{AquariumInternalError, AquariumException}
 
 /**
- * Use this to keep track of implicit OFFs at the end of the billing period.
- *
- * The use case is this: A VM may have been started (ON state) before the end of the billing period
- * and ended (OFF state) after the beginning of the next billing period. In order to bill this, we must assume
- * an implicit OFF even right at the end of the billing period and an implicit ON event with the beginning of the
- * next billing period.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
- *
- * @param onEvents The `ON` events that need to be implicitly terminated.
  */
-case class ImplicitOffEvents(onEvents: List[ResourceEvent])
+class UserStateComputations extends Loggable {
+  def createInitialUserState(imEvent: IMEventModel, credits: Double, agreementName: String) = {
+    if(!imEvent.isCreateUser) {
+      throw new AquariumInternalError(
+        "Got '%s' instead of '%s'".format(imEvent.eventType, IMEventModel.EventTypeNames.create))
+    }
 
-case class OutOfSyncWalletEntries(entries: List[WalletEntry])
+    val userID = imEvent.userID
+    val userCreationMillis = imEvent.occurredMillis
+    val now = TimeHelpers.nowMillis()
 
-/**
- * Full user state at the end of a billing month.
- *
- * @param userState
- * @param implicitOffs
- */
-case class EndOfBillingState(userState: UserState, implicitOffs: ImplicitOffEvents, outOfSyncWalletEntries: OutOfSyncWalletEntries)
+    UserState(
+      true,
+      userID,
+      userCreationMillis,
+      0L,
+      false,
+      null,
+      ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+      Nil,
+      Nil,
+      LatestResourceEventsSnapshot(List(), now),
+      0L,
+      0L,
+      IMStateSnapshot(imEvent, now),
+      CreditSnapshot(credits, now),
+      AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+      OwnedResourcesSnapshot(Nil, now),
+      Nil,
+      InitialUserStateSetup
+    )
+  }
+
+  def createInitialUserState(userID: String,
+                             userCreationMillis: Long,
+                             isActive: Boolean,
+                             credits: Double,
+                             roleNames: List[String] = List(),
+                             agreementName: String = DSLAgreement.DefaultAgreementName) = {
+    val now = userCreationMillis
 
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-class UserStateComputations extends Loggable {
-  def createFirstUserState(userId: String, agreementName: String = "default") = {
-    val now = 0L
     UserState(
-      userId,
-      now,
+      true,
+      userID,
+      userCreationMillis,
       0L,
       false,
       null,
+      ImplicitlyIssuedResourceEventsSnapshot(List(), now),
+      Nil,
+      Nil,
+      LatestResourceEventsSnapshot(List(), now),
       0L,
-      ActiveSuspendedSnapshot(false, now),
-      CreditSnapshot(0, now),
-      AgreementSnapshot(Agreement(agreementName, now, -1) :: Nil, now),
-      RolesSnapshot(List(), now),
-      OwnedResourcesSnapshot(List(), now)
+      0L,
+      IMStateSnapshot(
+        StdIMEvent(
+          "",
+          now, now, userID,
+          "",
+          isActive, roleNames.headOption.getOrElse("default"),
+          "1.0",
+          IMEventModel.EventTypeNames.create, Map()),
+        now
+      ),
+      CreditSnapshot(credits, now),
+      AgreementSnapshot(List(Agreement(agreementName, userCreationMillis)), now),
+      OwnedResourcesSnapshot(Nil, now),
+      Nil,
+      InitialUserStateSetup
     )
   }
 
-  def createFirstUserState(userId: String, agreementName: String, resourcesMap: DSLResourcesMap) = {
-      val now = 0L
-      UserState(
+  def createInitialUserStateFrom(us: UserState): UserState = {
+    createInitialUserState(
+      us.imStateSnapshot.imEvent,
+      us.creditsSnapshot.creditAmount,
+      us.agreementsSnapshot.agreementsByTimeslot.valuesIterator.toList.last)
+  }
+
+  def findUserStateAtEndOfBillingMonth(userId: String,
+                                       billingMonthInfo: BillingMonthInfo,
+                                       storeProvider: StoreProvider,
+                                       currentUserState: UserState,
+                                       defaultResourcesMap: DSLResourcesMap,
+                                       accounting: Accounting,
+                                       algorithmCompiler: CostPolicyAlgorithmCompiler,
+                                       calculationReason: UserStateChangeReason,
+                                       clogOpt: Option[ContextualLogger] = None): UserState = {
+
+    val clog = ContextualLogger.fromOther(
+      clogOpt,
+      logger,
+      "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo)
+    clog.begin()
+
+    def doCompute: UserState = {
+      doFullMonthlyBilling(
+        userId,
+        billingMonthInfo,
+        storeProvider,
+        currentUserState,
+        defaultResourcesMap,
+        accounting,
+        algorithmCompiler,
+        calculationReason,
+        Some(clog))
+    }
+
+    val userStateStore = storeProvider.userStateStore
+    val resourceEventStore = storeProvider.resourceEventStore
+
+    val userCreationMillis = currentUserState.userCreationMillis
+    val userCreationDateCalc = new MutableDateCalc(userCreationMillis)
+    val billingMonthStartMillis = billingMonthInfo.startMillis
+    val billingMonthStopMillis  = billingMonthInfo.stopMillis
+
+    if(billingMonthStopMillis < userCreationMillis) {
+      // If the user did not exist for this billing month, piece of cake
+      clog.debug("User did not exist before %s", userCreationDateCalc)
+
+      // NOTE: Reason here will be: InitialUserStateSetup$
+      val initialUserState0 = createInitialUserStateFrom(currentUserState)
+      val initialUserState1 = userStateStore.insertUserState(initialUserState0)
+
+      clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1))
+      clog.end()
+
+      initialUserState1
+    } else {
+      // Ask DB cache for the latest known user state for this billing period
+      val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth(
         userId,
-        now,
-        0L,
-        false,
-        null,
-        0L,
-        ActiveSuspendedSnapshot(false, now),
-        CreditSnapshot(0, now),
-        AgreementSnapshot(Agreement(agreementName, now, - 1) :: Nil, now),
-        RolesSnapshot(List(), now),
-        OwnedResourcesSnapshot(List(), now)
+        billingMonthInfo.year,
+        billingMonthInfo.month)
+
+      latestUserStateOpt match {
+        case None ⇒
+          // Not found, must compute
+          clog.debug("No user state found from cache, will have to (re)compute")
+          val result = doCompute
+          clog.end()
+          result
+
+        case Some(latestUserState) ⇒
+          // Found a "latest" user state but need to see if it is indeed the true and one latest.
+          // For this reason, we must count the events again.
+         val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter
+         val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod(
+           userId,
+           billingMonthStartMillis,
+           billingMonthStopMillis)
+
+         val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter
+         counterDiff match {
+           // ZERO, we are OK!
+           case 0 ⇒
+             // NOTE: Keep the caller's calculation reason
+             latestUserState.copyForChangeReason(calculationReason)
+
+           // We had more, so must recompute
+           case n if n > 0 ⇒
+             clog.debug(
+               "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n)
+             val result = doCompute
+             clog.end()
+             result
+
+           // We had less????
+           case n if n < 0 ⇒
+             val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n)
+             clog.warn(errMsg)
+             throw new AquariumException(errMsg)
+         }
+      }
+    }
+  }
+
+  //+ Utility methods
+  def rcDebugInfo(rcEvent: ResourceEventModel) = {
+    rcEvent.toDebugString(false)
+  }
+  //- Utility methods
+
+  def processResourceEvent(startingUserState: UserState,
+                           userStateWorker: UserStateWorker,
+                           currentResourceEvent: ResourceEventModel,
+                           policyStore: PolicyStore,
+                           stateChangeReason: UserStateChangeReason,
+                           billingMonthInfo: BillingMonthInfo,
+                           walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+                           algorithmCompiler: CostPolicyAlgorithmCompiler,
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
+
+    val clog = ContextualLogger.fromOther(clogOpt, logger, "walletEntriesForResourceEvent(%s)", currentResourceEvent.id)
+
+    var _workingUserState = startingUserState
+
+    val theResource = currentResourceEvent.safeResource
+    val theInstanceId = currentResourceEvent.safeInstanceId
+    val theValue = currentResourceEvent.value
+
+    val accounting = userStateWorker.accounting
+    val resourcesMap = userStateWorker.resourcesMap
+
+    val currentResourceEventDebugInfo = rcDebugInfo(currentResourceEvent)
+    clog.begin(currentResourceEventDebugInfo)
+
+    userStateWorker.debugTheMaps(clog)(rcDebugInfo)
+
+    // Ignore the event if it is not billable (but still record it in the "previous" stuff).
+    // But to make this decision, first we need the resource definition (and its cost policy).
+    val dslResourceOpt = resourcesMap.findResource(theResource)
+    dslResourceOpt match {
+      // We have a resource (and thus a cost policy)
+      case Some(dslResource) ⇒
+        val costPolicy = dslResource.costPolicy
+        clog.debug("Cost policy %s for %s", costPolicy, dslResource)
+        val isBillable = costPolicy.isBillableEventBasedOnValue(theValue)
+        if(!isBillable) {
+          // The resource event is not billable
+          clog.debug("Ignoring not billable event %s", currentResourceEventDebugInfo)
+        } else {
+          // The resource event is billable
+          // Find the previous event.
+          // This is (potentially) needed to calculate new credit amount and new resource instance amount
+          val previousResourceEventOpt = userStateWorker.findAndRemovePreviousResourceEvent(theResource, theInstanceId)
+          clog.debug("PreviousM %s", previousResourceEventOpt.map(rcDebugInfo(_)))
+
+          val havePreviousResourceEvent = previousResourceEventOpt.isDefined
+          val needPreviousResourceEvent = costPolicy.needsPreviousEventForCreditAndAmountCalculation
+          if(needPreviousResourceEvent && !havePreviousResourceEvent) {
+            // This must be the first resource event of its kind, ever.
+            // TODO: We should normally check the DB to verify the claim (?)
+            clog.debug("Ignoring first event of its kind %s", currentResourceEventDebugInfo)
+            userStateWorker.updateIgnored(currentResourceEvent)
+          } else {
+            val defaultInitialAmount = costPolicy.getResourceInstanceInitialAmount
+            val oldAmount = _workingUserState.getResourceInstanceAmount(theResource, theInstanceId, defaultInitialAmount)
+            val oldCredits = _workingUserState.creditsSnapshot.creditAmount
+
+            // A. Compute new resource instance accumulating amount
+            val newAmount = costPolicy.computeNewAccumulatingAmount(oldAmount, theValue)
+
+            clog.debug("theValue = %s, oldAmount = %s, newAmount = %s, oldCredits = %s", theValue, oldAmount, newAmount, oldCredits)
+
+            // B. Compute new wallet entries
+            clog.debug("agreementsSnapshot = %s", _workingUserState.agreementsSnapshot)
+            val alltimeAgreements = _workingUserState.agreementsSnapshot.agreementsByTimeslot
+
+            //              clog.debug("Computing full chargeslots")
+            val (referenceTimeslot, fullChargeslots) = accounting.computeFullChargeslots(
+              previousResourceEventOpt,
+              currentResourceEvent,
+              oldCredits,
+              oldAmount,
+              newAmount,
+              dslResource,
+              resourcesMap,
+              alltimeAgreements,
+              algorithmCompiler,
+              policyStore,
+              Some(clog)
+            )
+
+            // We have the chargeslots, let's associate them with the current event
+            if(fullChargeslots.length == 0) {
+              // At least one chargeslot is required.
+              throw new AquariumInternalError("No chargeslots computed for resource event %s".format(currentResourceEvent.id))
+            }
+            clog.debugSeq("fullChargeslots", fullChargeslots, 0)
+
+            // C. Compute new credit amount (based on the charge slots)
+            val newCreditsDiff = fullChargeslots.map(_.computedCredits.get).sum
+            val newCredits = oldCredits - newCreditsDiff
+
+            if(stateChangeReason.shouldStoreCalculatedWalletEntries) {
+              val newWalletEntry = NewWalletEntry(
+                userStateWorker.userId,
+                newCreditsDiff,
+                oldCredits,
+                newCredits,
+                TimeHelpers.nowMillis(),
+                referenceTimeslot,
+                billingMonthInfo.year,
+                billingMonthInfo.month,
+                if(havePreviousResourceEvent)
+                  List(currentResourceEvent, previousResourceEventOpt.get)
+                else
+                  List(currentResourceEvent),
+                fullChargeslots,
+                dslResource,
+                currentResourceEvent.isSynthetic
+              )
+              clog.debug("New %s", newWalletEntry)
+
+              walletEntriesBuffer += newWalletEntry
+            } else {
+              clog.debug("newCreditsDiff = %s, newCredits = %s", newCreditsDiff, newCredits)
+            }
+
+            _workingUserState = _workingUserState.copy(
+              creditsSnapshot = CreditSnapshot(newCredits, TimeHelpers.nowMillis()),
+              stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+              totalEventsProcessedCounter = _workingUserState.totalEventsProcessedCounter + 1
+            )
+          }
+        }
+
+        // After processing, all events billable or not update the previous state
+        userStateWorker.updatePrevious(currentResourceEvent)
+
+        _workingUserState = _workingUserState.copy(
+          latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(TimeHelpers.nowMillis())
+        )
+
+      // We do not have a resource (and thus, no cost policy)
+      case None ⇒
+        // Now, this is a matter of politics: what do we do if no policy was found?
+        clog.warn("Unknown resource for %s", currentResourceEventDebugInfo)
+    } // dslResourceOpt match
+
+    clog.end(currentResourceEventDebugInfo)
+
+    _workingUserState
+  }
+
+  def processResourceEvents(resourceEvents: Traversable[ResourceEventModel],
+                            startingUserState: UserState,
+                            userStateWorker: UserStateWorker,
+                            policyStore: PolicyStore,
+                            stateChangeReason: UserStateChangeReason,
+                            billingMonthInfo: BillingMonthInfo,
+                            walletEntriesBuffer: mutable.Buffer[NewWalletEntry],
+                            algorithmCompiler: CostPolicyAlgorithmCompiler,
+                            clogOpt: Option[ContextualLogger] = None): UserState = {
+
+    var _workingUserState = startingUserState
+
+    for(currentResourceEvent ← resourceEvents) {
+
+      _workingUserState = processResourceEvent(
+        _workingUserState,
+        userStateWorker,
+        currentResourceEvent,
+        policyStore,
+        stateChangeReason,
+        billingMonthInfo,
+        walletEntriesBuffer,
+        algorithmCompiler,
+        clogOpt
       )
     }
 
-  /**
-   * Get the user state as computed up to (and not including) the start of the new billing period.
-   *
-   * Always compute, taking into account any "out of sync" resource events
-   */
-  def computeUserStateAtEndOfBillingPeriod(billingYear: Int,
-                                             billingMonth: Int,
-                                             knownUserState: UserState,
-                                             accounting: Accounting): Maybe[EndOfBillingState] = {
-
-    val billingDate = new DateCalculator(billingYear, billingMonth, 1)
-    val billingDateMillis = billingDate.toMillis
-
-//    if(billingDateMillis < knownUserState.startDateMillis) {
-//      val userId = knownUserState.userId
-//      val agreementName = knownUserState.agreement match {
-//        case null      ⇒ "default"
-//        case agreement ⇒ agreement.data
-//      }
-//      createFirstUserState(userId, agreementName)
-//    } else {
-      // We really need to compute the user state here
-
-      // get all events that
-      // FIXME: Implement
-    Just(EndOfBillingState(knownUserState, ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
-
-//    }
+    _workingUserState
   }
-  
-  def findBillingStateAtEndOfBillingPeriod(yearOfBillingMonth: Int,
-                                           billingMonth: Int,
-                                           userId: String,
-                                           userStateCache: UserStateCache,
-                                           accounting: Accounting): Maybe[EndOfBillingState] = {
-    userStateCache.findLatestUserStateForEndOfBillingMonth(userId, yearOfBillingMonth, billingMonth) match {
-      case Just(userState) ⇒
-      case NoVal ⇒
-      case failed @ Failed(e, m) ⇒
+
+
+  def doFullMonthlyBilling(userId: String,
+                           billingMonthInfo: BillingMonthInfo,
+                           storeProvider: StoreProvider,
+                           currentUserState: UserState,
+                           defaultResourcesMap: DSLResourcesMap,
+                           accounting: Accounting,
+                           algorithmCompiler: CostPolicyAlgorithmCompiler,
+                           calculationReason: UserStateChangeReason = NoSpecificChangeReason,
+                           clogOpt: Option[ContextualLogger] = None): UserState = {
+
+
+    val clog = ContextualLogger.fromOther(
+      clogOpt,
+      logger,
+      "doFullMonthlyBilling(%s)", billingMonthInfo)
+    clog.begin()
+
+    val clogSome = Some(clog)
+
+    val previousBillingMonthUserState = findUserStateAtEndOfBillingMonth(
+      userId,
+      billingMonthInfo.previousMonth,
+      storeProvider,
+      currentUserState,
+      defaultResourcesMap,
+      accounting,
+      algorithmCompiler,
+      calculationReason.forPreviousBillingMonth,
+      clogSome
+    )
+
+    val startingUserState = previousBillingMonthUserState
+
+    val userStateStore = storeProvider.userStateStore
+    val resourceEventStore = storeProvider.resourceEventStore
+    val policyStore = storeProvider.policyStore
+
+    val billingMonthStartMillis = billingMonthInfo.startMillis
+    val billingMonthEndMillis = billingMonthInfo.stopMillis
+
+    // Keep the working (current) user state. This will get updated as we proceed with billing for the month
+    // specified in the parameters.
+    // NOTE: The calculation reason is not the one we get from the previous user state but the one our caller specifies
+    var _workingUserState = startingUserState.copyForChangeReason(calculationReason)
+
+    val userStateWorker = UserStateWorker.fromUserState(_workingUserState, accounting, defaultResourcesMap)
+
+    userStateWorker.debugTheMaps(clog)(rcDebugInfo)
+
+    // First, find and process the actual resource events from DB
+    val allResourceEventsForMonth = resourceEventStore.findAllRelevantResourceEventsForBillingPeriod(
+      userId,
+      billingMonthStartMillis,
+      billingMonthEndMillis)
+
+    val newWalletEntries = scala.collection.mutable.ListBuffer[NewWalletEntry]()
+
+    _workingUserState = processResourceEvents(
+      allResourceEventsForMonth,
+      _workingUserState,
+      userStateWorker,
+      policyStore,
+      calculationReason,
+      billingMonthInfo,
+      newWalletEntries,
+      algorithmCompiler,
+      clogSome
+    )
+
+    // Second, for the remaining events which must contribute an implicit OFF, we collect those OFFs
+    // ... in order to generate an implicit ON later
+    val (specialEvents, theirImplicitEnds) = userStateWorker.
+      findAndRemoveGeneratorsOfImplicitEndEvents(billingMonthEndMillis)
+    if(specialEvents.lengthCompare(1) >= 0 || theirImplicitEnds.lengthCompare(1) >= 0) {
+      clog.debug("")
+      clog.debug("Process implicitly issued events")
+      clog.debugSeq("specialEvents", specialEvents, 0)
+      clog.debugSeq("theirImplicitEnds", theirImplicitEnds, 0)
+    }
+
+    // Now, the previous and implicitly started must be our base for the following computation, so we create an
+    // appropriate worker
+    val specialUserStateWorker = UserStateWorker(
+      userStateWorker.userId,
+      LatestResourceEventsWorker.fromList(specialEvents),
+      ImplicitlyIssuedResourceEventsWorker.Empty,
+      IgnoredFirstResourceEventsWorker.Empty,
+      userStateWorker.accounting,
+      userStateWorker.resourcesMap
+    )
+
+    _workingUserState = processResourceEvents(
+      theirImplicitEnds,
+      _workingUserState,
+      specialUserStateWorker,
+      policyStore,
+      calculationReason,
+      billingMonthInfo,
+      newWalletEntries,
+      algorithmCompiler,
+      clogSome
+    )
+
+    val lastUpdateTime = TimeHelpers.nowMillis()
+
+    _workingUserState = _workingUserState.copy(
+      implicitlyIssuedSnapshot = userStateWorker.implicitlyIssuedStartEvents.toImmutableSnapshot(lastUpdateTime),
+      latestResourceEventsSnapshot = userStateWorker.previousResourceEvents.toImmutableSnapshot(lastUpdateTime),
+      stateChangeCounter = _workingUserState.stateChangeCounter + 1,
+      parentUserStateId = startingUserState.idOpt,
+      newWalletEntries = newWalletEntries.toList
+    )
+
+    clog.debug("calculationReason = %s", calculationReason)
+
+    if(calculationReason.shouldStoreUserState) {
+      val storedUserState = userStateStore.insertUserState(_workingUserState)
+      clog.debug("Saved [_id=%s] %s", storedUserState._id, storedUserState)
+      _workingUserState = storedUserState
     }
 
-    Just(EndOfBillingState(createFirstUserState(userId), ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
+    clog.debug("RETURN %s", _workingUserState)
+    clog.end()
+    _workingUserState
   }
+}
+
+/**
+ * A helper object holding intermediate state/results during resource event processing.
+ *
+ * @param previousResourceEvents
+ *          This is a collection of all the latest resource events.
+ *          We want these in order to correlate incoming resource events with their previous (in `occurredMillis` time)
+ *          ones. Will be updated on processing the next resource event.
+ *
+ * @param implicitlyIssuedStartEvents
+ *          The implicitly issued resource events at the beginning of the billing period.
+ *
+ * @param ignoredFirstResourceEvents
+ *          The resource events that were first (and unused) of their kind.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class UserStateWorker(userId: String,
+                           previousResourceEvents: LatestResourceEventsWorker,
+                           implicitlyIssuedStartEvents: ImplicitlyIssuedResourceEventsWorker,
+                           ignoredFirstResourceEvents: IgnoredFirstResourceEventsWorker,
+                           accounting: Accounting,
+                           resourcesMap: DSLResourcesMap) {
 
   /**
-   * Find the previous resource event, if needed by the event's cost policy,
-   * in order to use it for any credit calculations.
+   * Finds the previous resource event by checking two possible sources: a) The implicitly terminated resource
+   * events and b) the explicit previous resource events. If the event is found, it is removed from the
+   * respective source.
+   *
+   * If the event is not found, then this must be for a new resource instance.
+   * (and probably then some `zero` resource event must be implied as the previous one)
+   *
+   * @param resource
+   * @param instanceId
+   * @return
    */
-  def findPreviousRCEventOf(rcEvent: ResourceEvent,
-                            costPolicy: DSLCostPolicy,
-                            previousRCEventsMap: mutable.Map[ResourceEvent.FullResourceType, ResourceEvent]): Maybe[ResourceEvent] = {
-
-    if(costPolicy.needsPreviousEventForCreditCalculation) {
-      // Get a previous resource only if this is needed by the policy
-      previousRCEventsMap.get(rcEvent.fullResourceInfo) match {
-        case Some(previousRCEvent) ⇒
-          Just(previousRCEvent)
-        case None ⇒
-          queryForPreviousRCEvent(rcEvent)
-      }
-    } else {
-      // No need for previous event. Will return NoVal
-      NoVal
+  def findAndRemovePreviousResourceEvent(resource: String, instanceId: String): Option[ResourceEventModel] = {
+    // implicitly issued events are checked first
+    implicitlyIssuedStartEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+      case some @ Some(_) ⇒
+        some
+      case None ⇒
+        // explicit previous resource events are checked second
+        previousResourceEvents.findAndRemoveResourceEvent(resource, instanceId) match {
+          case some @ Some(_) ⇒
+            some
+          case _ ⇒
+            None
+        }
     }
   }
 
-  /**
-   * FIXME: implement
-   */
-  def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
-    NoVal
+  def updateIgnored(resourceEvent: ResourceEventModel): Unit = {
+    ignoredFirstResourceEvents.updateResourceEvent(resourceEvent)
   }
 
-  type FullResourceType = ResourceEvent.FullResourceType
-  def updatePreviousRCEventWith(previousRCEventsMap: mutable.Map[FullResourceType, ResourceEvent],
-                                newRCEvent: ResourceEvent): Unit = {
-    previousRCEventsMap(newRCEvent.fullResourceInfo) = newRCEvent
+  def updatePrevious(resourceEvent: ResourceEventModel): Unit = {
+    previousResourceEvents.updateResourceEvent(resourceEvent)
   }
 
+  def debugTheMaps(clog: ContextualLogger)(rcDebugInfo: ResourceEventModel ⇒ String): Unit = {
+    if(previousResourceEvents.size > 0) {
+      val map = previousResourceEvents.latestEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+      clog.debugMap("previousResourceEvents", map, 0)
+    }
+    if(implicitlyIssuedStartEvents.size > 0) {
+      val map = implicitlyIssuedStartEvents.implicitlyIssuedEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+      clog.debugMap("implicitlyTerminatedResourceEvents", map, 0)
+    }
+    if(ignoredFirstResourceEvents.size > 0) {
+      val map = ignoredFirstResourceEvents.ignoredFirstEventsMap.map { case (k, v) => (k, rcDebugInfo(v)) }
+      clog.debugMap("ignoredFirstResourceEvents", map, 0)
+    }
+  }
+
+//  private[this]
+//  def allPreviousAndAllImplicitlyStarted: List[ResourceEvent] = {
+//    val buffer: FullMutableResourceTypeMap = scala.collection.mutable.Map[FullResourceType, ResourceEvent]()
+//
+//    buffer ++= implicitlyIssuedStartEvents.implicitlyIssuedEventsMap
+//    buffer ++= previousResourceEvents.latestEventsMap
+//
+//    buffer.valuesIterator.toList
+//  }
+
   /**
-   * Do a full month billing.
+   * Find those events from `implicitlyIssuedStartEvents` and `previousResourceEvents` that will generate implicit
+   * end events along with those implicitly issued events. Before returning, remove the events that generated the
+   * implicit ends from the internal state of this instance.
    *
-   * Takes into account "out of sync events".
-   * 
+   * @see [[gr.grnet.aquarium.logic.accounting.dsl.DSLCostPolicy]]
    */
-  def computeFullMonthlyBilling(yearOfBillingMonth: Int,
-                                billingMonth: Int,
-                                userId: String,
-                                policyFinder: UserPolicyFinder,
-                                fullStateFinder: FullStateFinder,
-                                userStateCache: UserStateCache,
-                                rcEventStore: ResourceEventStore,
-                                currentUserState: UserState,
-                                otherStuff: Traversable[Any],
-                                defaultPolicy: DSLPolicy, // Policy.policy
-                                defaultResourcesMap: DSLResourcesMap,
-                                accounting: Accounting): Maybe[EndOfBillingState] = Maybe {
-
-    val billingMonthStartDate = new DateCalculator(yearOfBillingMonth, billingMonth, 1)
-    val billingMonthStopDate = billingMonthStartDate.copy.goEndOfThisMonth
-
-    logger.debug("billingMonthStartDate = %s".format(billingMonthStartDate))
-    logger.debug("billingMonthStopDate  = %s".format(billingMonthStopDate))
-
-    val prevBillingMonthStartDate = billingMonthStartDate.copy.goPreviousMonth
-    val yearOfPrevBillingMonth = prevBillingMonthStartDate.getYear
-    val prevBillingMonth = prevBillingMonthStartDate.getMonthOfYear
-
-    // Check if this value is already cached and valid, otherwise compute the value
-    // TODO : cache it in case of new computation
-    val cachedStartUserStateM = userStateCache.findLatestUserStateForEndOfBillingMonth(
-      userId,
-      yearOfPrevBillingMonth,
-      prevBillingMonth)
-
-    val (previousStartUserState, newStartUserState) = cachedStartUserStateM match {
-      case Just(cachedStartUserState) ⇒
-        // So, we do have a cached user state but must check if this is still valid
-        logger.debug("Found cachedStartUserState = %s".format(cachedStartUserState))
-
-        // Check how many resource events were used to produce this user state
-        val cachedHowmanyRCEvents = cachedStartUserState.resourceEventsCounter
-
-        // Ask resource event store to see if we had any "out of sync" events for the particular (== previous)
-        // billing period.
-        val prevHowmanyOutOfSyncRCEvents = rcEventStore.countOutOfSyncEventsForBillingMonth(
-          userId,
-          yearOfPrevBillingMonth,
-          prevBillingMonth)
-        logger.debug("prevHowmanyOutOfSyncRCEvents = %s".format(prevHowmanyOutOfSyncRCEvents))
-        
-        val recomputedStartUserState = if(prevHowmanyOutOfSyncRCEvents == 0) {
-        logger.debug("Not necessary to recompute start user state, using cachedStartUserState")
-          // This is good, there were no "out of sync" resource events, so we can use the cached value
-          cachedStartUserState
-        } else {
-          // Oops, there are "out of sync" resource event. Must compute (potentially recursively)
-          logger.debug("Recompute start user state...")
-          val computedUserStateAtStartOfBillingPeriod = computeUserStateAtEndOfBillingPeriod(
-            yearOfPrevBillingMonth,
-            prevBillingMonth,
-            cachedStartUserState,
-            accounting)
-          logger.debug("computedUserStateAtStartOfiingPeriodllB = %s".format(computedUserStateAtStartOfBillingPeriod))
-          val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
-          logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
-          recomputedStartUserState
-        }
-
-        (cachedStartUserState, recomputedStartUserState)
-      case NoVal ⇒
-        // We do not even have a cached value, so compute one!
-        logger.debug("Do not have a cachedStartUserState, computing one...")
-        val computedUserStateAtStartOfBillingPeriod = computeUserStateAtEndOfBillingPeriod(
-          yearOfPrevBillingMonth,
-          prevBillingMonth,
-          currentUserState,
-          accounting)
-        logger.debug("computedUserStateAtStartOfBillingPeriod = %s".format(computedUserStateAtStartOfBillingPeriod))
-        val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
-        logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
-
-        (recomputedStartUserState, recomputedStartUserState)
-      case Failed(e, m) ⇒
-        logger.error("[Could not find latest user state for billing month %s-%s] %s".format(yearOfPrevBillingMonth, prevBillingMonth, m), e)
-        throw new Exception(m, e)
-    }
+  def findAndRemoveGeneratorsOfImplicitEndEvents(newOccuredMillis: Long
+                                                ): (List[ResourceEventModel], List[ResourceEventModel]) = {
+    val buffer = mutable.ListBuffer[(ResourceEventModel, ResourceEventModel)]()
+    val checkSet = mutable.Set[ResourceEventModel]()
 
-    // OK. Now that we have a user state to start with (= start of billing period reference point),
-    // let us deal with the events themselves.
-    val billingStartMillis = billingMonthStartDate.toMillis
-    val billingStopMillis  = billingMonthStopDate.toMillis
-    val allBillingPeriodRelevantRCEvents = rcEventStore.findAllRelevantResourceEventsForBillingPeriod(userId, billingStartMillis, billingStopMillis)
-    logger.debug("allBillingPeriodRelevantRCEvents [%s] = %s".format(allBillingPeriodRelevantRCEvents.size, allBillingPeriodRelevantRCEvents))
-
-    type FullResourceType = ResourceEvent.FullResourceType
-    // For each type and instance of resource, we keep the previously met resource event.
-    val previousRCEventsMap = mutable.Map[FullResourceType, ResourceEvent]()
-    // Since we may already have some implicit events from the beginning of the billing period, we put
-    // them to the map.
-    // TODO:
-    val impliedRCEventsMap  = mutable.Map[FullResourceType, ResourceEvent]() // those which do not exists but are
-    // implied in order to do billing calculations (e.g. the "off" vmtime resource event)
-
-    // Our temporary state holder.
-    var _workingUserState = newStartUserState
-    val nowMillis = TimeHelpers.nowMillis
-    var _counter = 0
-
-    for(currentResourceEvent <- allBillingPeriodRelevantRCEvents) {
-      _counter = _counter + 1
-      val resource = currentResourceEvent.resource
-      val instanceId = currentResourceEvent.instanceId
-
-      logger.debug("%02d. Processing %s".format(_counter, currentResourceEvent.toDebugString(defaultResourcesMap, true)))
-      // ResourCe events Debug
-      // =     =         =
-      def RCD(fmt: String, args: Any*) = logger.debug(" ⇒ " + fmt.format(args:_*))
-      
-      RCD("previousRCEventsMap: ")
+    def doItFor(map: ResourceEventModel.FullMutableResourceTypeMap): Unit = {
+      val resourceEvents = map.valuesIterator
       for {
-        (k, v) <- previousRCEventsMap
+        resourceEvent ← resourceEvents
+        dslResource   ← resourcesMap.findResource(resourceEvent.safeResource)
+        costPolicy    =  dslResource.costPolicy
       } {
-        RCD(" %s ⇒ %s".format(k, v.toDebugString(defaultResourcesMap, true)))
-      }
+        if(costPolicy.supportsImplicitEvents) {
+          if(costPolicy.mustConstructImplicitEndEventFor(resourceEvent)) {
+            val implicitEnd = costPolicy.constructImplicitEndEventFor(resourceEvent, newOccuredMillis)
 
-      // We need to do these kinds of calculations:
-      // 1. Credit state calculations
-      // 2. Resource state calculations
-
-      // How credits are computed:
-      // - "onoff" events (think "vmtime"):
-      //   - need to be considered in on/off pairs
-      //   - just use the time difference of this event to the previous one for the credit computation
-      // - "discrete" events (think "bandwidth"):
-      //   - just use their value, which is a difference already for the credit computation
-      // - "continuous" events (think "bandwidth"):
-      //   - need the previous absolute value
-      //   - need the time difference of this event to the previous one
-      //   - use both the above (previous absolute value, time difference) for the credit computation
-      //
-      // BUT ALL THE ABOVE SHOULD NOT BE CONSIDERED HERE; RATHER THEY ARE POLYMORPHIC BEHAVIOURS
-
-      // What we need to do is:
-      // A. Update user state with new resource instance amount
-      // B. Update user state with new credit
-      // C. Update ??? state with wallet entries
-
-      // The DSLCostPolicy for the resource does not change, so it is safe to use the default DSLPolicy to obtain it.
-      val costPolicyOpt = currentResourceEvent.findCostPolicy(defaultResourcesMap)
-      costPolicyOpt match {
-        case Some(costPolicy) ⇒
-          RCD("Found costPolicy = %s".format(costPolicy))
-          
-          // If this is an event for which no action is required, then OK, proceed with the next one
-          // Basically, we do nothing for ON events but we treat everything polymorphically here
-          costPolicy.isBillableEventBasedOnValue(currentResourceEvent.value) match {
-            case true ⇒
-              ///////////////////////////////////////
-              // A. Update user state with new resource instance amount
-              // TODO: Check if we are at beginning of billing period, so as to use
-              //       costPolicy.computeResourceInstanceAmountForNewBillingPeriod
-              val DefaultResourceInstanceAmount = costPolicy.getResourceInstanceInitialAmount
-              RCD("DefaultResourceInstanceAmount = %s".format(DefaultResourceInstanceAmount))
-
-              val previousAmount = currentUserState.getResourceInstanceAmount(resource, instanceId, DefaultResourceInstanceAmount)
-              RCD("previousAmount = %s".format(previousAmount))
-              val newAmount = costPolicy.computeNewResourceInstanceAmount(previousAmount, currentResourceEvent.value)
-              RCD("newAmount = %s".format(newAmount))
-
-              _workingUserState = _workingUserState.copyForResourcesSnapshotUpdate(resource, instanceId, newAmount, nowMillis)
-              // A. Update user state with new resource instance amount
-              ///////////////////////////////////////
-
-
-              ///////////////////////////////////////
-              // B. Update user state with new credit
-              val previousRCEventM = findPreviousRCEventOf(currentResourceEvent, costPolicy, previousRCEventsMap)
-              _workingUserState.findResourceInstanceSnapshot(resource, instanceId)
-              // B. Update user state with new credit
-              ///////////////////////////////////////
-
-
-              ///////////////////////////////////////
-              // C. Update ??? state with wallet entries
-
-              // C. Update ??? state with wallet entries
-              ///////////////////////////////////////
-
-            case false ⇒ // costPolicy.isBillableEventBasedOnValue(currentResourceEvent.value)
-              RCD("Ignoring not billabe (%s) %s".format(
-                currentResourceEvent.beautifyValue(defaultResourcesMap),
-                currentResourceEvent.toDebugString(defaultResourcesMap, true)))
-          }
+            if(!checkSet.contains(resourceEvent)) {
+              checkSet.add(resourceEvent)
+              buffer append ((resourceEvent, implicitEnd))
+            }
 
-        case None ⇒
-          () // ERROR
+            // remove it anyway
+            map.remove((resourceEvent.safeResource, resourceEvent.safeInstanceId))
+          }
+        }
       }
+    }
 
+    doItFor(previousResourceEvents.latestEventsMap)                // we give priority for previous
+    doItFor(implicitlyIssuedStartEvents.implicitlyIssuedEventsMap) // ... over implicitly issued...
 
-      updatePreviousRCEventWith(previousRCEventsMap, currentResourceEvent)
-    } // for(newResourceEvent <- allBillingPeriodRelevantRCEvents)
-
-
-    null
+    (buffer.view.map(_._1).toList, buffer.view.map(_._2).toList)
   }
-
-
-  /**
-  * Runs the billing algorithm on the specified period.
-  * By default, a billing period is monthly.
-  * The start of the billing period is midnight of the first day of the month we compute the bill for.
-  *
-  */
-   def doPartialMonthlyBilling(startBillingYear: Int,
-                               startBillingMonth: Int,
-                               stopBillingMillis: Long,
-                               userId: String,
-                               policyFinder: UserPolicyFinder,
-                               fullStateFinder: FullStateFinder,
-                               userStateFinder: UserStateCache,
-                               rcEventStore: ResourceEventStore,
-                               currentUserState: UserState,
-                               otherStuff: Traversable[Any],
-                               accounting: Accounting): Maybe[UserState] = Maybe {
-  
-
-     null.asInstanceOf[UserState]
-   }
 }
 
-object DefaultUserStateComputations extends UserStateComputations
\ No newline at end of file
+object UserStateWorker {
+  def fromUserState(userState: UserState, accounting: Accounting, resourcesMap: DSLResourcesMap): UserStateWorker = {
+    UserStateWorker(
+      userState.userID,
+      userState.latestResourceEventsSnapshot.toMutableWorker,
+      userState.implicitlyIssuedSnapshot.toMutableWorker,
+      IgnoredFirstResourceEventsWorker.Empty,
+      accounting,
+      resourcesMap
+    )
+  }
+}