X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/32a6a4336d24a5c061c2ad2ec049d4643de747b6..4b8ff3e0d06204e472dea2cb600a405a30c3ad1e:/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala diff --git a/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala b/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala index e513606..9f0a66a 100644 --- a/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala +++ b/src/main/scala/gr/grnet/aquarium/store/memory/MemStoreProvider.scala @@ -35,20 +35,16 @@ package gr.grnet.aquarium.store.memory +import collection.immutable +import collection.immutable.SortedMap import com.ckkloverdos.props.Props -import com.ckkloverdos.maybe.Just -import gr.grnet.aquarium.store._ -import scala.collection.JavaConversions._ -import collection.mutable.ConcurrentMap -import java.util.concurrent.ConcurrentHashMap import gr.grnet.aquarium.Configurable -import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel} -import org.bson.types.ObjectId -import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel} -import gr.grnet.aquarium.computation.state.UserState -import gr.grnet.aquarium.util.Tags import gr.grnet.aquarium.computation.BillingMonthInfo -import gr.grnet.aquarium.policy.{PolicyModel, StdPolicy} +import gr.grnet.aquarium.logic.accounting.dsl.Timeslot +import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg} +import gr.grnet.aquarium.message.avro.{MessageFactory, MessageHelpers, OrderingHelpers} +import gr.grnet.aquarium.store._ +import gr.grnet.aquarium.util.{Loggable, Tags} /** * An implementation of various stores that persists parts in memory. @@ -57,6 +53,7 @@ import gr.grnet.aquarium.policy.{PolicyModel, StdPolicy} * * @author Christos KK Loverdos * @author Georgios Gousios + * @author Prodromos Gerakios */ class MemStoreProvider @@ -65,18 +62,13 @@ extends StoreProvider with Configurable with PolicyStore with ResourceEventStore - with IMEventStore { - - override type IMEvent = MemIMEvent - override type ResourceEvent = MemResourceEvent - override type Policy = StdPolicy - - private[this] var _userStates = List[UserState]() - private[this] var _policies = List[Policy]() - private[this] var _resourceEvents = List[ResourceEvent]() - - private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]() + with IMEventStore + with Loggable { + private[this] var _userStates = immutable.TreeSet[UserStateMsg]()(OrderingHelpers.DefaultUserStateMsgOrdering) + private[this] var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + private[this] var _resourceEvents = immutable.TreeSet[ResourceEventMsg]()(OrderingHelpers.DefaultResourceEventMsgOrdering) + private[this] var _imEvents = immutable.TreeSet[IMEventMsg]()(OrderingHelpers.DefaultIMEventMsgOrdering) def propertyPrefix = None @@ -87,7 +79,7 @@ extends StoreProvider val map = Map( Tags.UserStateTag -> _userStates.size, Tags.ResourceEventTag -> _resourceEvents.size, - Tags.IMEventTag -> imEventById.size, + Tags.IMEventTag -> _imEvents.size, "PolicyEntry" -> _policies.size ) @@ -105,95 +97,53 @@ extends StoreProvider //- StoreProvider + //+ UserStateStore - def insertUserState(userState: UserState): UserState = { - _userStates = userState.copy(_id = new ObjectId().toString) :: _userStates - userState + def insertUserState(event: UserStateMsg) = { + event.setInStoreID(event.getOriginalID) + _userStates += event + event } def findUserStateByUserID(userID: String) = { - _userStates.find(_.userID == userID) + _userStates.find(_.getUserID == userID) } - def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = { - val goodOnes = _userStates.filter(_.theFullBillingMonth.isDefined).filter { userState ⇒ - val f1 = userState.userID == userID - val f2 = userState.isFullBillingMonthState - val bm = userState.theFullBillingMonth.get - val f3 = bm == bmi + def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = { + _userStates.filter { userState ⇒ + userState.getUserID == userID && + userState.getIsForFullMonth && + userState.getBillingYear == bmi.year && + userState.getBillingMonth == bmi.month + }.lastOption + } - f1 && f2 && f3 - } - - goodOnes.sortWith { - case (us1, us2) ⇒ - us1.occurredMillis > us2.occurredMillis - } match { - case head :: _ ⇒ - Some(head) - case _ ⇒ - None - } + def findLatestUserState(userID: String) = { + _userStates.filter(_.getUserID == userID).lastOption } //- UserStateStore //+ ResourceEventStore - def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = { - if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent] - else { - import event._ - new StdResourceEvent( - id, - occurredMillis, - receivedMillis, - userID, - clientID, - resource, - instanceID, - value, - eventVersion, - details - ): MemResourceEvent - } - } - - override def clearResourceEvents() = { - _resourceEvents = Nil - } - def pingResourceEventStore(): Unit = { // We are always live and kicking... } - def insertResourceEvent(event: ResourceEventModel) = { - val localEvent = createResourceEventFromOther(event) - _resourceEvents ::= localEvent - localEvent + def insertResourceEvent(event: ResourceEventMsg) = { + event.setInStoreID(event.getOriginalID) + _resourceEvents += event + event } def findResourceEventByID(id: String) = { - _resourceEvents.find(ev ⇒ ev.id == id) - } - - def findResourceEventsByUserID(userId: String) - (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = { - val byUserId = _resourceEvents.filter(_.userID == userId).toArray - val sorted = sortWith match { - case Some(sorter) ⇒ - byUserId.sortWith(sorter) - case None ⇒ - byUserId - } - - sorted.toList + _resourceEvents.find(_.getOriginalID == id) } def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { _resourceEvents.filter { case ev ⇒ - ev.userID == userID && + ev.getUserID == userID && // out of sync events are those that were received in the billing month but occurred in previous (or next?) // months - ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis) + MessageHelpers.isOutOfSyncForBillingPeriod(ev, startMillis, stopMillis) }.size.toLong } //- ResourceEventStore @@ -202,51 +152,36 @@ extends StoreProvider userID: String, startMillis: Long, stopMillis: Long - )(f: ResourceEvent ⇒ Unit): Unit = { + )(f: ResourceEventMsg ⇒ Unit): Unit = { _resourceEvents.filter { case ev ⇒ - ev.userID == userID && - ev.isOccurredWithinMillis(startMillis, stopMillis) + ev.getUserID == userID && + MessageHelpers.isOccurredWithinMillis(ev, startMillis, stopMillis) }.foreach(f) } //+ IMEventStore - def createIMEventFromJson(json: String) = { - StdIMEvent.fromJsonString(json) - } - - def createIMEventFromOther(event: IMEventModel) = { - StdIMEvent.fromOther(event) - } - def pingIMEventStore(): Unit = { } - def insertIMEvent(event: IMEventModel) = { - val localEvent = createIMEventFromOther(event) - imEventById += (event.id -> localEvent) - localEvent + def insertIMEvent(event: IMEventMsg) = { + event.setInStoreID(event.getOriginalID) + _imEvents += event + event } - def findIMEventByID(id: String) = imEventById.get(id) + def findIMEventByID(id: String) = { + _imEvents.find(_.getOriginalID == id) + } /** * Find the `CREATE` even for the given user. Note that there must be only one such event. */ - def findCreateIMEventByUserID(userID: String): Option[IMEvent] = { - imEventById.valuesIterator.filter { e ⇒ - e.userID == userID && e.isCreateUser - }.toList.sortWith { case (e1, e2) ⇒ - e1.occurredMillis < e2.occurredMillis - } headOption - } - - def findLatestIMEventByUserID(userID: String): Option[IMEvent] = { - imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith { - case (us1, us2) ⇒ - us1.occurredMillis > us2.occurredMillis - } headOption + def findCreateIMEventByUserID(userID: String) = { + _imEvents.find { event ⇒ + event.getUserID() == userID && MessageHelpers.isUserCreationIMEvent(event) + } } /** @@ -255,42 +190,38 @@ extends StoreProvider * * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case. */ - def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = { - imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith { - case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis - } foreach(f) + def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = { + var _shouldContinue = true + for { + msg <- _imEvents if _shouldContinue + } { + _shouldContinue = f(msg) + } + _shouldContinue } //- IMEventStore - def loadPoliciesAfter(afterMillis: Long) = - _policies.filter(p => p.validFrom > afterMillis) - .sortWith((a,b) => a.validFrom < b.validFrom) + //+ PolicyStore + def insertPolicy(policy: PolicyMsg): PolicyMsg = synchronized { + policy.setInStoreID(policy.getOriginalID) + _policies += policy + policy + } - def findPolicyByID(id: String) = { - _policies.find(p => p.id == id) + def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = synchronized { + _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption } - /** - * Store an accounting policy. - */ - def insertPolicy(policy: PolicyModel): Policy = { - val localPolicy = StdPolicy( - id = policy.id, - parentID = policy.parentID, - validityTimespan = policy.validityTimespan, - resourceTypes = policy.resourceTypes, - chargingBehaviors = policy.chargingBehaviors, - roleMapping = policy.roleMapping + def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, PolicyMsg] = { + immutable.SortedMap(_policies. + from(MessageFactory.newDummyPolicyMsgAt(fromMillis)). + to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq. + map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _* ) - _policies = localPolicy :: _policies - - localPolicy } -} -object MemStoreProvider { - final def isLocalIMEvent(event: IMEventModel) = event match { - case _: MemIMEvent ⇒ true - case _ ⇒ false + def foreachPolicy[U](f: (PolicyMsg) ⇒ U) { + _policies.foreach(f) } -} \ No newline at end of file + //- PolicyStore +}