--- /dev/null
+package gr.grnet.aquarium.logic.events
+
+/**
+ * A lightweight reference to a resource event: it carries the id, the resource name and the
+ * occurrence time of the event it refers to.
+ *
+ * NOTE(review): judging by the name, an instance presumably marks the referred event as already
+ * processed -- confirm against the code that stores/reads these.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class ProcessedResourceEvent(
+ refId: String, // ID of the resource event this one refers to
+ refResource: String, // The resource name of the referred to event
+ refOccurred: Long // The millis when the referred to event occurred
+)
\ No newline at end of file
import gr.grnet.aquarium.logic.accounting.Policy
import net.liftweb.json.{JsonAST, Xml}
import gr.grnet.aquarium.util.json.JsonHelpers
+import gr.grnet.aquarium.user.UserState
/**
* Event sent to Aquarium by clients for resource accounting.
override val id: String, // The id at the client side (the sender) TODO: Rename to remoteId or something...
override val occurredMillis: Long, // When it occurred at client side (the sender)
override val receivedMillis: Long, // When it was received by Aquarium
- userId: String,
- clientId: String,
+ userId: String, // The user for which this resource is relevant
+ clientId: String, // The unique client identifier (usually some hash)
resource: String, // String representation of the resource type (e.g. "bndup", "vmtime").
eventVersion: String,
value: Float,
- details: Map[String, String])
+ details: ResourceEvent.Details)
extends AquariumEvent(id, occurredMillis, receivedMillis) {
def validate() : Boolean = {
true
}
- def resourceType = ResourceType fromName resource
+ def resourceType = ResourceType fromResourceEvent this
def isKnownResourceType = resourceType.isKnownType
def isDiskSpace = resourceType.isDiskSpace
def isVMTime = resourceType.isVMTime
+
+ /**
+ * Calculates the new `UserState` based on this resource event, the calculated wallet entries related to this resource
+ * event and the current `UserState`.
+ *
+ * The actual computation is delegated to the `ResourceType` of this event.
+ *
+ * @param walletEntries the wallet entries that were calculated for this resource event
+ * @param userState the current user state
+ * @return the user state as computed by the resource type of this event
+ */
+ def calcStateChange(walletEntries: List[WalletEntry], userState: UserState): UserState =
+ resourceType.calcStateChange(this, walletEntries, userState)
}
object ResourceEvent {
+ type Details = Map[String, String]
+
def fromJson(json: String): ResourceEvent = {
JsonHelpers.jsonToObject[ResourceEvent](json)
}
final val _id = "_id"
final val id = "id"
final val userId = "userId"
- final val timestamp = "timestamp"
+ final val timestamp = "timestamp" // TODO: deprecate in favor of "occurredMillis"
+ final val occurredMillis = "occurredMillis"
+ final val receivedMillis = "receivedMillis"
final val clientId = "clientId"
+ // ResourceType: VMTime
final val vmId = "vmId"
+ final val action = "action" // "on", "off"
}
}
\ No newline at end of file
package gr.grnet.aquarium.logic.events
/**
- * All known resource names are here.
+ * All known resource names are here. These represent the resources that Aquarium can handle.
+ *
* These must be constants across the platform.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
package gr.grnet.aquarium.logic.events
+import gr.grnet.aquarium.util.TimeHelpers.nowMillis
+import gr.grnet.aquarium.user._
+
+
/**
* This is an object representation for a resource name, which provides convenient querying methods.
*
+ * Also, a `ResourceType` knows how to compute a state change from a particular `ResourceEvent`.
+ *
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
sealed abstract class ResourceType(_name: String) {
def resourceName = _name
+ /**
+ * Return true if the resource type must lead to wallet entries generation and, thus, credit diffs.
+ *
+ * Normally, this should always be the case.
+ */
+ def isBillableType = true
+
+ /**
+ * A resource type is independent if it can, by itself only, create a wallet entry.
+ *
+ * It is dependent if it needs one or more other events of the same type to do so.
+ */
+ def isIndependentType = true
+
def isKnownType = true
def isDiskSpace = false
def isVMTime = false
def isBandwidthUpload = false
def isBandwidthDownload = false
+
+ /**
+  * Computes the next `UserState` for the given resource event.
+  *
+  * First the resource-specific part of the state is updated via `calcOtherStateChange`, then the
+  * credits of that result are replaced with the snapshot produced by `calcNewCreditSnapshot`.
+  *
+  * This is an implementation detail; user code is expected to go through `ResourceEvent.calcStateChange`.
+  */
+ private[events] final
+ def calcStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState): UserState = {
+   // Resource-specific state first, credits second (same order as the snapshot timestamps are taken).
+   val stateWithoutCredits = calcOtherStateChange(resourceEvent, walletEntries, userState)
+   stateWithoutCredits.copy(credits = calcNewCreditSnapshot(walletEntries, userState))
+ }
+
+ /**
+  * Computes the part of the user state change that is specific to this resource type,
+  * i.e. everything except the credits, which `calcStateChange` overwrites afterwards.
+  *
+  * Each concrete resource type provides its own implementation.
+  */
+ private[events]
+ def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState): UserState
+
+ /**
+  * Builds a new `CreditSnapshot` by adding the values of all finalized wallet entries
+  * to the credits currently recorded in `userState`. Non-finalized entries are ignored.
+  * The snapshot is stamped with the current system time.
+  */
+ private[events] final
+ def calcNewCreditSnapshot(walletEntries: List[WalletEntry], userState: UserState): CreditSnapshot = {
+   val finalizedCreditSum = walletEntries
+     .withFilter(entry ⇒ entry.finalized)
+     .map(entry ⇒ entry.value.toDouble)
+     .sum
+   val snapshotMillis = System.currentTimeMillis()
+
+   CreditSnapshot(userState.credits.data + finalizedCreditSum, snapshotMillis)
+ }
}
/**
* Companion object used to parse a resource name and provide an object representation in the form
* of a `ResourceType`.
*
+ * Known resource names, which represent Aquarium resources, are like "bndup", "vmtime" etc. and they are all
+ * defined in `ResourceNames`.
+ *
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
object ResourceType {
case _ ⇒ UnknownResourceType(name)
}
}
+
+ /**
+  * Resolves the `ResourceType` of a resource event by looking up the event's `resource` name via `fromName`.
+  */
+ def fromResourceEvent(resourceEvent: ResourceEvent): ResourceType = fromName(resourceEvent.resource)
}
case object BandwidthDown extends ResourceType(ResourceNames.bnddown) {
override def isBandwidthDownload = true
+
+ /**
+  * Adds the value of the resource event to the download bandwidth recorded in the user state
+  * and stamps the new snapshot with the current time.
+  */
+ private[events]
+ def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+   val updatedTotal = userState.bandwidthDown.data + resourceEvent.value
+   userState.copy(bandwidthDown = BandwidthDownSnapshot(updatedTotal, nowMillis))
+ }
}
case object BandwidthUp extends ResourceType(ResourceNames.bndup) {
override def isBandwidthUpload = true
+
+ /**
+  * Adds the value of the resource event to the upload bandwidth recorded in the user state
+  * and stamps the new snapshot with the current time.
+  */
+ private[events]
+ def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+   val updatedTotal = userState.bandwidthUp.data + resourceEvent.value
+   userState.copy(bandwidthUp = BandwidthUpSnapshot(updatedTotal, nowMillis))
+ }
}
case object VMTime extends ResourceType(ResourceNames.vmtime) {
override def isVMTime = true
+
+ // A single VM time event cannot produce a wallet entry on its own.
+ // NOTE(review): presumably an "on" event has to be paired with a later "off" event -- confirm.
+ override def isIndependentType = false
+
+ /**
+  * Tells whether the `action` detail of an event denotes a VM being switched on.
+  * Both "on" and "up" are accepted; a missing or any other value yields `false`.
+  */
+ def isVMTimeOn(eventDetails: ResourceEvent.Details) =
+   eventDetails.get(ResourceEvent.JsonNames.action).exists(action ⇒ action == "on" || action == "up")
+
+ /**
+  * Tells whether the `action` detail of an event denotes a VM being switched off.
+  * Both "off" and "down" are accepted; a missing or any other value yields `false`.
+  */
+ def isVMTimeOff(eventDetails: ResourceEvent.Details) =
+   eventDetails.get(ResourceEvent.JsonNames.action).exists(action ⇒ action == "off" || action == "down")
+
+ /**
+  * Placeholder: VM time events do not change the user state yet, so the given
+  * state is returned untouched (see the FIXME below).
+  */
+ private[events] def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+ // FIXME: implement
+ userState
+ }
}
case object DiskSpace extends ResourceType(ResourceNames.dsksp) {
override def isDiskSpace = true
+
+ /**
+  * Adds the value of the resource event to the disk space recorded in the user state
+  * and stamps the new snapshot with the current time.
+  */
+ private[events] def calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = {
+   val updatedTotal = userState.diskSpace.data + resourceEvent.value
+   userState.copy(diskSpace = DiskSpaceSnapshot(updatedTotal, nowMillis))
+ }
}
case class UnknownResourceType(originalName: String) extends ResourceType(ResourceNames.unknown) {
override def isKnownType = false
+
+ /**
+  * Events of an unknown resource type leave the user state as it is.
+  */
+ private[events] def
+ calcOtherStateChange(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry], userState: UserState) = userState
}
* A WalletEntry is a derived entity. Its data represent money/credits and are calculated based on
* resource events.
*
+ * Wallet entries give us a picture of when credits are calculated from resource events.
+ *
* @author Georgios Gousios <gousiosg@gmail.com>
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
case class WalletEntry(
override val id: String, // The id at the client side (the sender) TODO: Rename to remoteId or something...
- override val occurredMillis: Long, // When it occurred at client side (the sender)
+ // When it occurred at client side (the sender).
+ // This denotes the `occurredMillis` attribute of the oldest resource event that is referenced
+ // by `sourceEventIDs`. The reason for this convention is pure from a query-oriented point of view.
+ // See how things are calculated in `UserActor`.
+ override val occurredMillis: Long,
override val receivedMillis: Long, // When it was received by Aquarium
sourceEventIDs: List[String], // The events that triggered this WalletEntry
value: Float,
assert(!userId.isEmpty)
def validate = true
+
+ /**
+  * Tells whether the resource event with the given id is one of the events
+  * listed in `sourceEventIDs`, i.e. one of those that triggered this wallet entry.
+  */
+ def fromResourceEvent(rceId: String): Boolean =
+   sourceEventIDs.exists(sourceId ⇒ sourceId == rceId)
}
object WalletEntry {
import gr.grnet.aquarium.util.Loggable
import gr.grnet.aquarium.actor._
+import gr.grnet.aquarium.logic.events.ResourceEvent
/**
* Business logic dispatcher.
// forward to the user actor manager, which in turn will
// forward to the appropriate user actor (and create one if it does not exist)
userActorManager forward m
+
+ case m @ ProcessResourceEvent(resourceEvent) ⇒
+ logger.debug("Received %s".format(m))
+ val userActorManager = _actorProvider.actorForRole(UserActorManagerRole)
+ // forward to the user actor manager, which in turn will
+ // forward to the appropriate user actor (and create one if it does not exist)
+ userActorManager forward m
}
}
\ No newline at end of file
import gr.grnet.aquarium.actor.ActorMessage
import gr.grnet.aquarium.user.UserState
import gr.grnet.aquarium.util.json.{JsonSupport, JsonHelpers}
+import gr.grnet.aquarium.logic.events.ResourceEvent
/**
* This is the base class of the messages the Dispatcher understands.
def responseBody = state
}
+/**
+ * Dispatcher message that carries a resource event to be processed.
+ *
+ * @param rce the resource event to process
+ */
+case class ProcessResourceEvent(rce: ResourceEvent) extends DispatcherMessage
+
import java.util.concurrent.ConcurrentSkipListSet
import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
import akka.amqp._
+import gr.grnet.aquarium.actor.DispatcherRole
/**
* An actor that gets events from the queue, stores them persistently
val redeliveries = new ConcurrentSkipListSet[String]()
+ // Shortcut for `Configurator.MasterConfigurator`; being a `def`, it is looked up on every access.
+ private[this] def _configurator: Configurator = Configurator.MasterConfigurator
+ /**
+  * Hands the resource event over to the business logic dispatcher actor (fire-and-forget).
+  * All state change, credit calc etc will happen there.
+  */
+ private[this] def _calcStateChanges(resourceEvent: ResourceEvent): Unit = {
+   val dispatcher = _configurator.actorProvider.actorForRole(DispatcherRole)
+   val message = ProcessResourceEvent(resourceEvent)
+   dispatcher ! message
+ }
+
class QueueReader extends Actor {
def receive = {
redeliveries.add(event.id)
PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
}
- } else
- PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ } else {
+ val eventWithReceivedMillis = event.copy(receivedMillis = System.currentTimeMillis())
+ PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
+ }
case PersistOK(ackData) =>
logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
case Persist(event, sender, ackData) =>
if (exists(event))
sender ! Duplicate(ackData)
- else if (persist(event))
+ else if (persist(event)) {
sender ! PersistOK(ackData)
- // TODO: Hook here for further processing
- else
+ // TODO: Move to some proper place (after ACK?)
+ _calcStateChanges(event)
+ } else
sender ! PersistFailed(ackData)
case _ => logger.warn("Unknown message")
}
def exists(event: ResourceEvent): Boolean =
- Configurator.MasterConfigurator.resourceEventStore.findResourceEventById(event.id).isJust
+ _configurator.resourceEventStore.findResourceEventById(event.id).isJust
def persist(event: ResourceEvent): Boolean = {
- Configurator.MasterConfigurator.resourceEventStore.storeResourceEvent(event) match {
+ _configurator.resourceEventStore.storeResourceEvent(event) match {
case Just(x) => true
case x: Failed =>
logger.error("Could not save event: %s".format(event))
def findUserWalletEntries(userId: String): List[WalletEntry]
def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry]
+
+ /**
+ * Finds latest wallet entries with same timestamp, i.e. the group of entries that share the
+ * latest timestamp.
+ *
+ * @param userId the user whose wallet entries are requested
+ * @return the latest wallet entries, wrapped in a `Maybe`
+ */
+ def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]]
}
\ No newline at end of file
import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
import java.util.Date
+import com.ckkloverdos.maybe.Maybe
import com.mongodb._
-import com.ckkloverdos.maybe.{Failed, Just, Maybe}
/**
* Mongodb implementation of the various aquarium stores.
MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
}
+
+
//-WalletEntryStore
+ /**
+  * Finds the latest wallet entries of the given user, that is all the entries of that user which
+  * share the most recent `occurredMillis`.
+  *
+  * The query is restricted to `userId`; without that restriction the latest entries of any
+  * user would be returned.
+  *
+  * @param userId the user whose wallet entries are requested
+  * @return the latest entries wrapped in a `Maybe`. When the user has no wallet entries the block
+  *         evaluates to `null`, which `Maybe` presumably turns into `NoVal` (callers match on it).
+  */
+ def findLatestUserWalletEntries(userId: String) = {
+   Maybe {
+     val query = new BasicDBObject(JsonNames.userId, userId)
+     val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
+     val cursor = wallets.find(query).sort(orderBy)
+
+     try {
+       if(cursor.hasNext) {
+         val newestEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
+         val newestOccurredMillis = newestEntry.occurredMillis
+
+         val buffer = new scala.collection.mutable.ListBuffer[WalletEntry]
+         buffer += newestEntry
+
+         // Entries arrive newest first, so stop at the first one with a different timestamp.
+         var sameTimestamp = true
+         while(sameTimestamp && cursor.hasNext) {
+           val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next())
+           sameTimestamp = walletEntry.occurredMillis == newestOccurredMillis
+
+           if(sameTimestamp) {
+             buffer += walletEntry
+           }
+         }
+
+         buffer.toList
+       } else {
+         // Deliberately null: kept from the original so that `Maybe` reports "no value".
+         null
+       }
+     } finally {
+       cursor.close()
+     }
+   }
+ }
}
object MongoDBStore {
- def RESOURCE_EVENTS_COLLECTION = "resevents"
- def USERS_COLLECTION = "users"
- def IM_EVENTS_COLLECTION = "imevents"
- def IM_WALLETS = "wallets"
+ // Names used for the MongoDB collections; compile-time constants (`final val`).
+ final val RESOURCE_EVENTS_COLLECTION = "resevents"
+ final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
+ final val USERS_COLLECTION = "users"
+ final val IM_EVENTS_COLLECTION = "imevents"
+ final val IM_WALLETS = "wallets"
def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
ResourceEvent.fromJson(JSON.serialize(dbObject))
}
}
- def runQuery[A <: AquariumEvent](query: BasicDBObject, collection: DBCollection)
+ def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
(deserializer: (DBObject) => A)
(sortWith: Option[(A, A) => Boolean]): List[A] = {
- val cur = collection find query
- if(!cur.hasNext) {
- cur.close()
+ val cursor0 = collection find query
+ val cursor = if(orderBy ne null) {
+ cursor0 sort orderBy
+ } else {
+ cursor0
+ } // I really know that docs say that it is the same cursor.
+
+ if(!cursor.hasNext) {
+ cursor.close()
Nil
} else {
val buff = new ListBuffer[A]()
- while(cur.hasNext) {
- buff += deserializer apply cur.next
+ while(cursor.hasNext) {
+ buff += deserializer apply cursor.next
}
- cur.close()
+ cursor.close()
sortWith match {
case Some(sorter) => buff.toList.sortWith(sorter)
case class GroupMembershipsSnapshot(data: List[String], snapshotTime: Long) extends UserDataSnapshot[List[String]]
case class OwnedResourcesSnapshot(data: Map[DSLResource, Any /*ResourceState*/], snapshotTime: Long) extends UserDataSnapshot[Map[DSLResource, Any /*ResourceState*/]]
+
+/**
+ * Snapshot of the upload bandwidth figure (`data`) of a user, taken at `snapshotTime`.
+ *
+ * Bandwidth is counted in MB (?) -- the unit is still to be confirmed.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class BandwidthUpSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
+/**
+ * Snapshot of the download bandwidth figure (`data`) of a user, taken at `snapshotTime`.
+ *
+ * Bandwidth is counted in MB (?) -- the unit is still to be confirmed.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class BandwidthDownSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
+/**
+ * Snapshot of the VM time figure (`data`) of a user, taken at `snapshotTime`.
+ *
+ * Time is counted in seconds (?) -- the unit is still to be confirmed.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class VMTimeSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
+
+/**
+ * Snapshot of the disk space figure (`data`) of a user, taken at `snapshotTime`.
+ *
+ * Disk space is counted in MB (?) -- the unit is still to be confirmed.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+case class DiskSpaceSnapshot(data: Double, snapshotTime: Long) extends UserDataSnapshot[Double]
+
package gr.grnet.aquarium.user
import gr.grnet.aquarium.util.json.{JsonHelpers, JsonSupport}
-import net.liftweb.json.{Extraction, parse => parseJson, JsonAST, Xml}
-import gr.grnet.aquarium.logic.accounting.dsl.DSLResource
+import net.liftweb.json.{parse => parseJson, JsonAST, Xml}
/**
* The different snapshots need not agree on the snapshot time, ie. some state
* part may be stale, while other may be fresh.
*
+ * The user state is meant to be partially updated according to relevant events landing on Aquarium.
+ *
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
paymentOrders: PaymentOrdersSnapshot,
ownedGroups: OwnedGroupsSnapshot,
groupMemberships: GroupMembershipsSnapshot,
- ownedResources: OwnedResourcesSnapshot)
- extends JsonSupport {
+ ownedResources: OwnedResourcesSnapshot,
+ bandwidthUp: BandwidthUpSnapshot,
+ bandwidthDown: BandwidthDownSnapshot,
+ diskSpace: DiskSpaceSnapshot
+) extends JsonSupport {
private[this] def _allSnapshots: List[Long] = {
List(
credits.snapshotTime, agreement.snapshotTime, roles.snapshotTime,
paymentOrders.snapshotTime, ownedGroups.snapshotTime, groupMemberships.snapshotTime,
- ownedResources.snapshotTime)
+ ownedResources.snapshotTime,
+ bandwidthUp.snapshotTime, bandwidthDown.snapshotTime,
+ diskSpace.snapshotTime)
}
- def earlierSnapshotTime: Long = _allSnapshots min
+ def oldestSnapshotTime: Long = _allSnapshots min
- def latestSnapshotTime: Long = _allSnapshots max
+ def newestSnapshotTime: Long = _allSnapshots max
}
package gr.grnet.aquarium.user.actor
-import gr.grnet.aquarium.user.UserState
import gr.grnet.aquarium.util.Loggable
-import scala.PartialFunction
import gr.grnet.aquarium.actor._
-import com.ckkloverdos.maybe.Maybe
import gr.grnet.aquarium.Configurator
-import gr.grnet.aquarium.processor.actor.{UserResponseGetState, UserRequestGetState, UserResponseGetBalance, UserRequestGetBalance}
+import gr.grnet.aquarium.processor.actor._
+import gr.grnet.aquarium.logic.accounting.Accounting
+import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
+import gr.grnet.aquarium.user.UserState
+import gr.grnet.aquarium.logic.events.{WalletEntry, ProcessedResourceEvent, ResourceEvent}
/**
- *
+ *
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
@volatile
private[this] var _userState: UserState = _
@volatile
- private[this] var _actorProvider: ActorProvider = _
- @volatile
private[this] var _timestampTheshold: Long = _
def role = UserActorRole
+ // Shortcut for `Configurator.MasterConfigurator`; being a `def`, it is looked up on every access.
+ private[this] def _configurator: Configurator = Configurator.MasterConfigurator
+
+ /**
+  * Processes a resource event, optionally catching up first with older events
+  * that have not been processed yet.
+  *
+  * @param resourceEvent the event to process
+  * @param checkForOlderEvents when true, older unprocessed events are handled before this one
+  */
+ private[this] def processResourceEvent(resourceEvent: ResourceEvent, checkForOlderEvents: Boolean): Unit = {
+   if(checkForOlderEvents) {
+     logger.debug("Checking for events older than %s".format(resourceEvent))
+     processOlderResourceEvents(resourceEvent)
+   }
+
+   // The event itself always comes last.
+   justProcessTheResourceEvent(resourceEvent, "ACTUAL")
+ }
+
+ /**
+  * Find and process older resource events.
+  *
+  * Older resource events are found based on the latest credit calculation, that is the latest
+  * wallet entry. If there are resource events past that wallet entry, then we deduce that no wallet entries
+  * have been calculated for these resource events and start from there.
+  *
+  * Each qualifying event is processed exactly once: an event qualifies when it is not the event
+  * currently being handled and none of the latest wallet entries references it.
+  *
+  * @param resourceEvent the event currently being handled; it must belong to this actor's user
+  */
+ private[this] def processOlderResourceEvents(resourceEvent: ResourceEvent): Unit = {
+   assert(_userId == resourceEvent.userId)
+   val rceId = resourceEvent.id
+   val userId = resourceEvent.userId
+   val resourceEventStore = _configurator.resourceEventStore
+   val walletEntriesStore = _configurator.walletStore
+
+   // 1. Find latest wallet entry
+   val latestWalletEntriesM = walletEntriesStore.findLatestUserWalletEntries(userId)
+   latestWalletEntriesM match {
+     case NoVal | Just(Nil) ⇒
+       // No wallet entry at all (an empty list is treated like a missing value, instead of
+       // failing on `head` below).
+       logger.debug("No events to process older than %s".format(resourceEvent))
+
+     case Just(latestWalletEntries) ⇒
+       // The time on which we base the selection of the older events
+       val selectionTime = latestWalletEntries.head.occurredMillis
+
+       // 2. Now find all resource events past the time of the latest wallet entry.
+       //    These events have not been processed, except probably those ones
+       //    that have the same `occurredMillis` with `selectionTime`
+       val oldRCEvents = resourceEventStore.findResourceEventsByUserIdAfterTimestamp(userId, selectionTime)
+
+       // 3. Keep only those resource events for which no wallet entry has actually been produced.
+       //    A plain filter is used on purpose: iterating over events x wallet entries would yield
+       //    an event once per non-matching wallet entry, i.e. duplicates and false positives.
+       val rcEventsToProcess = oldRCEvents.filter { oldRCEvent ⇒
+         val oldRCEventId = oldRCEvent.id
+         rceId != oldRCEventId && !latestWalletEntries.exists(_.fromResourceEvent(oldRCEventId))
+       }
+
+       logger.debug("Found %s events older than %s".format(rcEventsToProcess.size, resourceEvent))
+
+       rcEventsToProcess.foreach { rcEventToProcess ⇒
+         justProcessTheResourceEvent(rcEventToProcess, "OLDER")
+       }
+
+     case Failed(e, m) ⇒
+       logger.error("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
+   }
+ }
+
+ /**
+  * Persists the given wallet entries and then lets the resource event compute the new user state.
+  * The actor's state is replaced only when the computed state differs from the current one.
+  */
+ private[this] def _calcNewUserState(resourceEvent: ResourceEvent, walletEntries: List[WalletEntry]): Unit = {
+   // 1. Store the new wallet entries
+   val store = _configurator.walletStore
+   walletEntries.foreach { entry ⇒
+     store.storeWalletEntry(entry)
+   }
+
+   // 2. Update user state
+   val computedState = resourceEvent.calcStateChange(walletEntries, _userState)
+   if(_userState != computedState) {
+     logger.debug("State change from %s".format(_userState))
+     logger.debug("State change to   %s".format(computedState))
+     _userState = computedState
+   } else {
+     logger.debug("No state change for %s".format(_userState))
+   }
+ }
+
+ /**
+  * Processes a single resource event without looking at any other pending event.
+  *
+  * Events of independent resource types are charged and the resulting wallet entries are used
+  * to update the user state. Events of dependent types are only logged for now.
+  *
+  * @param resourceEvent the event to process
+  * @param logLabel a label that is only used in the debug log line
+  */
+ private[this] def justProcessTheResourceEvent(resourceEvent: ResourceEvent, logLabel: String): Unit = {
+   logger.debug("Processing [%s] %s".format(logLabel, resourceEvent))
+
+   if(!resourceEvent.resourceType.isIndependentType) {
+     // We need more than one resource event to calculate credit diffs.
+     // FIXME: implement
+     logger.error("Not processing %s".format(resourceEvent))
+   } else {
+     // There is a one-to-one correspondence from an event to credit diff generation (wallet entry)
+
+     // TODO: find some other way to use the services of Accounting.
+     val accounting = new Accounting {}
+
+     accounting.chargeEvent(resourceEvent) match {
+       case Just(chargedEntries) ⇒
+         _calcNewUserState(resourceEvent, chargedEntries)
+       case NoVal ⇒
+         logger.debug("No wallet entries generated for %s".format(resourceEvent))
+         _calcNewUserState(resourceEvent, Nil)
+       case Failed(e, m) ⇒
+         logger.error("[%s][%s] %s".format(e.getClass.getName, m, e.getMessage))
+         // TODO: What else to do on error?
+     }
+   }
+ }
+
protected def receive: Receive = {
case UserActorStop ⇒
self.stop()
// TODO: query DB etc to get internal state
logger.info("Setup my userId = %s".format(userId))
- case m @ ActorProviderConfigured(actorProvider) ⇒
- this._actorProvider = actorProvider
- logger.info("Configured %s with %s".format(this, m))
+// case m @ ActorProviderConfigured(actorProvider) ⇒
+// this._actorProvider = actorProvider
+// logger.info("Configured %s with %s".format(this, m))
+
+ case m @ ProcessResourceEvent(resourceEvent) ⇒
+ processResourceEvent(resourceEvent, true)
case m @ UserRequestGetBalance(userId, timestamp) ⇒
if(this._userId != userId) {
import gr.grnet.aquarium.util.Loggable
import akka.actor.ActorRef
import gr.grnet.aquarium.actor._
-import gr.grnet.aquarium.processor.actor.{DispatcherMessage, UserRequestGetState, UserRequestGetBalance}
+import gr.grnet.aquarium.processor.actor.{ProcessResourceEvent, DispatcherMessage, UserRequestGetState, UserRequestGetBalance}
/**
case m @ UserRequestGetState(userId, timestamp) ⇒
_forwardToUserActor(userId, m)
+ case m @ ProcessResourceEvent(resourceEvent) ⇒
+ _forwardToUserActor(resourceEvent.userId, m)
}
}
\ No newline at end of file
--- /dev/null
+package gr.grnet.aquarium.util
+
+/**
+ * Small time related utilities.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+object TimeHelpers {
+  /** The current time in milliseconds, as reported by `System.currentTimeMillis()`. */
+  def nowMillis: Long = System.currentTimeMillis()
+}
\ No newline at end of file