From: Christos KK Loverdos Date: Mon, 2 Jan 2012 16:10:15 +0000 (+0200) Subject: Calculate user state from resource events (wip). X-Git-Tag: aquarium-0.1~115^2 X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/3aa0499b3476b617adefbc79ee5bd3243811ff12 Calculate user state from resource events (wip). --- diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ProcessedResourceEvent.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ProcessedResourceEvent.scala new file mode 100644 index 0000000..0766bdb --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/logic/events/ProcessedResourceEvent.scala @@ -0,0 +1,11 @@ +package gr.grnet.aquarium.logic.events + +/** + * + * @author Christos KK Loverdos + */ +case class ProcessedResourceEvent( + refId: String, // ID of the resource event this one refers to + refResource: String, // The resource name of the referred to event + refOccurred: Long // The millis when the referred to event occurred) +) \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala index b474b8b..4e1a467 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceEvent.scala @@ -38,6 +38,7 @@ package gr.grnet.aquarium.logic.events import gr.grnet.aquarium.logic.accounting.Policy import net.liftweb.json.{JsonAST, Xml} import gr.grnet.aquarium.util.json.JsonHelpers +import gr.grnet.aquarium.user.UserState /** * Event sent to Aquarium by clients for resource accounting. @@ -49,12 +50,12 @@ case class ResourceEvent( override val id: String, // The id at the client side (the sender) TODO: Rename to remoteId or something... 
override val occurredMillis: Long, // When it occurred at client side (the sender) override val receivedMillis: Long, // When it was received by Aquarium - userId: String, - clientId: String, + userId: String, // The user for which this resource is relevant + clientId: String, // The unique client identifier (usually some hash) resource: String, // String representation of the resource type (e.g. "bndup", "vmtime"). eventVersion: String, value: Float, - details: Map[String, String]) + details: ResourceEvent.Details) extends AquariumEvent(id, occurredMillis, receivedMillis) { def validate() : Boolean = { @@ -74,7 +75,7 @@ case class ResourceEvent( true } - def resourceType = ResourceType fromName resource + def resourceType = ResourceType fromResourceEvent this def isKnownResourceType = resourceType.isKnownType @@ -85,9 +86,18 @@ case class ResourceEvent( def isDiskSpace = resourceType.isDiskSpace def isVMTime = resourceType.isVMTime + + /** + * Calculates the new `UserState` based on this resource event, the calculated wallet entries related to this resource + * event and the current `UserState`. 
+ */ + def calcStateChange(walletEntries: List[WalletEntry], userState: UserState): UserState = + resourceType.calcStateChange(this, walletEntries, userState) } object ResourceEvent { + type Details = Map[String, String] + def fromJson(json: String): ResourceEvent = { JsonHelpers.jsonToObject[ResourceEvent](json) } @@ -108,9 +118,13 @@ object ResourceEvent { final val _id = "_id" final val id = "id" final val userId = "userId" - final val timestamp = "timestamp" + final val timestamp = "timestamp" // TODO: deprecate in favor of "occurredMillis" + final val occurredMillis = "occurredMillis" + final val receivedMillis = "receivedMillis" final val clientId = "clientId" + // ResourceType: VMTime final val vmId = "vmId" + final val action = "action" // "on", "off" } } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceNames.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceNames.scala index 4512ed8..1865444 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceNames.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceNames.scala @@ -36,7 +36,8 @@ package gr.grnet.aquarium.logic.events /** - * All known resource names are here. + * All known resource names are here. These represent the resources that Aquarium can handle. + * * These must be constants across the platform. * * @author Christos KK Loverdos diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceType.scala b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceType.scala index 63f992e..e527d52 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/ResourceType.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/ResourceType.scala @@ -35,25 +35,76 @@ package gr.grnet.aquarium.logic.events +import gr.grnet.aquarium.util.TimeHelpers.nowMillis +import gr.grnet.aquarium.user._ + + /** * This is an object representation for a resource name, which provides convenient querying methods. 
+ *
+ * Also, a `ResourceType` knows how to compute a state change from a particular `ResourceEvent`.
+ *
  * @author Christos KK Loverdos
  */
 sealed abstract class ResourceType(_name: String) {
   def resourceName = _name
 
+  /**
+   * Return true if the resource type must lead to wallet entries generation and, thus, credit diffs.
+   *
+   * Normally, this should always be the case.
+   */
+  def isBillableType = true
+
+  /**
+   * A resource type is independent if it can, by itself only, create a wallet entry.
+   *
+   * It is dependent if it needs one or more other events of the same type to create a wallet entry.
+   */
+  def isIndependentType = true
+
   def isKnownType = true
   def isDiskSpace = false
   def isVMTime = false
   def isBandwidthUpload = false
   def isBandwidthDownload = false
+
+  /**
+   * Calculates the new `UserState` based on the provided resource event, the calculated wallet entries
+   * and the current `UserState`.
+   *
+   * This method is an implementation detail and is not exposed. The actual user-level API is provided in `ResourceEvent`.
+   */
+  private[events] final
+  def calcStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState): UserState = {
+    val otherState = calcOtherStateChange(resourceEvent, walletEntries, userState) // resource-specific part first
+    val newCredits = calcNewCreditSnapshot(walletEntries, userState) // credits are read from the incoming state
+    otherState.copy(credits = newCredits)
+  }
+
+  private[events] // implemented by each concrete resource type: updates everything except the credits
+  def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState): UserState
+
+  private[events] final
+  def calcNewCreditSnapshot(walletEntries: List[WalletEntry], userState: UserState): CreditSnapshot = {
+    val newCredits = for { // only finalized wallet entries contribute to the credit sum
+      walletEntry <- walletEntries if(walletEntry.finalized)
+    } yield walletEntry.value.toDouble
+
+    val newCreditSum = newCredits.sum
+    val now = nowMillis // same clock helper (TimeHelpers) the concrete resource types use
+
+    CreditSnapshot(userState.credits.data + newCreditSum, now)
+  }
 }
 
 /**
  * Companion object used to parse a resource name and provide an object representation in the form
  * of a `ResourceType`.
  *
+ * Known resource names, which represent Aquarium resources, are like "bndup", "vmtime" etc. and they are all
+ * defined in `ResourceNames`.
+ * * @author Christos KK Loverdos */ object ResourceType { @@ -65,24 +116,75 @@ object ResourceType { case _ ⇒ UnknownResourceType(name) } } + + def fromResourceEvent(resourceEvent: ResourceEvent): ResourceType = fromName(resourceEvent.resource) } case object BandwidthDown extends ResourceType(ResourceNames.bnddown) { override def isBandwidthDownload = true + + private[events] + def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = { + val oldBandwidthDownValue = userState.bandwidthDown.data + val bandwidthDownDiff = resourceEvent.value + + val newBandwidth = BandwidthDownSnapshot(oldBandwidthDownValue + bandwidthDownDiff, nowMillis) + + userState.copy(bandwidthDown = newBandwidth) + } } case object BandwidthUp extends ResourceType(ResourceNames.bndup) { override def isBandwidthUpload = true + + private[events] + def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = { + val oldBandwidthUpValue = userState.bandwidthUp.data + val bandwidthUpDiff = resourceEvent.value + + val newBandwidth = BandwidthUpSnapshot(oldBandwidthUpValue + bandwidthUpDiff, nowMillis) + + userState.copy(bandwidthUp = newBandwidth) + } } case object VMTime extends ResourceType(ResourceNames.vmtime) { override def isVMTime = true + + override def isIndependentType = false + + def isVMTimeOn(eventDetails: ResourceEvent.Details) = eventDetails.get(ResourceEvent.JsonNames.action) match { + case Some("on") ⇒ true + case Some("up") ⇒ true + case _ ⇒ false + } + + def isVMTimeOff(eventDetails: ResourceEvent.Details) = eventDetails.get(ResourceEvent.JsonNames.action) match { + case Some("off") ⇒ true + case Some("down") ⇒ true + case _ ⇒ false + } + + private[events] def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = { + // FIXME: implement + userState + } } case object DiskSpace extends ResourceType(ResourceNames.dsksp) { 
override def isDiskSpace = true + + private[events] def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = { + val oldDiskSpaceValue = userState.diskSpace.data + val diskSpaceDiff = resourceEvent.value + val newDiskSpace = DiskSpaceSnapshot(oldDiskSpaceValue + diskSpaceDiff, nowMillis) + userState.copy(diskSpace = newDiskSpace) + } } case class UnknownResourceType(originalName: String) extends ResourceType(ResourceNames.unknown) { override def isKnownType = false + + private[events] def + calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = userState } diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala b/src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala index ebd2a74..3b11a2c 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala @@ -6,12 +6,18 @@ import gr.grnet.aquarium.util.json.JsonHelpers * A WalletEntry is a derived entity. Its data represent money/credits and are calculated based on * resource events. * + * Wallet entries give us a picture of when credits are calculated from resource events. + * * @author Georgios Gousios * @author Christos KK Loverdos */ case class WalletEntry( override val id: String, // The id at the client side (the sender) TODO: Rename to remoteId or something... - override val occurredMillis: Long, // When it occurred at client side (the sender) + // When it occurred at client side (the sender). + // This denotes the `occurredMillis` attribute of the oldest resource event that is referenced + // by `sourceEventIDs`. The reason for this convention is pure from a query-oriented point of view. + // See how things are calculated in `UserActor`. 
+ override val occurredMillis: Long, override val receivedMillis: Long, // When it was received by Aquarium sourceEventIDs: List[String], // The events that triggered this WalletEntry value: Float, @@ -25,6 +31,10 @@ case class WalletEntry( assert(!userId.isEmpty) def validate = true + + def fromResourceEvent(rceId: String): Boolean = { + sourceEventIDs contains rceId + } } object WalletEntry { diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala index e2c8517..bceab01 100644 --- a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala +++ b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherActor.scala @@ -37,6 +37,7 @@ package gr.grnet.aquarium.processor.actor import gr.grnet.aquarium.util.Loggable import gr.grnet.aquarium.actor._ +import gr.grnet.aquarium.logic.events.ResourceEvent /** * Business logic dispatcher. @@ -66,5 +67,12 @@ class DispatcherActor extends AquariumActor with Loggable { // forward to the user actor manager, which in turn will // forward to the appropriate user actor (and create one if it does not exist) userActorManager forward m + + case m @ ProcessResourceEvent(resourceEvent) ⇒ + logger.debug("Received %s".format(m)) + val userActorManager = _actorProvider.actorForRole(UserActorManagerRole) + // forward to the user actor manager, which in turn will + // forward to the appropriate user actor (and create one if it does not exist) + userActorManager forward m } } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala index 1c4d2a2..035cab0 100644 --- a/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala +++ b/src/main/scala/gr/grnet/aquarium/processor/actor/DispatcherMessage.scala @@ -38,6 +38,7 @@ package gr.grnet.aquarium.processor.actor import 
gr.grnet.aquarium.actor.ActorMessage import gr.grnet.aquarium.user.UserState import gr.grnet.aquarium.util.json.{JsonSupport, JsonHelpers} +import gr.grnet.aquarium.logic.events.ResourceEvent /** * This is the base class of the messages the Dispatcher understands. @@ -71,3 +72,5 @@ case class UserResponseGetState(userId: String, state: UserState) extends Dispat def responseBody = state } +case class ProcessResourceEvent(rce: ResourceEvent) extends DispatcherMessage + diff --git a/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala b/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala index 8150dcc..0688213 100644 --- a/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala +++ b/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala @@ -51,6 +51,7 @@ import akka.config.Supervision.OneForOneStrategy import java.util.concurrent.ConcurrentSkipListSet import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP} import akka.amqp._ +import gr.grnet.aquarium.actor.DispatcherRole /** * An actor that gets events from the queue, stores them persistently @@ -69,6 +70,12 @@ with Lifecycle { val redeliveries = new ConcurrentSkipListSet[String]() + private[this] def _configurator: Configurator = Configurator.MasterConfigurator + private[this] def _calcStateChanges(resourceEvent: ResourceEvent): Unit = { + val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole) + businessLogicDispacther ! ProcessResourceEvent(resourceEvent) // all state change, credit calc etc will happen there + } + class QueueReader extends Actor { def receive = { @@ -85,8 +92,10 @@ with Lifecycle { redeliveries.add(event.id) PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get)) } - } else - PersisterManager.lb ! 
Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get)) + } else { + val eventWithReceivedMillis = event.copy(receivedMillis = System.currentTimeMillis()) + PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get)) + } case PersistOK(ackData) => logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag)) @@ -120,19 +129,20 @@ with Lifecycle { case Persist(event, sender, ackData) => if (exists(event)) sender ! Duplicate(ackData) - else if (persist(event)) + else if (persist(event)) { sender ! PersistOK(ackData) - // TODO: Hook here for further processing - else + // TODO: Move to some proper place (after ACK?) + _calcStateChanges(event) + } else sender ! PersistFailed(ackData) case _ => logger.warn("Unknown message") } def exists(event: ResourceEvent): Boolean = - Configurator.MasterConfigurator.resourceEventStore.findResourceEventById(event.id).isJust + _configurator.resourceEventStore.findResourceEventById(event.id).isJust def persist(event: ResourceEvent): Boolean = { - Configurator.MasterConfigurator.resourceEventStore.storeResourceEvent(event) match { + _configurator.resourceEventStore.storeResourceEvent(event) match { case Just(x) => true case x: Failed => logger.error("Could not save event: %s".format(event)) diff --git a/src/main/scala/gr/grnet/aquarium/store/WalletEntryStore.scala b/src/main/scala/gr/grnet/aquarium/store/WalletEntryStore.scala index e8f0b67..5337245 100644 --- a/src/main/scala/gr/grnet/aquarium/store/WalletEntryStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/WalletEntryStore.scala @@ -19,4 +19,9 @@ trait WalletEntryStore { def findUserWalletEntries(userId: String): List[WalletEntry] def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] + + /** + * Finds latest wallet entries with same timestamp. 
+   */
+  def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]]
 }
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
index 747c9ba..c7c008f 100644
--- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
+++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
@@ -45,8 +45,8 @@ import gr.grnet.aquarium.store._
 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
 import java.util.Date
