Calculate user state from resource events (wip).
author Christos KK Loverdos <loverdos@gmail.com>
Mon, 2 Jan 2012 16:10:15 +0000 (18:10 +0200)
committer Christos KK Loverdos <loverdos@gmail.com>
Mon, 2 Jan 2012 16:10:15 +0000 (18:10 +0200)
15 files changed:
src/main/scala/gr/grnet/aquarium/logic/events/ProcessedResourceEvent.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala
src/main/scala/gr/grnet/aquarium/logic/events/ResourceNames.scala
src/main/scala/gr/grnet/aquarium/logic/events/ResourceType.scala
src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala
src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala
src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala
src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala
src/main/scala/gr/grnet/aquarium/store/WalletEntryStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/UserState.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala
src/main/scala/gr/grnet/aquarium/util/TimeHelpers.scala [new file with mode: 0644]

diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ProcessedResourceEvent.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ProcessedResourceEvent.scala
new file mode 100644 (file)
index 0000000..0766bdb
--- /dev/null
@@ -0,0 +1,11 @@
+package gr.grnet.aquarium.logic.events
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class ProcessedResourceEvent(
+    refId: String,       // ID of the resource event this one refers to
+    refResource: String, // The resource name of the referred to event
+    refOccurred: Long    // The millis when the referred to event occurred
+)
\ No newline at end of file
index b474b8b..4e1a467 100644 (file)
@@ -38,6 +38,7 @@ package gr.grnet.aquarium.logic.events
 import gr.grnet.aquarium.logic.accounting.Policy
 import net.liftweb.json.{JsonAST, Xml}
 import gr.grnet.aquarium.util.json.JsonHelpers
+import gr.grnet.aquarium.user.UserState
 
 /**
  * Event sent to Aquarium by clients for resource accounting.
@@ -49,12 +50,12 @@ case class ResourceEvent(
     override val id: String,           // The id at the client side (the sender) TODO: Rename to remoteId or something...
     override val occurredMillis: Long, // When it occurred at client side (the sender)
     override val receivedMillis: Long, // When it was received by Aquarium
-    userId: String,
-    clientId: String,
+    userId: String,                    // The user for which this resource is relevant
+    clientId: String,                  // The unique client identifier (usually some hash)
     resource: String,                  // String representation of the resource type (e.g. "bndup", "vmtime").
     eventVersion: String,
     value: Float,
-    details: Map[String, String])
+    details: ResourceEvent.Details)
   extends AquariumEvent(id, occurredMillis, receivedMillis) {
 
   def validate() : Boolean = {
@@ -74,7 +75,7 @@ case class ResourceEvent(
     true
   }
 
-  def resourceType = ResourceType fromName resource
+  def resourceType = ResourceType fromResourceEvent this
 
   def isKnownResourceType = resourceType.isKnownType
 
@@ -85,9 +86,18 @@ case class ResourceEvent(
   def isDiskSpace = resourceType.isDiskSpace
 
   def isVMTime = resourceType.isVMTime
+
+  /**
+   * Calculates the new `UserState` based on this resource event, the calculated wallet entries related to this resource
+   * event and the current `UserState`.
+   */
+  def calcStateChange(walletEntries: List[WalletEntry], userState: UserState): UserState =
+    resourceType.calcStateChange(this, walletEntries, userState)
 }
 
 object ResourceEvent {
+  type Details = Map[String, String]
+  
   def fromJson(json: String): ResourceEvent = {
     JsonHelpers.jsonToObject[ResourceEvent](json)
   }
@@ -108,9 +118,13 @@ object ResourceEvent {
     final val _id = "_id"
     final val id = "id"
     final val userId = "userId"
-    final val timestamp = "timestamp"
+    final val timestamp = "timestamp" // TODO: deprecate in favor of "occurredMillis"
+    final val occurredMillis = "occurredMillis"
+    final val receivedMillis = "receivedMillis"
     final val clientId = "clientId"
 
+    // ResourceType: VMTime
     final val vmId = "vmId"
+    final val action = "action" // "on", "off"
   }
 }
