2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.memory
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
43 import collection.mutable.ConcurrentMap
44 import java.util.concurrent.ConcurrentHashMap
45 import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
46 import gr.grnet.aquarium.Configurable
47 import gr.grnet.aquarium.event.model.{WalletEntry, PolicyEntry}
48 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
49 import org.bson.types.ObjectId
50 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
51 import gr.grnet.aquarium.computation.UserState
54 * An implementation of various stores that persists data in memory.
56 * This is just for testing purposes.
58 * @author Christos KK Loverdos <loverdos@gmail.com>
59 * @author Georgios Gousios <gousiosg@gmail.com>
62 class MemStore extends UserStateStore
63 with Configurable with PolicyStore
64 with ResourceEventStore with IMEventStore
// Concrete event representations used by this store.
override type IMEvent = MemIMEvent
override type ResourceEvent = MemResourceEvent

// NOTE(review): presumably hands out VM-local unique ids; the meaning of 1000 is not visible here.
private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)

// List-backed state; each list is replaced wholesale through its `var`.
private[this] var _userStates = List.empty[UserState]
private[this] var _policyEntries = List.empty[PolicyEntry]
private[this] var _resourceEvents = List.empty[ResourceEvent]

// Map-backed state keyed by id. The Java maps are used through the Scala
// `ConcurrentMap` interface.
private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] =
  new ConcurrentHashMap[String, WalletEntry]()
private[this] val imEventById: ConcurrentMap[String, MemIMEvent] =
  new ConcurrentHashMap[String, MemIMEvent]()
/** This store declares no property prefix. */
def propertyPrefix = None

/**
 * Accepts the configuration properties and does nothing with them.
 *
 * @param props the configuration properties (unused)
 */