+import com.ckkloverdos.maybe.Maybe
 import com.mongodb._
-import com.ckkloverdos.maybe.{Failed, Just, Maybe}
 
 /**
  * Mongodb implementation of the various aquarium stores.
@@ -167,14 +167,50 @@ class MongoDBStore(
 
     MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
   }
+
+  //-WalletEntryStore: newest wallet entries of `userId` that share the same (newest) `occurredMillis`
+  def findLatestUserWalletEntries(userId: String) = {
+    Maybe {
+      val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
+      val cursor = wallets.find(new BasicDBObject(JsonNames.userId, userId)).sort(orderBy) // only this user's entries
+
+      try {
+        val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
+        if(cursor.hasNext) {
+          val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
+          buffer += walletEntry
+
+          val _previousOccurredMillis = walletEntry.occurredMillis // timestamp of the newest entry
+          var _ok = true
+
+          while(cursor.hasNext && _ok) {
+            val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
+            val currentOccurredMillis = walletEntry.occurredMillis
+            _ok = currentOccurredMillis == _previousOccurredMillis // stop at the first older entry
+
+            if(_ok) {
+              buffer += walletEntry
+            }
+          }
+
+          buffer.toList
+        } else {
+          null // NOTE(review): presumably turned into NoVal by Maybe { ... } -- confirm against the Maybe library
+        }
+      } finally {
+        cursor.close()
+      }
+    }
+  }
 }
 
 object MongoDBStore {
-  def RESOURCE_EVENTS_COLLECTION = "resevents"
-  def USERS_COLLECTION = "users"
-  def IM_EVENTS_COLLECTION = "imevents"
-  def IM_WALLETS = "wallets"
+  final
val RESOURCE_EVENTS_COLLECTION = "resevents" + final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents" + final val USERS_COLLECTION = "users" + final val IM_EVENTS_COLLECTION = "imevents" + final val IM_WALLETS = "wallets" def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = { ResourceEvent.fromJson(JSON.serialize(dbObject)) @@ -202,21 +238,27 @@ object MongoDBStore { } } - def runQuery[A <: AquariumEvent](query: BasicDBObject, collection: DBCollection) + def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null) (deserializer: (DBObject) => A) (sortWith: Option[(A, A) => Boolean]): List[A] = { - val cur = collection find query - if(!cur.hasNext) { - cur.close() + val cursor0 = collection find query + val cursor = if(orderBy ne null) { + cursor0 sort orderBy + } else { + cursor0 + } // I really know that docs say that it is the same cursor. + + if(!cursor.hasNext) { + cursor.close() Nil } else { val buff = new ListBuffer[A]() - while(cur.hasNext) { - buff += deserializer apply cur.next + while(cursor.hasNext) { + buff += deserializer apply cursor.next } - cur.close() + cursor.close() sortWith match { case Some(sorter) => buff.toList.sortWith(sorter) diff --git a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala index 4384cee..15876a1 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala @@ -60,3 +60,33 @@ case class OwnedGroupsSnapshot(data: List[String], snapshotTime: Long) extends U case class GroupMembershipsSnapshot(data: List[String], snapshotTime: Long) extends UserDataSnapshot[List[String]] case class OwnedResourcesSnapshot(data: Map[DSLResource, Any /*ResourceState*/], snapshotTime: Long) extends UserDataSnapshot[Map[DSLResource, Any /*ResourceState*/]] + +/** + * Bandwidth is counted in MB (?) 
+ * + * @author Christos KK Loverdos + */ +case class BandwidthUpSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double] + +/** + * Bandwidth is counted in MB (?) + * + * @author Christos KK Loverdos + */ +case class BandwidthDownSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double] + +/** + * Time is counted in seconds (?) + * + * @author Christos KK Loverdos + */ +case class VMTimeSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double] + + +/** + * Disk space is counted in MB (?) + * + * @author Christos KK Loverdos + */ +case class DiskSpaceSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double] + diff --git a/src/main/scala/gr/grnet/aquarium/user/UserState.scala b/src/main/scala/gr/grnet/aquarium/user/UserState.scala index e650aff..935edc9 100644 --- a/src/main/scala/gr/grnet/aquarium/user/UserState.scala +++ b/src/main/scala/gr/grnet/aquarium/user/UserState.scala @@ -36,8 +36,7 @@ package gr.grnet.aquarium.user import gr.grnet.aquarium.util.json.{JsonHelpers, JsonSupport} -import net.liftweb.json.{Extraction, parse => parseJson, JsonAST, Xml} -import gr.grnet.aquarium.logic.accounting.dsl.DSLResource +import net.liftweb.json.{parse => parseJson, JsonAST, Xml} /** @@ -48,6 +47,8 @@ import gr.grnet.aquarium.logic.accounting.dsl.DSLResource * The different snapshots need not agree on the snapshot time, ie. some state * part may be stale, while other may be fresh. * + * The user state is meant to be partially updated according to relevant events landing on Aquarium. 
+ * * @author Christos KK Loverdos */ @@ -60,19 +61,24 @@ case class UserState( paymentOrders: PaymentOrdersSnapshot, ownedGroups: OwnedGroupsSnapshot, groupMemberships: GroupMembershipsSnapshot, - ownedResources: OwnedResourcesSnapshot) - extends JsonSupport { + ownedResources: OwnedResourcesSnapshot, + bandwidthUp: BandwidthUpSnapshot, + bandwidthDown: BandwidthDownSnapshot, + diskSpace: DiskSpaceSnapshot +) extends JsonSupport { private[this] def _allSnapshots: List[Long] = { List( credits.snapshotTime, agreement.snapshotTime, roles.snapshotTime, paymentOrders.snapshotTime, ownedGroups.snapshotTime, groupMemberships.snapshotTime, - ownedResources.snapshotTime) + ownedResources.snapshotTime, + bandwidthUp.snapshotTime, bandwidthDown.snapshotTime, + diskSpace.snapshotTime) } - def earlierSnapshotTime: Long = _allSnapshots min + def oldestSnapshotTime: Long = _allSnapshots min - def latestSnapshotTime: Long = _allSnapshots max + def newestSnapshotTime: Long = _allSnapshots max } diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala index 962947d..d7a1341 100644 --- a/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala @@ -35,17 +35,18 @@ package gr.grnet.aquarium.user.actor -import gr.grnet.aquarium.user.UserState import gr.grnet.aquarium.util.Loggable -import scala.PartialFunction import gr.grnet.aquarium.actor._ -import com.ckkloverdos.maybe.Maybe import gr.grnet.aquarium.Configurator -import gr.grnet.aquarium.processor.actor.{UserResponseGetState, UserRequestGetState, UserResponseGetBalance, UserRequestGetBalance} +import gr.grnet.aquarium.processor.actor._ +import gr.grnet.aquarium.logic.accounting.Accounting +import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe} +import gr.grnet.aquarium.user.UserState +import gr.grnet.aquarium.logic.events.{WalletEntry, ProcessedResourceEvent, ResourceEvent} /** - * + * 
* @author Christos KK Loverdos
  */