\ No newline at end of file
index 4512ed8..1865444 100644 (file)
@@ -36,7 +36,8 @@
 package gr.grnet.aquarium.logic.events
 
 /**
- * All known resource names are here.
+ * All known resource names are here. These represent the resources that Aquarium can handle.
+ *
  * These must be constants across the platform.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
index 63f992e..e527d52 100644 (file)
 
 package gr.grnet.aquarium.logic.events
 
+import gr.grnet.aquarium.util.TimeHelpers.nowMillis
+import gr.grnet.aquarium.user._
+
+
 /**
  * This is an object representation for a resource name, which provides convenient querying methods.
  *
+ * Also, a `ResourceType` knows how to compute a state change from a particular `ResourceEvent`.
+ *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 sealed abstract class ResourceType(_name: String) {
   def resourceName = _name
 
+  /**
+   * Return true if the resource type must lead to wallet entries generation and, thus, credit diffs.
+   *
+   * Normally, this should always be the case.
+   */
+  def isBillableType = true
+
+  /**
+   * A resource type is independent if it can, by itself only, create a wallet entry.
+   *
+   * It is dependent if it needs one or more other events of the same type to create a wallet entry.
+   */
+  def isIndependentType = true
+
   def isKnownType = true
   def isDiskSpace = false
   def isVMTime = false
   def isBandwidthUpload = false
   def isBandwidthDownload = false
