2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.store.memory
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
43 import collection.mutable.ConcurrentMap
44 import java.util.concurrent.ConcurrentHashMap
45 import gr.grnet.aquarium.user.UserState
46 import gr.grnet.aquarium.simulation.uid.ConcurrentVMLocalUIDGenerator
47 import gr.grnet.aquarium.{AquariumException, Configurable}
48 import gr.grnet.aquarium.events.{IMEvent, WalletEntry, ResourceEvent, PolicyEntry}
51 * An implementation of various stores that persists data in memory.
53 * This is just for testing purposes.
55 * @author Christos KK Loverdos <loverdos@gmail.com>
56 * @author Georgios Gousios <gousiosg@gmail.com>
59 class MemStore extends UserStateStore
60 with Configurable with PolicyStore
61 with ResourceEventStore with IMEventStore
// Source of the ids given to the user states stored here.
// NOTE(review): the meaning of the 1000 argument is defined by ConcurrentVMLocalUIDGenerator - confirm there.
private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)

// List-backed stores; the store* methods prepend new items to these lists.
private[this] var _userStates: List[UserState] = Nil
private[this] var _policyEntries: List[PolicyEntry] = Nil
private[this] var _resourceEvents: List[ResourceEvent] = Nil

// Id-keyed stores: java.util.concurrent hash maps used through the Scala ConcurrentMap interface.
private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] =
  new ConcurrentHashMap[String, WalletEntry]()
private[this] val imEventById: ConcurrentMap[String, IMEvent] =
  new ConcurrentHashMap[String, IMEvent]()
