9726383d11b4a9d18e8d03247b7a5c6d3c41434d
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
42 import java.util.Date
43 import collection.mutable.ConcurrentMap
44 import java.util.concurrent.ConcurrentHashMap
45 import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
46 import gr.grnet.aquarium.Configurable
47 import gr.grnet.aquarium.event.model.{WalletEntry, PolicyEntry}
48 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
49 import org.bson.types.ObjectId
50 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
51 import gr.grnet.aquarium.computation.UserState
52
53 /**
54  * An implementation of various stores that persists data in memory.
55  *
56  * This is just for testing purposes.
57  * 
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  * @author Georgios Gousios <gousiosg@gmail.com>
60  */
61
62 class MemStore extends UserStateStore
63   with Configurable with PolicyStore
64   with ResourceEventStore with IMEventStore
65   with WalletEntryStore
66   with StoreProvider {
67
68   override type IMEvent = MemIMEvent
69   override type ResourceEvent = MemResourceEvent
70
71   private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)
72   
73   private[this] var _userStates     = List[UserState]()
74   private[this] var _policyEntries  = List[PolicyEntry]()
75   private[this] var _resourceEvents = List[ResourceEvent]()
76
77   private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
78   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
79
80
81   def propertyPrefix = None
82
83   def configure(props: Props) = {
84   }
85
86   override def toString = {
87     val map = Map(
88       "UserState"     -> _userStates.size,
89       "ResourceEvent" -> _resourceEvents.size,
90       "IMEvent"     -> imEventById.size,
91       "PolicyEntry"   -> _policyEntries.size,
92       "WalletEntry"   -> walletEntriesById.size
93     )
94
95     "MemStore(%s)" format map
96   }
97
98   //+ StoreProvider
99   def userStateStore = this
100
101   def resourceEventStore = this
102
103   def walletEntryStore = this
104
105   def imEventStore = this
106
107   def policyStore = this
108   //- StoreProvider
109
110
111   //+ UserStateStore
112   def insertUserState(userState: UserState): UserState = {
113     _userStates = userState.copy(_id = new ObjectId()) :: _userStates
114     userState
115   }
116
117   def findUserStateByUserID(userID: String) = {
118     _userStates.find(_.userID == userID)
119   }
120
121   def findLatestUserStateByUserID(userID: String) = {
122     val goodOnes = _userStates.filter(_.userID == userID)
123
124     goodOnes.sortWith {
125       case (us1, us2) ⇒
126         us1.occurredMillis > us2.occurredMillis
127     } match {
128       case head :: _ ⇒
129         Some(head)
130       case _ ⇒
131         None
132     }
133   }
134
135   def findLatestUserStateForEndOfBillingMonth(userID: String,
136                                               yearOfBillingMonth: Int,
137                                               billingMonth: Int): Option[UserState] = {
138     val goodOnes = _userStates.filter { userState ⇒
139         val f1 = userState.userID == userID
140         val f2 = userState.isFullBillingMonthState
141         val bm = userState.theFullBillingMonth
142         val f3 = (bm ne null) && {
143           bm.year == yearOfBillingMonth && bm.month == billingMonth
144         }
145
146         f1 && f2 && f3
147     }
148     
149     goodOnes.sortWith {
150       case (us1, us2) ⇒
151         us1.occurredMillis > us2.occurredMillis
152     } match {
153       case head :: _ ⇒
154         Some(head)
155       case _ ⇒
156         None
157     }
158   }
159
160   def deleteUserState(userId: String) {
161     _userStates.filterNot(_.userID == userId)
162   }
163   //- UserStateStore
164
165   //- WalletEntryStore
166   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
167     walletEntriesById.put(entry.id, entry)
168     Just(RecordID(entry.id))
169   }
170
171   def findWalletEntryById(id: String): Maybe[WalletEntry] = {
172     Maybe(walletEntriesById.apply(id))
173   }
174
175   def findUserWalletEntries(userId: String): List[WalletEntry] = {
176     walletEntriesById.valuesIterator.filter(_.userId == userId).toList
177   }
178
179   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
180     walletEntriesById.valuesIterator.filter { we ⇒
181       val receivedDate = we.receivedDate
182
183       we.userId == userId &&
184       ( (from before receivedDate) || (from == receivedDate) ) &&
185       ( (to   after  receivedDate) || (to   == receivedDate) )
186       true
187     }.toList
188   }
189
190   def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
191
192   def findPreviousEntry(userId: String,
193                         resource: String,
194                         instanceId: String,
195                         finalized: Option[Boolean]): List[WalletEntry] = Nil
196
197   def findWalletEntriesAfter(userID: String, from: Date): List[WalletEntry] = {
198     walletEntriesById.valuesIterator.filter { we ⇒
199       val occurredDate = we.occurredDate
200
201       we.userId == userID &&
202             ( (from before occurredDate) || (from == occurredDate) )
203     }.toList
204   }
205   //- WalletEntryStore
206
207   //+ ResourceEventStore
208   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
209     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
210     else {
211       import event._
212       new StdResourceEvent(
213         id,
214         occurredMillis,
215         receivedMillis,
216         userID,
217         clientID,
218         resource,
219         instanceID,
220         value,
221         eventVersion,
222         details
223       ): MemResourceEvent
224     }
225   }
226
227   override def clearResourceEvents() = {
228     _resourceEvents = Nil
229   }
230
231   def pingResourceEventStore(): Unit = {
232     // We are always live and kicking...
233   }
234
235   def insertResourceEvent(event: ResourceEventModel) = {
236     val localEvent = createResourceEventFromOther(event)
237     _resourceEvents ::= localEvent
238     localEvent
239   }
240
241   def findResourceEventById(id: String) = {
242     _resourceEvents.find(ev ⇒ ev.id == id)
243   }
244
245   def findResourceEventsByUserId(userId: String)
246                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
247     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
248     val sorted = sortWith match {
249       case Some(sorter) ⇒
250         byUserId.sortWith(sorter)
251       case None ⇒
252         byUserId
253     }
254
255     sorted.toList
256   }
257
258   def findResourceEventsByUserIdAfterTimestamp(userID: String, timestamp: Long): List[ResourceEvent] = {
259     _resourceEvents.filter { ev ⇒
260       ev.userID == userID &&
261       (ev.occurredMillis > timestamp)
262     }.toList
263   }
264
265   def findResourceEventHistory(userId: String,
266                                resName: String,
267                                instid: Option[String],
268                                upTo: Long): List[ResourceEvent] = {
269     Nil
270   }
271
272   def findResourceEventsForReceivedPeriod(userID: String,
273                                           startTimeMillis: Long,
274                                           stopTimeMillis: Long): List[ResourceEvent] = {
275     _resourceEvents.filter { ev ⇒
276       ev.userID == userID &&
277       ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
278     }.toList
279   }
280
281   def countOutOfSyncEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
282     _resourceEvents.filter { case ev ⇒
283       ev.userID == userID &&
284       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
285       // months
286       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
287     }.size.toLong
288   }
289
290   /**
291    * Finds all relevant resource events for the billing period.
292    * The relevant events are those:
293    * a) whose `occurredMillis` is within the given billing period or
294    * b) whose `receivedMillis` is within the given billing period.
295    *
296    * Order them by `occurredMillis`
297    */
298   override def findAllRelevantResourceEventsForBillingPeriod(userID: String,
299                                                              startMillis: Long,
300                                                              stopMillis: Long): List[ResourceEvent] = {
301     _resourceEvents.filter { case ev ⇒
302       ev.userID == userID &&
303       ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
304     }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
305   }
306   //- ResourceEventStore
307
308   //+ IMEventStore
309   def createIMEventFromJson(json: String) = {
310     StdIMEvent.fromJsonString(json)
311   }
312
313   def createIMEventFromOther(event: IMEventModel) = {
314     StdIMEvent.fromOther(event)
315   }
316
317   def pingIMEventStore(): Unit = {
318   }
319
320
321   def insertIMEvent(event: IMEventModel) = {
322     val localEvent = createIMEventFromOther(event)
323     imEventById += (event.id -> localEvent)
324     localEvent
325   }
326
327   def findIMEventById(id: String) = imEventById.get(id)
328
329   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
330     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
331       case (us1, us2) ⇒
332         us1.occurredMillis > us2.occurredMillis
333     } match {
334       case head :: _ ⇒
335         Some(head)
336
337       case _ ⇒
338         None
339     }
340   }
341
342   def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
343     imEventById.valuesIterator.filter { case ev ⇒
344       ev.userID == userID && ev.isActive
345     }.toList.sortWith { case (ev1, ev2) ⇒
346       ev1.occurredMillis <= ev2.occurredMillis
347     } match {
348       case head :: _ ⇒
349         Some(head)
350
351       case _ ⇒
352         None
353     }
354   }
355   //- IMEventStore
356
357   def loadPolicyEntriesAfter(after: Long) =
358     _policyEntries.filter(p => p.validFrom > after)
359             .sortWith((a,b) => a.validFrom < b.validFrom)
360
361   def storePolicyEntry(policy: PolicyEntry) = {_policyEntries = policy :: _policyEntries; Just(RecordID(policy.id))}
362
363   def updatePolicyEntry(policy: PolicyEntry) =
364     _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
365       (acc, p) =>
366         if (p.id == policy.id)
367           policy :: acc
368         else
369           p :: acc
370   }
371
372   def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
373     case Some(x) => Just(x)
374     case None => NoVal
375   }
376 }
377
378 object MemStore {
379   final def isLocalIMEvent(event: IMEventModel) = event match {
380     case _: MemIMEvent ⇒ true
381     case _ ⇒ false
382   }
383 }