+
+  /**
+   * Calculates the new `UserState` based on the provided resource event, the calculated wallet entries
+   * and the current `UserState`.
+   *
+   * This method is an implementation detail and is not exposed. The actual user-level API is provided in `ResourceEvent`.
+   */
+  private[events] final
+  def calcStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState): UserState = {
+    val otherState = calcOtherStateChange(resourceEvent, walletEntries, userState)
+    val newCredits = calcNewCreditSnapshot(walletEntries, userState)
+    otherState.copy(credits = newCredits)
+  }
+
+  private[events]
+  def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState): UserState
+  
+  private[events] final
+  def calcNewCreditSnapshot(walletEntries: List[WalletEntry], userState: UserState): CreditSnapshot = {
+    val newCredits = for {
+      walletEntry <- walletEntries if(walletEntry.finalized)
+    } yield walletEntry.value.toDouble
+
+    val newCreditSum = newCredits.sum
+    val now = System.currentTimeMillis()
+
+    CreditSnapshot(userState.credits.data + newCreditSum, now)
+  }
 }
 
 /**
  * Companion object used to parse a resource name and provide an object representation in the form
  * of a `ResourceType`.
  *
+ * Known resource names, which represent Aquarium resources, are like "bndup", "vmtime" etc. and they are all
+ * defined in `ResourceNames`.
+ *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 object ResourceType {
@@ -65,24 +116,75 @@ object ResourceType {
       case _                     ⇒ UnknownResourceType(name)
     }
   }
+
+  def fromResourceEvent(resourceEvent: ResourceEvent): ResourceType = fromName(resourceEvent.resource)
 }
 
 case object BandwidthDown extends ResourceType(ResourceNames.bnddown) {
   override def isBandwidthDownload = true
+
+  private[events]
+  def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+    val oldBandwidthDownValue = userState.bandwidthDown.data
+    val bandwidthDownDiff = resourceEvent.value
+
+    val newBandwidth = BandwidthDownSnapshot(oldBandwidthDownValue + bandwidthDownDiff, nowMillis)
+
+    userState.copy(bandwidthDown = newBandwidth)
+  }
 }
 
 case object BandwidthUp extends ResourceType(ResourceNames.bndup) {
   override def isBandwidthUpload = true
+
+  private[events]
+  def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+    val oldBandwidthUpValue = userState.bandwidthUp.data
+    val bandwidthUpDiff = resourceEvent.value
+
+    val newBandwidth = BandwidthUpSnapshot(oldBandwidthUpValue + bandwidthUpDiff, nowMillis)
+
+    userState.copy(bandwidthUp = newBandwidth)
+  }
 }
 
 case object VMTime extends ResourceType(ResourceNames.vmtime) {
   override def isVMTime = true
+
+  override def isIndependentType = false
+
+  def isVMTimeOn(eventDetails: ResourceEvent.Details) = eventDetails.get(ResourceEvent.JsonNames.action) match {
+    case Some("on") ⇒ true
+    case Some("up") ⇒ true
+    case _          ⇒ false
+  }
+  
+  def isVMTimeOff(eventDetails: ResourceEvent.Details) = eventDetails.get(ResourceEvent.JsonNames.action) match {
+    case Some("off")  ⇒ true
+    case Some("down") ⇒ true
+    case _            ⇒ false
+  }
+
+  private[events] def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+    // FIXME: implement
+    userState
+  }
 }
 
 case object DiskSpace extends ResourceType(ResourceNames.dsksp) {
   override def isDiskSpace = true
+
+  private[events] def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+    val oldDiskSpaceValue = userState.diskSpace.data
+    val diskSpaceDiff = resourceEvent.value
+    val newDiskSpace = DiskSpaceSnapshot(oldDiskSpaceValue + diskSpaceDiff, nowMillis)
+    userState.copy(diskSpace = newDiskSpace)
+  }
 }
 
 case class UnknownResourceType(originalName: String) extends ResourceType(ResourceNames.unknown) {
   override def isKnownType = false
+
+  private[events] def
+  calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = userState
 }
index ebd2a74..3b11a2c 100644 (file)
@@ -6,12 +6,18 @@ import gr.grnet.aquarium.util.json.JsonHelpers
  * A WalletEntry is a derived entity. Its data represent money/credits and are calculated based on
  * resource events.
  *
+ * Wallet entries give us a picture of when credits are calculated from resource events.
+ *
  * @author Georgios Gousios <gousiosg@gmail.com>
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 case class WalletEntry(
     override val id: String,           // The id at the client side (the sender) TODO: Rename to remoteId or something...
-    override val occurredMillis: Long, // When it occurred at client side (the sender)
+    // When it occurred at client side (the sender).
+    // This denotes the `occurredMillis` attribute of the oldest resource event that is referenced
+    // by `sourceEventIDs`. The reason for this convention is purely from a query-oriented point of view.
+    // See how things are calculated in `UserActor`.
+    override val occurredMillis: Long,
     override val receivedMillis: Long, // When it was received by Aquarium
     sourceEventIDs: List[String],      // The events that triggered this WalletEntry
     value: Float,
@@ -25,6 +31,10 @@ case class WalletEntry(
   assert(!userId.isEmpty)
 
   def validate = true
+  
+  def fromResourceEvent(rceId: String): Boolean = {
+    sourceEventIDs contains rceId
+  }
 }
 
 object WalletEntry {
index e2c8517..bceab01 100644 (file)
@@ -37,6 +37,7 @@ package gr.grnet.aquarium.processor.actor
 
 import gr.grnet.aquarium.util.Loggable
 import gr.grnet.aquarium.actor._
+import gr.grnet.aquarium.logic.events.ResourceEvent
 
 /**
  * Business logic dispatcher.
@@ -66,5 +67,12 @@ class DispatcherActor extends AquariumActor with Loggable {
       // forward to the user actor manager, which in turn will
       // forward to the appropriate user actor (and create one if it does not exist)
       userActorManager forward m
+
+    case m @ ProcessResourceEvent(resourceEvent) ⇒
+      logger.debug("Received %s".format(m))
+      val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
+      // forward to the user actor manager, which in turn will
+      // forward to the appropriate user actor (and create one if it does not exist)
+      userActorManager forward m
   }
 }
\ No newline at end of file
index 1c4d2a2..035cab0 100644 (file)
@@ -38,6 +38,7 @@ package gr.grnet.aquarium.processor.actor
 import gr.grnet.aquarium.actor.ActorMessage
 import gr.grnet.aquarium.user.UserState
 import gr.grnet.aquarium.util.json.{JsonSupport, JsonHelpers}
+import gr.grnet.aquarium.logic.events.ResourceEvent
 
 /**
  * This is the base class of the messages the Dispatcher understands.
@@ -71,3 +72,5 @@ case class UserResponseGetState(userId: String, state: UserState) extends Dispat
   def responseBody = state
 }
 
+case class ProcessResourceEvent(rce: ResourceEvent) extends DispatcherMessage
+
index 8150dcc..0688213 100644 (file)
@@ -51,6 +51,7 @@ import akka.config.Supervision.OneForOneStrategy
 import java.util.concurrent.ConcurrentSkipListSet
 import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
 import akka.amqp._
+import gr.grnet.aquarium.actor.DispatcherRole
 
 /**
  * An actor that gets events from the queue, stores them persistently
@@ -69,6 +70,12 @@ with Lifecycle {
 
   val redeliveries = new ConcurrentSkipListSet[String]()
 
+  private[this] def _configurator: Configurator = Configurator.MasterConfigurator
+  private[this] def _calcStateChanges(resourceEvent: ResourceEvent): Unit = {
+    val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
+    businessLogicDispacther ! ProcessResourceEvent(resourceEvent) // all state change, credit calc etc will happen there
+  }
+
   class QueueReader extends Actor {
 
     def receive = {
@@ -85,8 +92,10 @@ with Lifecycle {
             redeliveries.add(event.id)
             PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
           }
-        } else
-          PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+        } else {
+          val eventWithReceivedMillis = event.copy(receivedMillis = System.currentTimeMillis())
+          PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+        }
 
       case PersistOK(ackData) =>
         logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
@@ -120,19 +129,20 @@ with Lifecycle {
       case Persist(event, sender, ackData) =>
         if (exists(event))
           sender ! Duplicate(ackData)
-        else if (persist(event))
+        else if (persist(event)) {
           sender ! PersistOK(ackData)
-          // TODO: Hook here for further processing
-        else
+          // TODO: Move to some proper place (after ACK?)
+          _calcStateChanges(event)
+        } else
           sender ! PersistFailed(ackData)
       case _ => logger.warn("Unknown message")
     }
 
     def exists(event: ResourceEvent): Boolean =
-      Configurator.MasterConfigurator.resourceEventStore.findResourceEventById(event.id).isJust
+      _configurator.resourceEventStore.findResourceEventById(event.id).isJust
 
     def persist(event: ResourceEvent): Boolean = {
-      Configurator.MasterConfigurator.resourceEventStore.storeResourceEvent(event) match {
+      _configurator.resourceEventStore.storeResourceEvent(event) match {
         case Just(x) => true
         case x: Failed =>
           logger.error("Could not save event: %s".format(event))
index e8f0b67..5337245 100644 (file)
@@ -19,4 +19,9 @@ trait WalletEntryStore {
   def findUserWalletEntries(userId: String): List[WalletEntry]
 
   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry]
+
+  /**
+   * Finds the latest wallet entries, i.e. all entries that share the most recent timestamp.
+   */
+  def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]]
 }