74 def configure(props: Props) = {
77 override def toString = {
79 "UserState" -> _userStates.size,
80 "ResourceEvent" -> _resourceEvents.size,
81 "IMEvent" -> imEventById.size,
82 "PolicyEntry" -> _policyEntries.size,
83 "WalletEntry" -> walletEntriesById.size
86 "MemStore(%s)" format map
// This one in-memory instance is handed out for every kind of store that is asked for.

/** The user state store: this instance. */
def userStateStore: MemStore = this

/** The resource event store: this instance. */
def resourceEventStore: MemStore = this

/** The wallet entry store: this instance. */
def walletEntryStore: MemStore = this

/** The IM event store: this instance. */
def imEventStore: MemStore = this

/** The policy store: this instance. */
def policyStore: MemStore = this
/**
 * Stores a copy of the given user state under a freshly generated id.
 *
 * The id is taken from `idGen` and the copy is prepended to the in-memory list of user states.
 *
 * @param userState the state to store; the stored object is a copy carrying the new id
 * @return `Just` the record id, built from the `_id` of the stored copy
 */
def storeUserState(userState: UserState): Maybe[RecordID] = {
  val stored = userState.copy(id = idGen.nextUID())
  _userStates = stored :: _userStates
  // Use the copy created above instead of re-reading the list head: if stores interleave,
  // the head may already be a record prepended by another caller.
  Just(RecordID(stored._id))
}
108 def findUserStateByUserId(userId: String) = {
109 _userStates.find(_.userId == userId) match {
110 case Some(userState) ⇒
117 def findLatestUserStateForEndOfBillingMonth(userId: String,
118 yearOfBillingMonth: Int,
119 billingMonth: Int): Maybe[UserState] = {
120 val goodOnes = _userStates.filter { userState ⇒
121 val f1 = userState.userId == userId
122 val f2 = userState.isFullBillingMonthState
123 val bm = userState.theFullBillingMonth
124 val f3 = (bm ne null) && {
125 bm.year == yearOfBillingMonth && bm.month == billingMonth
133 us1.oldestSnapshotTime > us2.oldestSnapshotTime
/**
 * Removes every stored user state that belongs to the given user.
 *
 * @param userId the id of the user whose states are dropped
 */
def deleteUserState(userId: String): Unit = {
  // filterNot builds a new list and leaves the receiver untouched, so the result has to be
  // assigned back; without the assignment nothing is ever deleted.
  _userStates = _userStates.filterNot(_.userId == userId)
}
/**
 * Puts the wallet entry into the in-memory map, keyed by its id.
 *
 * @param entry the wallet entry to keep
 * @return `Just` a record id built from the entry's id
 */
def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
  walletEntriesById(entry.id) = entry
  Just(RecordID(entry.id))
}
/**
 * Looks up a wallet entry by its id.
 *
 * An unknown id is reported as `NoVal`, the same way the other finders of this store
 * report a missing item. The lookup goes through `get`, because `apply` on a Scala map
 * throws `NoSuchElementException` when the key is absent.
 *
 * @param id the id the entry was stored under
 * @return `Just` the entry when one is stored under `id`, `NoVal` otherwise
 */
def findWalletEntryById(id: String): Maybe[WalletEntry] = {
  walletEntriesById.get(id) match {
    case Some(entry) ⇒ Just(entry)
    case None ⇒ NoVal
  }
}
/**
 * Collects all stored wallet entries of the given user.
 *
 * @param userId the user whose entries are wanted
 * @return the matching entries, in the iteration order of the underlying map
 */
def findUserWalletEntries(userId: String): List[WalletEntry] = {
  val ownedByUser =
    for (entry <- walletEntriesById.valuesIterator if entry.userId == userId) yield entry
  ownedByUser.toList
}
161 def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
162 walletEntriesById.valuesIterator.filter { we ⇒
163 val receivedDate = we.receivedDate
165 we.userId == userId &&
166 ( (from before receivedDate) || (from == receivedDate) ) &&
167 ( (to after receivedDate) || (to == receivedDate) )
/**
 * Always answers `NoVal`: the in-memory store performs no lookup here, whatever the user.
 *
 * @param userId ignored
 */
def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = {
  NoVal
}
174 def findPreviousEntry(userId: String,
177 finalized: Option[Boolean]): List[WalletEntry] = Nil
179 def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry] = {
180 walletEntriesById.valuesIterator.filter { we ⇒
181 val occurredDate = we.occurredDate
183 we.userId == userId &&
184 ( (from before occurredDate) || (from == occurredDate) )
189 //+ ResourceEventStore
/** Forgets every resource event stored so far. */
override def clearResourceEvents(): Unit = {
  _resourceEvents = List.empty[ResourceEvent]
}
/**
 * Prepends the event to the in-memory list of resource events.
 *
 * @param event the resource event to keep
 * @return `Just` a record id built from the event's id
 */
def storeResourceEvent(event: ResourceEvent) = {
  _resourceEvents = event :: _resourceEvents
  Just(RecordID(event.id))
}
200 def findResourceEventById(id: String) = {
201 _resourceEvents.find(ev ⇒ ev.id == id) match {
202 case Some(ev) ⇒ Just(ev)
207 def findResourceEventsByUserId(userId: String)
208 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
209 val byUserId = _resourceEvents.filter(_.userID == userId).toArray
210 val sorted = sortWith match {
212 byUserId.sortWith(sorter)
220 def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
221 _resourceEvents.filter { ev ⇒
222 ev.userID == userId &&
223 (ev.occurredMillis > timestamp)
227 def findResourceEventHistory(userId: String,
229 instid: Option[String],
230 upTo: Long): List[ResourceEvent] = {
234 def findResourceEventsForReceivedPeriod(userId: String,
235 startTimeMillis: Long,
236 stopTimeMillis: Long): List[ResourceEvent] = {
237 _resourceEvents.filter { ev ⇒
238 ev.userID == userId &&
239 ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
243 def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = Maybe {
244 _resourceEvents.filter { case ev ⇒
245 // out of sync events are those that were received in the billing month but occurred in previous (or next?)
247 ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
252 * Finds all relevant resource events for the billing period.
253 * The relevant events are those:
254 * a) whose `occurredMillis` is within the given billing period or
255 * b) whose `receivedMillis` is within the given billing period.
257 * Order them by `occurredMillis`
259 override def findAllRelevantResourceEventsForBillingPeriod(userId: String,
261 stopMillis: Long): List[ResourceEvent] = {
262 _resourceEvents.filter { case ev ⇒
263 ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
264 }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
266 //- ResourceEventStore
/**
 * Not supported by the in-memory store.
 *
 * @param json ignored
 * @throws AquariumException always
 */
def storeUnparsed(json: String) = {
  throw new AquariumException("Not implemented")
}
/**
 * Puts the IM event into the in-memory map, keyed by its id.
 *
 * @param event the IM event to keep
 * @return `Just` a record id built from the event's id
 */
def storeIMEvent(event: IMEvent) = {
  imEventById.put(event.id, event)
  Just(RecordID(event.id))
}
/**
 * Looks up an IM event by its id.
 *
 * A missing id is handed to `Maybe` as `null`; what `Maybe` makes of that value is
 * decided by the `Maybe` factory itself.
 *
 * @param id the id the event was stored under
 */
def findIMEventById(id: String) = Maybe {
  imEventById.get(id) match {
    case Some(event) => event
    case None => null
  }
}
/**
 * Collects all stored IM events of the given user.
 *
 * @param userId the user whose events are wanted
 * @return the matching events, in the iteration order of the underlying map
 */
def findIMEventsByUserId(userId: String) = {
  val ownedByUser =
    for (event <- imEventById.valuesIterator if event.userID == userId) yield event
  ownedByUser.toList
}
/**
 * Loads the policy entries whose `validFrom` is strictly greater than `after`,
 * ordered by ascending `validFrom`.
 *
 * @param after the exclusive lower bound for `validFrom`
 */
def loadPolicyEntriesAfter(after: Long) = {
  val startingLater = _policyEntries.filter(_.validFrom > after)
  startingLater.sortWith(_.validFrom < _.validFrom)
}
/**
 * Prepends the policy entry to the in-memory list of policy entries.
 *
 * @param policy the policy entry to keep
 * @return `Just` a record id built from the policy's id
 */
def storePolicyEntry(policy: PolicyEntry) = {
  _policyEntries ::= policy
  Just(RecordID(policy.id))
}
284 def updatePolicyEntry(policy: PolicyEntry) =
285 _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
287 if (p.id == policy.id)
293 def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
294 case Some(x) => Just(x)