Tweeking configuration
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
42 import java.util.Date
43 import collection.mutable.ConcurrentMap
44 import java.util.concurrent.ConcurrentHashMap
45 import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
46 import gr.grnet.aquarium.Configurable
47 import gr.grnet.aquarium.event.{WalletEntry, PolicyEntry}
48 import gr.grnet.aquarium.event.im.{StdIMEvent, IMEventModel}
49 import org.bson.types.ObjectId
50 import gr.grnet.aquarium.event.resource.{StdResourceEvent, ResourceEventModel}
51 import gr.grnet.aquarium.computation.UserState
52
53 /**
54  * An implementation of various stores that persists data in memory.
55  *
56  * This is just for testing purposes.
57  * 
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  * @author Georgios Gousios <gousiosg@gmail.com>
60  */
61
62 class MemStore extends UserStateStore
63   with Configurable with PolicyStore
64   with ResourceEventStore with IMEventStore
65   with WalletEntryStore
66   with StoreProvider {
67
68   override type IMEvent = MemIMEvent
69   override type ResourceEvent = MemResourceEvent
70
71   private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)
72   
73   private[this] var _userStates     = List[UserState]()
74   private[this] var _policyEntries  = List[PolicyEntry]()
75   private[this] var _resourceEvents = List[ResourceEvent]()
76
77   private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
78   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
79
80   def configure(props: Props) = {
81   }
82
83   override def toString = {
84     val map = Map(
85       "UserState"     -> _userStates.size,
86       "ResourceEvent" -> _resourceEvents.size,
87       "IMEvent"     -> imEventById.size,
88       "PolicyEntry"   -> _policyEntries.size,
89       "WalletEntry"   -> walletEntriesById.size
90     )
91
92     "MemStore(%s)" format map
93   }
94
95   //+ StoreProvider
96   def userStateStore = this
97
98   def resourceEventStore = this
99
100   def walletEntryStore = this
101
102   def imEventStore = this
103
104   def policyStore = this
105   //- StoreProvider
106
107
108   //+ UserStateStore
109   def insertUserState(userState: UserState): UserState = {
110     _userStates = userState.copy(_id = new ObjectId()) :: _userStates
111     userState
112   }
113
114   def findUserStateByUserID(userID: String) = {
115     _userStates.find(_.userID == userID)
116   }
117
118   def findLatestUserStateByUserID(userID: String) = {
119     val goodOnes = _userStates.filter(_.userID == userID)
120
121     goodOnes.sortWith {
122       case (us1, us2) ⇒
123         us1.occurredMillis > us2.occurredMillis
124     } match {
125       case head :: _ ⇒
126         Some(head)
127       case _ ⇒
128         None
129     }
130   }
131
132   def findLatestUserStateForEndOfBillingMonth(userID: String,
133                                               yearOfBillingMonth: Int,
134                                               billingMonth: Int): Option[UserState] = {
135     val goodOnes = _userStates.filter { userState ⇒
136         val f1 = userState.userID == userID
137         val f2 = userState.isFullBillingMonthState
138         val bm = userState.theFullBillingMonth
139         val f3 = (bm ne null) && {
140           bm.year == yearOfBillingMonth && bm.month == billingMonth
141         }
142
143         f1 && f2 && f3
144     }
145     
146     goodOnes.sortWith {
147       case (us1, us2) ⇒
148         us1.occurredMillis > us2.occurredMillis
149     } match {
150       case head :: _ ⇒
151         Some(head)
152       case _ ⇒
153         None
154     }
155   }
156
157   def deleteUserState(userId: String) {
158     _userStates.filterNot(_.userID == userId)
159   }
160   //- UserStateStore
161
162   //- WalletEntryStore
163   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
164     walletEntriesById.put(entry.id, entry)
165     Just(RecordID(entry.id))
166   }
167
168   def findWalletEntryById(id: String): Maybe[WalletEntry] = {
169     Maybe(walletEntriesById.apply(id))
170   }
171
172   def findUserWalletEntries(userId: String): List[WalletEntry] = {
173     walletEntriesById.valuesIterator.filter(_.userId == userId).toList
174   }
175
176   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
177     walletEntriesById.valuesIterator.filter { we ⇒
178       val receivedDate = we.receivedDate
179
180       we.userId == userId &&
181       ( (from before receivedDate) || (from == receivedDate) ) &&
182       ( (to   after  receivedDate) || (to   == receivedDate) )
183       true
184     }.toList
185   }
186
187   def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
188
189   def findPreviousEntry(userId: String,
190                         resource: String,
191                         instanceId: String,
192                         finalized: Option[Boolean]): List[WalletEntry] = Nil
193
194   def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry] = {
195     walletEntriesById.valuesIterator.filter { we ⇒
196       val occurredDate = we.occurredDate
197
198       we.userId == userId &&
199             ( (from before occurredDate) || (from == occurredDate) )
200     }.toList
201   }
202   //- WalletEntryStore
203
204   //+ ResourceEventStore
205   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
206     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
207     else {
208       import event._
209       new StdResourceEvent(
210         id,
211         occurredMillis,
212         receivedMillis,
213         userID,
214         clientID,
215         resource,
216         instanceID,
217         value,
218         eventVersion,
219         details
220       ): MemResourceEvent
221     }
222   }
223
224   override def clearResourceEvents() = {
225     _resourceEvents = Nil
226   }
227
228   def insertResourceEvent(event: ResourceEventModel) = {
229     val localEvent = createResourceEventFromOther(event)
230     _resourceEvents ::= localEvent
231     localEvent
232   }
233
234   def findResourceEventById(id: String) = {
235     _resourceEvents.find(ev ⇒ ev.id == id)
236   }
237
238   def findResourceEventsByUserId(userId: String)
239                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
240     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
241     val sorted = sortWith match {
242       case Some(sorter) ⇒
243         byUserId.sortWith(sorter)
244       case None ⇒
245         byUserId
246     }
247
248     sorted.toList
249   }
250
251   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
252     _resourceEvents.filter { ev ⇒
253       ev.userID == userId &&
254       (ev.occurredMillis > timestamp)
255     }.toList
256   }
257
258   def findResourceEventHistory(userId: String,
259                                resName: String,
260                                instid: Option[String],
261                                upTo: Long): List[ResourceEvent] = {
262     Nil
263   }
264
265   def findResourceEventsForReceivedPeriod(userId: String,
266                                           startTimeMillis: Long,
267                                           stopTimeMillis: Long): List[ResourceEvent] = {
268     _resourceEvents.filter { ev ⇒
269       ev.userID == userId &&
270       ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
271     }.toList
272   }
273
274   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
275     _resourceEvents.filter { case ev ⇒
276       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
277       // months
278       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
279     }.size.toLong
280   }
281
282   /**
283    * Finds all relevant resource events for the billing period.
284    * The relevant events are those:
285    * a) whose `occurredMillis` is within the given billing period or
286    * b) whose `receivedMillis` is within the given billing period.
287    *
288    * Order them by `occurredMillis`
289    */
290   override def findAllRelevantResourceEventsForBillingPeriod(userId: String,
291                                                              startMillis: Long,
292                                                              stopMillis: Long): List[ResourceEvent] = {
293     _resourceEvents.filter { case ev ⇒
294       ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
295     }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
296   }
297   //- ResourceEventStore
298
299   //+ IMEventStore
300   def createIMEventFromJson(json: String) = {
301     StdIMEvent.fromJsonString(json)
302   }
303
304   def createIMEventFromOther(event: IMEventModel) = {
305     StdIMEvent.fromOther(event)
306   }
307
308   def insertIMEvent(event: IMEventModel) = {
309     val localEvent = createIMEventFromOther(event)
310     imEventById += (event.id -> localEvent)
311     localEvent
312   }
313
314   def findIMEventById(id: String) = imEventById.get(id)
315   //- IMEventStore
316
317   def loadPolicyEntriesAfter(after: Long) =
318     _policyEntries.filter(p => p.validFrom > after)
319             .sortWith((a,b) => a.validFrom < b.validFrom)
320
321   def storePolicyEntry(policy: PolicyEntry) = {_policyEntries = policy :: _policyEntries; Just(RecordID(policy.id))}
322
323   def updatePolicyEntry(policy: PolicyEntry) =
324     _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
325       (acc, p) =>
326         if (p.id == policy.id)
327           policy :: acc
328         else
329           p :: acc
330   }
331
332   def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
333     case Some(x) => Just(x)
334     case None => NoVal
335   }
336 }
337
338 object MemStore {
339   final def isLocalIMEvent(event: IMEventModel) = event match {
340     case _: MemIMEvent ⇒ true
341     case _ ⇒ false
342   }
343 }