\ No newline at end of file
index 747c9ba..c7c008f 100644 (file)
@@ -45,8 +45,8 @@ import gr.grnet.aquarium.store._
 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
 import java.util.Date
+import com.ckkloverdos.maybe.Maybe
 import com.mongodb._
-import com.ckkloverdos.maybe.{Failed, Just, Maybe}
 
 /**
  * Mongodb implementation of the various aquarium stores.
@@ -167,14 +167,50 @@ class MongoDBStore(
 
     MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
   }
+
+
   //-WalletEntryStore
+  def findLatestUserWalletEntries(userId: String) = {
+    Maybe {
+      val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
+      val cursor = wallets.find().sort(orderBy)
+
+      try {
+        val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
+        if(cursor.hasNext) {
+          val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
+          buffer += walletEntry
+
+          var _previousOccurredMillis = walletEntry.occurredMillis
+          var _ok = true
+
+          while(cursor.hasNext && _ok) {
+            val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
+            var currentOccurredMillis = walletEntry.occurredMillis
+            _ok = currentOccurredMillis == _previousOccurredMillis
+            
+            if(_ok) {
+              buffer += walletEntry
+            }
+          }
+
+          buffer.toList
+        } else {
+          null
+        }
+      } finally {
+        cursor.close()
+      }
+    }
+  }
 }
 
 object MongoDBStore {
-  def RESOURCE_EVENTS_COLLECTION = "resevents"
-  def USERS_COLLECTION = "users"
-  def IM_EVENTS_COLLECTION = "imevents"
-  def IM_WALLETS = "wallets"
+  final val RESOURCE_EVENTS_COLLECTION = "resevents"
+  final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
+  final val USERS_COLLECTION = "users"
+  final val IM_EVENTS_COLLECTION = "imevents"
+  final val IM_WALLETS = "wallets"
 
   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
     ResourceEvent.fromJson(JSON.serialize(dbObject))
@@ -202,21 +238,27 @@ object MongoDBStore {
     }
   }
 
-  def runQuery[A <: AquariumEvent](query: BasicDBObject, collection: DBCollection)
+  def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
                                   (deserializer: (DBObject) => A)
                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
-    val cur = collection find query
-    if(!cur.hasNext) {
-      cur.close()
+    val cursor0 = collection find query
+    val cursor = if(orderBy ne null) {
+      cursor0 sort orderBy
+    } else {
+      cursor0
+    } // I really know that docs say that it is the same cursor.
+
+    if(!cursor.hasNext) {
+      cursor.close()
       Nil
     } else {
       val buff = new ListBuffer[A]()
 
-      while(cur.hasNext) {
-        buff += deserializer apply cur.next
+      while(cursor.hasNext) {
+        buff += deserializer apply cursor.next
       }
 
-      cur.close()
+      cursor.close()
 
       sortWith match {
         case Some(sorter) => buff.toList.sortWith(sorter)
index 4384cee..15876a1 100644 (file)
@@ -60,3 +60,33 @@ case class OwnedGroupsSnapshot(data: List[String], snapshotTime: Long) extends U
 case class GroupMembershipsSnapshot(data: List[String], snapshotTime: Long) extends UserDataSnapshot[List[String]]
 
 case class OwnedResourcesSnapshot(data: Map[DSLResource, Any /*ResourceState*/], snapshotTime: Long) extends UserDataSnapshot[Map[DSLResource, Any /*ResourceState*/]]