def configure(props: Props) = ()
86 override def toString = {
88 "UserState" -> _userStates.size,
89 "ResourceEvent" -> _resourceEvents.size,
90 "IMEvent" -> imEventById.size,
91 "PolicyEntry" -> _policyEntries.size,
92 "WalletEntry" -> walletEntriesById.size
95 "MemStore(%s)" format map
// This single object acts as every kind of store, so each accessor
// simply hands back `this`. The result types are spelled out instead of being
// left to inference; `MemStore` is the type that was inferred before.

/** @return this object, acting as the user state store */
def userStateStore: MemStore = this

/** @return this object, acting as the resource event store */
def resourceEventStore: MemStore = this

/** @return this object, acting as the wallet entry store */
def walletEntryStore: MemStore = this

/** @return this object, acting as the IM event store */
def imEventStore: MemStore = this

/** @return this object, acting as the policy store */
def policyStore: MemStore = this
112 def insertUserState(userState: UserState): UserState = {
113 _userStates = userState.copy(_id = new ObjectId()) :: _userStates
/**
 * Looks up a stored user state by user id.
 *
 * @param userID the id of the user whose state is wanted
 * @return the first state in the internal list carrying that `userID`, if any
 */
def findUserStateByUserID(userID: String) = {
  _userStates.find(state ⇒ state.userID == userID)
}
121 def findLatestUserStateByUserID(userID: String) = {
122 val goodOnes = _userStates.filter(_.userID == userID)
126 us1.occurredMillis > us2.occurredMillis
135 def findLatestUserStateForEndOfBillingMonth(userID: String,
136 yearOfBillingMonth: Int,
137 billingMonth: Int): Option[UserState] = {
138 val goodOnes = _userStates.filter { userState ⇒
139 val f1 = userState.userID == userID
140 val f2 = userState.isFullBillingMonthState
141 val bm = userState.theFullBillingMonth
142 val f3 = (bm ne null) && {
143 bm.year == yearOfBillingMonth && bm.month == billingMonth
151 us1.occurredMillis > us2.occurredMillis
/**
 * Removes every stored user state that belongs to the given user.
 *
 * @param userId the id of the user whose states are dropped
 */
def deleteUserState(userId: String) {
  // `filterNot` builds a new list and leaves the receiver untouched, so the
  // result must be assigned back for the deletion to take effect.
  _userStates = _userStates.filterNot(_.userID == userId)
}
/**
 * Stores a wallet entry under its own id; an entry previously stored under
 * the same id is replaced.
 *
 * @param entry the wallet entry to keep
 * @return the record id built from `entry.id`, wrapped in `Just`
 */
def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
  val entryId = entry.id
  walletEntriesById.put(entryId, entry)
  Just(RecordID(entryId))
}
/**
 * Looks up a wallet entry by its id.
 *
 * @param id the id the entry was stored under
 * @return `Just(entry)` when an entry is stored under `id`, `NoVal` otherwise
 */
def findWalletEntryById(id: String): Maybe[WalletEntry] = {
  // `apply` on a map throws NoSuchElementException for an absent key; `get`
  // turns absence into a plain "no value" result instead of an error.
  walletEntriesById.get(id) match {
    case Some(entry) ⇒ Just(entry)
    case None ⇒ NoVal
  }
}
/**
 * Collects all wallet entries that belong to the given user.
 *
 * @param userId the id of the owning user
 * @return every stored entry whose `userId` equals the argument
 */
def findUserWalletEntries(userId: String): List[WalletEntry] = {
  val ownedByUser = walletEntriesById.valuesIterator.filter(entry ⇒ entry.userId == userId)
  ownedByUser.toList
}
/**
 * Collects the wallet entries of a user whose `receivedDate` lies between
 * `from` and `to`, both ends included.
 *
 * @param userId the id of the owning user
 * @param from   earliest accepted received date (inclusive)
 * @param to     latest accepted received date (inclusive)
 * @return the matching wallet entries
 */
def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
  def receivedInRange(entry: WalletEntry): Boolean = {
    val receivedDate = entry.receivedDate

    entry.userId == userId &&
    (from.before(receivedDate) || from == receivedDate) &&
    (to.after(receivedDate) || to == receivedDate)
  }

  walletEntriesById.valuesIterator.filter(entry ⇒ receivedInRange(entry)).toList
}
/**
 * Not backed by any lookup in this store: the result is always `NoVal`,
 * whatever the user id.
 *
 * @param userId ignored
 * @return `NoVal`
 */
def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = {
  NoVal
}
192 def findPreviousEntry(userId: String,
195 finalized: Option[Boolean]): List[WalletEntry] = Nil
/**
 * Collects the wallet entries of a user whose `occurredDate` is at or after
 * `from`.
 *
 * @param userID the id of the owning user
 * @param from   earliest accepted occurrence date (inclusive)
 * @return the matching wallet entries
 */
def findWalletEntriesAfter(userID: String, from: Date): List[WalletEntry] = {
  def occurredOnOrAfter(entry: WalletEntry): Boolean = {
    val occurredDate = entry.occurredDate

    entry.userId == userID &&
    (from.before(occurredDate) || from == occurredDate)
  }

  walletEntriesById.valuesIterator.filter(entry ⇒ occurredOnOrAfter(entry)).toList
}
207 //+ ResourceEventStore
208 def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
209 if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
212 new StdResourceEvent(
/** Forgets every resource event held by this store. */
override def clearResourceEvents() = {
  _resourceEvents = List.empty[ResourceEvent]
}
/**
 * Liveness check for the resource event store. There is nothing to check
 * for this store, so the call does nothing.
 */
def pingResourceEventStore(): Unit = ()
235 def insertResourceEvent(event: ResourceEventModel) = {
236 val localEvent = createResourceEventFromOther(event)
237 _resourceEvents ::= localEvent
/**
 * Looks up a resource event by its id.
 *
 * @param id the id of the wanted event
 * @return the first stored event whose `id` equals the argument, if any
 */
def findResourceEventById(id: String) = {
  _resourceEvents.find(_.id == id)
}
245 def findResourceEventsByUserId(userId: String)
246 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
247 val byUserId = _resourceEvents.filter(_.userID == userId).toArray
248 val sorted = sortWith match {
250 byUserId.sortWith(sorter)
/**
 * Collects the resource events of a user that occurred strictly after the
 * given timestamp.
 *
 * @param userID    the id of the user
 * @param timestamp exclusive lower bound for `occurredMillis`
 * @return the matching events, in the order they are held internally
 */
def findResourceEventsByUserIdAfterTimestamp(userID: String, timestamp: Long): List[ResourceEvent] = {
  def isLaterEventOfUser(event: ResourceEvent): Boolean =
    event.userID == userID && event.occurredMillis > timestamp

  _resourceEvents.filter(event ⇒ isLaterEventOfUser(event))
}
265 def findResourceEventHistory(userId: String,
267 instid: Option[String],
268 upTo: Long): List[ResourceEvent] = {
/**
 * Collects the resource events of a user for which
 * `isReceivedWithinMillis(startTimeMillis, stopTimeMillis)` holds.
 *
 * @param userID          the id of the user
 * @param startTimeMillis start of the period, passed to `isReceivedWithinMillis`
 * @param stopTimeMillis  end of the period, passed to `isReceivedWithinMillis`
 * @return the matching events, in the order they are held internally
 */
def findResourceEventsForReceivedPeriod(userID: String,
                                        startTimeMillis: Long,
                                        stopTimeMillis: Long): List[ResourceEvent] = {
  def receivedInPeriod(event: ResourceEvent): Boolean =
    event.userID == userID && event.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)

  _resourceEvents.filter(event ⇒ receivedInPeriod(event))
}
/**
 * Counts the resource events of a user that are out of sync for the given
 * billing period, as decided by `isOutOfSyncForBillingPeriod`.
 *
 * @param userID      the id of the user
 * @param startMillis start of the billing period
 * @param stopMillis  end of the billing period
 * @return the number of out-of-sync events
 */
def countOutOfSyncEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
  // Original note: out of sync events are those that were received in the
  // billing month but occurred in previous (or next?) ones.
  _resourceEvents.count { event ⇒
    event.userID == userID &&
    event.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
  }
}
/**
 * Finds all relevant resource events for the billing period.
 * The relevant events are those:
 * a) whose `occurredMillis` is within the given billing period or
 * b) whose `receivedMillis` is within the given billing period.
 *
 * The result is ordered by ascending `occurredMillis`.
 *
 * @param userID      the id of the user whose events are wanted
 * @param startMillis start of the billing period
 * @param stopMillis  end of the billing period
 * @return the relevant events, ordered by `occurredMillis`
 */
override def findAllRelevantResourceEventsForBillingPeriod(userID: String,
                                                           startMillis: Long,
                                                           stopMillis: Long): List[ResourceEvent] = {
  val relevant = _resourceEvents.filter { ev ⇒
    ev.userID == userID &&
    ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
  }

  // `sortWith` expects a strict "comes before" test. Using `<=` makes two
  // events with equal timestamps each claim to precede the other, which breaks
  // the comparator contract the underlying sort relies on. With `<` such
  // events compare as equal and keep their relative order.
  relevant.sortWith { (ev1, ev2) ⇒ ev1.occurredMillis < ev2.occurredMillis }
}
306 //- ResourceEventStore
/**
 * Builds an IM event from its JSON text by delegating to
 * `StdIMEvent.fromJsonString`.
 *
 * @param json the JSON representation of the event
 * @return the event produced by `StdIMEvent.fromJsonString`
 */
def createIMEventFromJson(json: String) =
  StdIMEvent.fromJsonString(json)
/**
 * Builds this store's IM event representation from any IM event model by
 * delegating to `StdIMEvent.fromOther`.
 *
 * @param event the event to convert
 * @return the event produced by `StdIMEvent.fromOther`
 */
def createIMEventFromOther(event: IMEventModel) =
  StdIMEvent.fromOther(event)
317 def pingIMEventStore(): Unit = {
321 def insertIMEvent(event: IMEventModel) = {
322 val localEvent = createIMEventFromOther(event)
323 imEventById += (event.id -> localEvent)
/**
 * Looks up an IM event by its id.
 *
 * @param id the id the event was stored under
 * @return the stored event, if one exists for `id`
 */
def findIMEventById(id: String) = {
  imEventById.get(id)
}
329 def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
330 imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
332 us1.occurredMillis > us2.occurredMillis
342 def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
343 imEventById.valuesIterator.filter { case ev ⇒
344 ev.userID == userID && ev.isActive
345 }.toList.sortWith { case (ev1, ev2) ⇒
346 ev1.occurredMillis <= ev2.occurredMillis
/**
 * Loads the policy entries that become valid strictly after the given
 * instant, ordered by ascending `validFrom`.
 *
 * @param after exclusive lower bound for `validFrom`
 * @return the matching policy entries, earliest `validFrom` first
 */
def loadPolicyEntriesAfter(after: Long) = {
  val validLater = _policyEntries.filter(_.validFrom > after)
  validLater.sortWith(_.validFrom < _.validFrom)
}
/**
 * Adds a policy entry at the front of the stored policy entries.
 *
 * @param policy the policy entry to keep
 * @return the record id built from `policy.id`, wrapped in `Just`
 */
def storePolicyEntry(policy: PolicyEntry) = {
  _policyEntries ::= policy
  Just(RecordID(policy.id))
}
363 def updatePolicyEntry(policy: PolicyEntry) =
364 _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
366 if (p.id == policy.id)
372 def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
373 case Some(x) => Just(x)
379 final def isLocalIMEvent(event: IMEventModel) = event match {
380 case _: MemIMEvent ⇒ true