/*
- * Copyright 2011 GRNET S.A. All rights reserved.
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
package gr.grnet.aquarium.store.memory
-import gr.grnet.aquarium.user.UserState
-import gr.grnet.aquarium.Configurable
import com.ckkloverdos.props.Props
import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
import gr.grnet.aquarium.store._
import scala.collection.JavaConversions._
import java.util.Date
import collection.mutable.ConcurrentMap
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, UserEvent, PolicyEntry}
import java.util.concurrent.ConcurrentHashMap
+import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
+import gr.grnet.aquarium.Configurable
+import gr.grnet.aquarium.event.model.{WalletEntry, PolicyEntry}
+import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
+import org.bson.types.ObjectId
+import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
+import gr.grnet.aquarium.computation.UserState
/**
- * An implementation of various stores that persists data in memory
+ * An implementation of various stores that persists data in memory.
+ *
+ * This is just for testing purposes.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
* @author Georgios Gousios <gousiosg@gmail.com>
class MemStore extends UserStateStore
with Configurable with PolicyStore
- with ResourceEventStore with UserEventStore
+ with ResourceEventStore with IMEventStore
with WalletEntryStore
with StoreProvider {
- private[this] val userStateByUserId = new ConcurrentHashMap[String, Just[UserState]]()
- private val policyById: ConcurrentMap[String, PolicyEntry] = new ConcurrentHashMap[String, PolicyEntry]()
+ override type IMEvent = MemIMEvent
+ override type ResourceEvent = MemResourceEvent
+
+ private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)
+
+ private[this] var _userStates = List[UserState]()
+ private[this] var _policyEntries = List[PolicyEntry]()
+ private[this] var _resourceEvents = List[ResourceEvent]()
+
private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
- private val userEventById: ConcurrentMap[String, UserEvent] = new ConcurrentHashMap[String, UserEvent]()
- private[this] val resourceEventsById: ConcurrentMap[String, ResourceEvent] = new ConcurrentHashMap[String, ResourceEvent]()
+ private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
def configure(props: Props) = {
}
override def toString = {
val map = Map(
- "UserState" -> userStateByUserId.size,
- "ResourceEvent" -> resourceEventsById.size,
- "UserEvent" -> userEventById.size,
- "PolicyEntry" -> policyById.size,
+ "UserState" -> _userStates.size,
+ "ResourceEvent" -> _resourceEvents.size,
+ "IMEvent" -> imEventById.size,
+ "PolicyEntry" -> _policyEntries.size,
"WalletEntry" -> walletEntriesById.size
)
def walletEntryStore = this
- def userEventStore = this
+ def imEventStore = this
def policyStore = this
//- StoreProvider
//+ UserStateStore
- def storeUserState(userState: UserState): Maybe[RecordID] = {
- val userId = userState.userId
- val userStateJ = Just(userState)
- userStateByUserId.put(userId, userStateJ)
- Just(RecordID(userId))
+ /**
+  * Stores a copy of `userState` that carries a freshly generated `ObjectId`.
+  *
+  * @param userState the state to store; the argument itself is not modified.
+  * @return the stored copy (the one holding the new `_id`), so that the caller
+  *         sees exactly the value that was prepended to the in-memory list.
+  */
+ def insertUserState(userState: UserState): UserState = {
+   val stored = userState.copy(_id = new ObjectId())
+   _userStates = stored :: _userStates
+   stored
}
- def findUserStateByUserId(userId: String) = {
- userStateByUserId.get(userId) match {
- case null ⇒ NoVal
- case userStateJ ⇒ userStateJ
+ /**
+  * Returns the first stored state whose `userID` equals the given one, if any.
+  */
+ def findUserStateByUserID(userID: String) = {
+   _userStates.find(state ⇒ state.userID == userID)
+ }
+
+ /**
+  * Returns the stored state of the given user that has the greatest
+  * `occurredMillis`, or `None` when the user has no stored state.
+  */
+ def findLatestUserStateByUserID(userID: String) = {
+   val ofUser = _userStates.filter(state ⇒ state.userID == userID)
+
+   // Order by descending occurredMillis; the head (if present) is the latest.
+   ofUser.sortWith((a, b) ⇒ a.occurredMillis > b.occurredMillis).headOption
- }
}
- def findLatestUserStateForEndOfBillingMonth(userId: String,
+ /**
+  * Returns the latest (by `occurredMillis`) full-billing-month state stored for
+  * the given user and billing month, or `None` if there is no such state.
+  *
+  * @param userID             the user whose states are examined.
+  * @param yearOfBillingMonth the year compared against `theFullBillingMonth.year`.
+  * @param billingMonth       the month compared against `theFullBillingMonth.month`.
+  */
+ def findLatestUserStateForEndOfBillingMonth(userID: String,
yearOfBillingMonth: Int,
- billingMonth: Int): Maybe[UserState] = {
- NoVal // FIXME: implement
+ billingMonth: Int): Option[UserState] = {
+   val candidates = _userStates.filter { state ⇒
+     val sameUser = state.userID == userID
+     val fullMonth = state.isFullBillingMonthState
+     // theFullBillingMonth may be null, hence the Option wrapper.
+     val sameMonth = Option(state.theFullBillingMonth).exists { month ⇒
+       month.year == yearOfBillingMonth && month.month == billingMonth
+     }
+
+     sameUser && fullMonth && sameMonth
+   }
+
+   // Order by descending occurredMillis; the head (if present) is the latest.
+   candidates.sortWith((a, b) ⇒ a.occurredMillis > b.occurredMillis).headOption
}
def deleteUserState(userId: String) {
- if (userStateByUserId.containsKey(userId))
- userStateByUserId.remove(userId)
+   // Removes every stored state whose userID equals `userId`.
+   // List is immutable: filterNot returns a new list, so the result has to be
+   // assigned back, otherwise nothing is deleted.
+   _userStates = _userStates.filterNot(_.userID == userId)
}
//- UserStateStore
//- WalletEntryStore
//+ ResourceEventStore
- def storeResourceEvent(event: ResourceEvent) = {
- resourceEventsById(event.id) = event
- Just(RecordID(event.id))
+ /**
+  * Converts any resource event model to the representation kept by this store.
+  *
+  * An event that already is a `MemResourceEvent` is returned as is; anything
+  * else is copied field by field into a new `StdResourceEvent`.
+  */
+ def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
+   event match {
+     case local: MemResourceEvent ⇒
+       local
+
+     case other ⇒
+       new StdResourceEvent(
+         other.id,
+         other.occurredMillis,
+         other.receivedMillis,
+         other.userID,
+         other.clientID,
+         other.resource,
+         other.instanceID,
+         other.value,
+         other.eventVersion,
+         other.details
+       ): MemResourceEvent
+   }
+ }
+
+ /** Drops every resource event currently held by this store. */
+ override def clearResourceEvents() = {
+   _resourceEvents = List[ResourceEvent]()
+ }
+
+ /**
+  * Converts `event` with `createResourceEventFromOther`, prepends the result to
+  * the in-memory list and returns that converted event.
+  */
+ def insertResourceEvent(event: ResourceEventModel) = {
+   val converted = createResourceEventFromOther(event)
+   _resourceEvents = converted :: _resourceEvents
+   converted
}
def findResourceEventById(id: String) = {
- Maybe(resourceEventsById(id))
+   // First stored event with a matching id, if there is one.
+   _resourceEvents.find(_.id == id)
}
def findResourceEventsByUserId(userId: String)
(sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
- val byUserId = resourceEventsById.valuesIterator.filter(_.userId == userId).toArray
+ val byUserId = _resourceEvents.filter(_.userID == userId).toArray
val sorted = sortWith match {
case Some(sorter) ⇒
byUserId.sortWith(sorter)
}
def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
- resourceEventsById.valuesIterator.filter { ev ⇒
- ev.userId == userId &&
- (ev.occurredMillis > timestamp)
- }.toList
+   // Keep only this user's events that occurred strictly after `timestamp`.
+   for {
+     ev <- _resourceEvents
+     if ev.userID == userId && ev.occurredMillis > timestamp
+   } yield ev
}
def findResourceEventsForReceivedPeriod(userId: String,
startTimeMillis: Long,
stopTimeMillis: Long): List[ResourceEvent] = {
- resourceEventsById.valuesIterator.filter { ev ⇒
- ev.userId == userId &&
- ev.receivedMillis >= startTimeMillis &&
- ev.receivedMillis <= stopTimeMillis
- }.toList
+   // An event qualifies when it belongs to the user and isReceivedWithinMillis
+   // accepts the given [start, stop] arguments.
+   def qualifies(ev: ResourceEvent) =
+     ev.userID == userId && ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
+
+   _resourceEvents.filter(qualifies)
}
- def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = Maybe {
- resourceEventsById.valuesIterator.filter { case ev ⇒
+ def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
+ _resourceEvents.filter { case ev ⇒
// out of sync events are those that were received in the billing month but occurred in previous (or next?)
// months
ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
override def findAllRelevantResourceEventsForBillingPeriod(userId: String,
startMillis: Long,
stopMillis: Long): List[ResourceEvent] = {
- resourceEventsById.valuesIterator.filter { case ev ⇒
+   // Returns the stored events accepted by isOccurredOrReceivedWithinMillis for
+   // the given period, ordered by ascending occurredMillis.
+   // NOTE(review): userId is not used by the filter, unlike the sibling
+   // finders that compare ev.userID -- confirm whether that is intended.
+   _resourceEvents.filter { case ev ⇒
ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
- }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
+   // sortWith expects a strict "precedes" test. With `<=`, two events with equal
+   // timestamps each precede the other, which makes the derived comparator
+   // inconsistent; strict `<` keeps it well-defined.
+   }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis < ev2.occurredMillis }
}
//- ResourceEventStore
- def storeUserEvent(event: UserEvent) = {userEventById += (event.id -> event); Just(RecordID(event.id))}
+ //+ IMEventStore
+ /** Builds an IM event from its JSON text by delegating to `StdIMEvent.fromJsonString`. */
+ def createIMEventFromJson(json: String) =
+   StdIMEvent.fromJsonString(json)
- def findUserEventById(id: String) = Maybe{userEventById.getOrElse(id, null)}
+ /** Builds an IM event from another event model by delegating to `StdIMEvent.fromOther`. */
+ def createIMEventFromOther(event: IMEventModel) =
+   StdIMEvent.fromOther(event)
- def findUserEventsByUserId(userId: String) = userEventById.values.filter{v => v.userId == userId}.toList
+ /**
+  * Converts `event` with `createIMEventFromOther`, registers the result under
+  * `event.id` and returns the converted event.
+  */
+ def insertIMEvent(event: IMEventModel) = {
+   val converted = createIMEventFromOther(event)
+   imEventById(event.id) = converted
+   converted
+ }
- def loadPolicies(after: Long) = policyById.values.foldLeft(List[PolicyEntry]()){
- (acc, v) => if(v.validFrom > after) v :: acc else acc
+ /** Looks up the IM event registered under `id`. */
+ def findIMEventById(id: String) = {
+   imEventById.get(id)
+ }
+ //- IMEventStore
+
+ /**
+  * Returns the policy entries whose `validFrom` is strictly greater than
+  * `after`, ordered by ascending `validFrom`.
+  */
+ def loadPolicyEntriesAfter(after: Long) = {
+   val newer = _policyEntries.filter(_.validFrom > after)
+   newer.sortWith { (earlier, later) => earlier.validFrom < later.validFrom }
+ }
+
+ /** Prepends `policy` to the in-memory list and returns its id wrapped in a `RecordID`. */
+ def storePolicyEntry(policy: PolicyEntry) = {
+   _policyEntries ::= policy
+   Just(RecordID(policy.id))
+ }
+
+ /**
+  * Replaces every stored entry that has the same id as `policy` with `policy`.
+  *
+  * Entries are replaced in place with `map`, so the list keeps its order
+  * (a prepending fold would reverse the list on every update).
+  */
+ def updatePolicyEntry(policy: PolicyEntry) =
+   _policyEntries = _policyEntries.map { existing =>
+     if (existing.id == policy.id) policy else existing
}
- def storePolicy(policy: PolicyEntry) = {policyById += (policy.id -> policy); Just(RecordID(policy.id))}
+ /** Returns `Just(entry)` for the first stored entry with the given id, `NoVal` otherwise. */
+ def findPolicyEntry(id: String) =
+   _policyEntries.find(_.id == id).map(Just(_)).getOrElse(NoVal)
+}
- def updatePolicy(policy: PolicyEntry) = storePolicy(policy)
+/** Companion helpers for the in-memory store. */
+object MemStore {
+ /** Tells whether `event` is already a `MemIMEvent`. */
+ final def isLocalIMEvent(event: IMEventModel) =
+   event.isInstanceOf[MemIMEvent]
}
\ No newline at end of file