+
+/**
+ * Bandwidth is counted in MB (?)
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class BandwidthUpSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
+/**
+ * Bandwidth is counted in MB (?)
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class BandwidthDownSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
+/**
+ * Time is counted in seconds (?)
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class VMTimeSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
+
+/**
+ * Disk space is counted in MB (?)
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class DiskSpaceSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
index e650aff..935edc9 100644 (file)
@@ -36,8 +36,7 @@
 package gr.grnet.aquarium.user
 
 import gr.grnet.aquarium.util.json.{JsonHelpers, JsonSupport}
-import net.liftweb.json.{Extraction, parse => parseJson, JsonAST, Xml}
-import gr.grnet.aquarium.logic.accounting.dsl.DSLResource
+import net.liftweb.json.{parse => parseJson, JsonAST, Xml}
 
 
 /**
@@ -48,6 +47,8 @@ import gr.grnet.aquarium.logic.accounting.dsl.DSLResource
  * The different snapshots need not agree on the snapshot time, ie. some state
  * part may be stale, while other may be fresh.
  *
+ * The user state is meant to be partially updated according to relevant events landing on Aquarium.
+ *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
@@ -60,19 +61,24 @@ case class UserState(
     paymentOrders: PaymentOrdersSnapshot,
     ownedGroups: OwnedGroupsSnapshot,
     groupMemberships: GroupMembershipsSnapshot,
-    ownedResources: OwnedResourcesSnapshot)
-  extends JsonSupport {
+    ownedResources: OwnedResourcesSnapshot,
+    bandwidthUp: BandwidthUpSnapshot,
+    bandwidthDown: BandwidthDownSnapshot,
+    diskSpace: DiskSpaceSnapshot
+) extends JsonSupport {
 
   private[this] def _allSnapshots: List[Long] = {
     List(
       credits.snapshotTime, agreement.snapshotTime, roles.snapshotTime,
       paymentOrders.snapshotTime, ownedGroups.snapshotTime, groupMemberships.snapshotTime,
-      ownedResources.snapshotTime)
+      ownedResources.snapshotTime,
+      bandwidthUp.snapshotTime, bandwidthDown.snapshotTime,
+      diskSpace.snapshotTime)
   }
 
-  def earlierSnapshotTime: Long = _allSnapshots min
+  def oldestSnapshotTime: Long = _allSnapshots min
 
-  def latestSnapshotTime: Long  = _allSnapshots max
+  def newestSnapshotTime: Long  = _allSnapshots max
 }
 
 
index 962947d..d7a1341 100644 (file)
 
 package gr.grnet.aquarium.user.actor
 
-import gr.grnet.aquarium.user.UserState
 import gr.grnet.aquarium.util.Loggable
-import scala.PartialFunction
 import gr.grnet.aquarium.actor._
-import com.ckkloverdos.maybe.Maybe
 import gr.grnet.aquarium.Configurator
-import gr.grnet.aquarium.processor.actor.{UserResponseGetState, UserRequestGetState, UserResponseGetBalance, UserRequestGetBalance}
+import gr.grnet.aquarium.processor.actor._
+import gr.grnet.aquarium.logic.accounting.Accounting
+import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
+import gr.grnet.aquarium.user.UserState
+import gr.grnet.aquarium.logic.events.{WalletEntry, ProcessedResourceEvent, ResourceEvent}
 
 
 /**
- * 
+ *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
@@ -57,12 +58,118 @@ class UserActor extends AquariumActor with Loggable {
   @volatile
   private[this] var _userState: UserState = _
   @volatile
-  private[this] var _actorProvider: ActorProvider = _
-  @volatile
   private[this] var _timestampTheshold: Long = _
 
   def role = UserActorRole
 
+  private[this] def _configurator: Configurator = Configurator.MasterConfigurator
+
+  private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
+    if(checkForOlderEvents) {
+      logger.debug("Checking for events older than %s" format resourceEvent)
+      processOlderResourceEvents(resourceEvent)
+    }
+
+    justProcessTheResourceEvent(resourceEvent, "ACTUAL")
+  }
+
+  /**
+   * Find and process older resource events.
+   *
+   * Older resource events are found based on the latest credit calculation, that is the latest
+   * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
+   * have been calculated for these resource events and start from there.
+   */
+  private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
+    assert(_userId == resourceEvent.userId)
+    val rceId = resourceEvent.id
+    val userId = resourceEvent.userId
+    val resourceEventStore = _configurator.resourceEventStore
+    val walletEntriesStore = _configurator.walletStore
+
+    // 1. Find latest wallet entry
+    val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
+    latestWalletEntriesM match {
+      case Just(latestWalletEntries) ⇒
+        // The time on which we base the selection of the older events
+        val selectionTime = latestWalletEntries.head.occurredMillis
+
+        // 2. Now find all resource events past the time of the latest wallet entry.
+        //    These events have not been processed, except probably those ones
+        //    that have the same `occurredMillis` with `selectionTime`
+        val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
+
+        // 3. Filter out those resource events for which no wallet entry has actually been
+        //    produced.
+        val rcEventsToProcess = for {
+          oldRCEvent        <- oldRCEvents
+          oldRCEventId      = oldRCEvent.id
+          latestWalletEntry <- latestWalletEntries if(!latestWalletEntry.fromResourceEvent(oldRCEventId) && rceId != oldRCEventId)
+        } yield {
+          oldRCEvent
+        }
+
+        logger.debug("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
+
+        for {
+          rcEventToProcess <- rcEventsToProcess
+        } {
+          justProcessTheResourceEvent(rcEventToProcess, "OLDER")
+        }
+      case NoVal ⇒
+        logger.debug("No events to process older than %s".format(resourceEvent))
+      case Failed(e, m) ⇒
+        logger.error("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
+    }
+  }
+
+  private[this] def _calcNewUserState(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry]): Unit = {
+    val walletEntriesStore = _configurator.walletStore
+    // 1. Store the new wallet entries
+    for(walletEntry <- walletEntries) {
+      walletEntriesStore.storeWalletEntry(walletEntry)
+    }
+    // 2. Update user state
+    val newUserState = resourceEvent.calcStateChange(walletEntries, _userState)
+    if(_userState == newUserState) {
+      logger.debug("No state change for %s".format(_userState))
+    } else {
+      logger.debug("State change from %s".format(_userState))
+      logger.debug("State change   to %s".format(newUserState))
+      _userState = newUserState
+    }
+  }
+
+  /**
+   * Process the resource event as if nothing else matters. Just do it.
+   */
+  private[this] def justProcessTheResourceEvent(resourceEvent: ResourceEvent, logLabel: String): Unit = {
+    logger.debug("Processing [%s] %s".format(logLabel, resourceEvent))
+
+    if(resourceEvent.resourceType.isIndependentType) {
+      // There is a one-to-one correspondence from an event to credit diff generation (wallet entry)
+
+      // TODO: find some other way to use the services of Accounting.
+      val accounting = new Accounting {}
+      val walletEntriesM = accounting chargeEvent resourceEvent
+
+      walletEntriesM match {
+        case Just(walletEntries) ⇒
+          _calcNewUserState(resourceEvent, walletEntries)
+        case NoVal ⇒
+          logger.debug("No wallet entries generated for %s".format(resourceEvent))
+          _calcNewUserState(resourceEvent, Nil)
+        case f @ Failed(e, m) ⇒
+          logger.error("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
+          // TODO: What else to do on error?
+      }
+    } else {
+      // We need more than one resource event to calculate credit diffs.
+      // FIXME: implement
+      logger.error("Not processing %s".format(resourceEvent))
+    }
+  }
+
   protected def receive: Receive = {
     case UserActorStop ⇒
       self.stop()
@@ -77,9 +184,12 @@ class UserActor extends AquariumActor with Loggable {
       // TODO: query DB etc to get internal state
       logger.info("Setup my userId = %s".format(userId))
 
-    case m @ ActorProviderConfigured(actorProvider) ⇒
-      this._actorProvider = actorProvider
-      logger.info("Configured %s with %s".format(this, m))
+//    case m @ ActorProviderConfigured(actorProvider) ⇒
+//      this._actorProvider = actorProvider
+//      logger.info("Configured %s with %s".format(this, m))
+
+    case m @ ProcessResourceEvent(resourceEvent) ⇒
+      processResourceEvent(resourceEvent, true)
 
     case m @ UserRequestGetBalance(userId, timestamp) ⇒
       if(this._userId != userId) {
index 547b22c..1517d61 100644 (file)
@@ -38,7 +38,7 @@ package gr.grnet.aquarium.user.actor
 import gr.grnet.aquarium.util.Loggable
 import akka.actor.ActorRef
 import gr.grnet.aquarium.actor._
-import gr.grnet.aquarium.processor.actor.{DispatcherMessage, UserRequestGetState, UserRequestGetBalance}
+import gr.grnet.aquarium.processor.actor.{ProcessResourceEvent, DispatcherMessage, UserRequestGetState, UserRequestGetBalance}
 
 
 /**
@@ -95,5 +95,7 @@ class UserActorManager extends AquariumActor with Loggable {
     case m @ UserRequestGetState(userId, timestamp) ⇒
       _forwardToUserActor(userId, m)
 
+    case m @ ProcessResourceEvent(resourceEvent) ⇒
+      _forwardToUserActor(resourceEvent.userId, m)
   }
 }
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/util/TimeHelpers.scala b/src/main/scala/gr/grnet/aquarium/util/TimeHelpers.scala
new file mode 100644 (file)
index 0000000..b7c8cd8
--- /dev/null
@@ -0,0 +1,12 @@
+package gr.grnet.aquarium.util
+
+/**
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+object TimeHelpers {
+  def nowMillis: Long = {
+    System.currentTimeMillis()
+  }
+}
\ No newline at end of file