@@ -57,12 +58,118 @@ class UserActor extends AquariumActor with Loggable {
   @volatile
   private[this] var _userState: UserState = _
   @volatile
-  private[this] var _actorProvider: ActorProvider = _
-  @volatile
   private[this] var _timestampTheshold: Long = _
 
   def role = UserActorRole
 
+  private[this] def _configurator: Configurator = Configurator.MasterConfigurator
+
+  private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
+    if(checkForOlderEvents) {
+      logger.debug("Checking for events older than %s" format resourceEvent)
+      processOlderResourceEvents(resourceEvent)
+    }
+
+    justProcessTheResourceEvent(resourceEvent, "ACTUAL")
+  }
+
+  /**
+   * Find and process older resource events.
+   *
+   * Older resource events are found based on the latest credit calculation, that is the latest
+   * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
+   * have been calculated for these resource events and start from there.
+   */
+  private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
+    assert(_userId == resourceEvent.userId)
+    val rceId = resourceEvent.id
+    val userId = resourceEvent.userId
+    val resourceEventStore = _configurator.resourceEventStore
+    val walletEntriesStore = _configurator.walletStore
+
+    // 1. Find latest wallet entry
+    val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
+    latestWalletEntriesM match {
+      case Just(latestWalletEntries) if latestWalletEntries.nonEmpty ⇒
+        // The time on which we base the selection of the older events
+        val selectionTime = latestWalletEntries.head.occurredMillis
+
+        // 2. Now find all resource events past the time of the latest wallet entry.
+        //    These events have not been processed, except probably those ones
+        //    that have the same `occurredMillis` with `selectionTime`
+        val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
+
+        // 3. Keep only those resource events for which no wallet entry has actually been
+        //    produced (the event currently being processed is handled by the caller).
+        val rcEventsToProcess = oldRCEvents filter { oldRCEvent ⇒
+          val oldRCEventId = oldRCEvent.id
+          // Select each event at most once, and only when NONE of the latest wallet
+          // entries was derived from it; otherwise it would be charged again.
+          rceId != oldRCEventId &&
+            !latestWalletEntries.exists(_.fromResourceEvent(oldRCEventId))
+        }
+
+        logger.debug("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
+
+        for {
+          rcEventToProcess <- rcEventsToProcess
+        } {
+          justProcessTheResourceEvent(rcEventToProcess, "OLDER")
+        }
+      case NoVal | Just(_) ⇒ // Just(_) here can only be an empty list: nothing to base the selection on
+        logger.debug("No events to process older than %s".format(resourceEvent))
+      case Failed(e, m) ⇒
+        logger.error("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
+    }
+  }
+
+  private[this] def _calcNewUserState(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry]): Unit = {
+    val walletEntriesStore = _configurator.walletStore
+    // 1. Store the new wallet entries
+    for(walletEntry <- walletEntries) {
+      walletEntriesStore.storeWalletEntry(walletEntry)
+    }
+    // 2. Update user state
+    val newUserState = resourceEvent.calcStateChange(walletEntries, _userState)
+    if(_userState == newUserState) {
+      logger.debug("No state change for %s".format(_userState))
+    } else {
+      logger.debug("State change from %s".format(_userState))
+      logger.debug("State change to   %s".format(newUserState))
+      _userState = newUserState
+    }
+  }
+
+  /**
+   * Process the resource event as if nothing else matters. Just do it.
+ */ + private[this] def justProcessTheResourceEvent(resourceEvent: ResourceEvent, logLabel: String): Unit = { + logger.debug("Processing [%s] %s".format(logLabel, resourceEvent)) + + if(resourceEvent.resourceType.isIndependentType) { + // There is a one-to-one correspondence from an event to credit diff generation (wallet entry) + + // TODO: find some other way to use the services of Accounting. + val accounting = new Accounting {} + val walletEntriesM = accounting chargeEvent resourceEvent + + walletEntriesM match { + case Just(walletEntries) ⇒ + _calcNewUserState(resourceEvent, walletEntries) + case NoVal ⇒ + logger.debug("No wallet entries generated for %s".format(resourceEvent)) + _calcNewUserState(resourceEvent, Nil) + case f @ Failed(e, m) ⇒ + logger.error("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage)) + // TODO: What else to do on error? + } + } else { + // We need more than one resource event to calculate credit diffs. + // FIXME: implement + logger.error("Not processing %s".format(resourceEvent)) + } + } + protected def receive: Receive = { case UserActorStop ⇒ self.stop() @@ -77,9 +184,12 @@ class UserActor extends AquariumActor with Loggable { // TODO: query DB etc to get internal state logger.info("Setup my userId = %s".format(userId)) - case m @ ActorProviderConfigured(actorProvider) ⇒ - this._actorProvider = actorProvider - logger.info("Configured %s with %s".format(this, m)) +// case m @ ActorProviderConfigured(actorProvider) ⇒ +// this._actorProvider = actorProvider +// logger.info("Configured %s with %s".format(this, m)) + + case m @ ProcessResourceEvent(resourceEvent) ⇒ + processResourceEvent(resourceEvent, true) case m @ UserRequestGetBalance(userId, timestamp) ⇒ if(this._userId != userId) { diff --git a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala index 547b22c..1517d61 100644 --- 
a/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala +++ b/src/main/scala/gr/grnet/aquarium/user/actor/UserActorManager.scala @@ -38,7 +38,7 @@ package gr.grnet.aquarium.user.actor import gr.grnet.aquarium.util.Loggable import akka.actor.ActorRef import gr.grnet.aquarium.actor._ -import gr.grnet.aquarium.processor.actor.{DispatcherMessage, UserRequestGetState, UserRequestGetBalance} +import gr.grnet.aquarium.processor.actor.{ProcessResourceEvent, DispatcherMessage, UserRequestGetState, UserRequestGetBalance} /** @@ -95,5 +95,7 @@ class UserActorManager extends AquariumActor with Loggable { case m @ UserRequestGetState(userId, timestamp) ⇒ _forwardToUserActor(userId, m) + case m @ ProcessResourceEvent(resourceEvent) ⇒ + _forwardToUserActor(resourceEvent.userId, m) } } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/util/TimeHelpers.scala b/src/main/scala/gr/grnet/aquarium/util/TimeHelpers.scala new file mode 100644 index 0000000..b7c8cd8 --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/util/TimeHelpers.scala @@ -0,0 +1,12 @@ +package gr.grnet.aquarium.util + +/** + * + * @author Christos KK Loverdos + */ + +object TimeHelpers { + def nowMillis: Long = { + System.currentTimeMillis() + } +} \ No newline at end of file