WIP: ResourceEvent-related refactorings
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
42 import java.util.Date
43 import collection.mutable.ConcurrentMap
44 import java.util.concurrent.ConcurrentHashMap
45 import gr.grnet.aquarium.user.UserState
46 import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
47 import gr.grnet.aquarium.{AquariumException, Configurable}
48 import gr.grnet.aquarium.event.{WalletEntry, PolicyEntry}
49 import gr.grnet.aquarium.converter.JsonTextFormat
50 import gr.grnet.aquarium.util._
51 import gr.grnet.aquarium.event.im.{StdIMEvent, IMEventModel}
52 import org.bson.types.ObjectId
53 import gr.grnet.aquarium.event.resource.{StdResourceEvent, ResourceEventModel}
54
55 /**
56  * An implementation of various stores that persists data in memory.
57  *
58  * This is just for testing purposes.
59  * 
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  * @author Georgios Gousios <gousiosg@gmail.com>
62  */
63
64 class MemStore extends UserStateStore
65   with Configurable with PolicyStore
66   with ResourceEventStore with IMEventStore
67   with WalletEntryStore
68   with StoreProvider {
69
70   override type IMEvent = MemIMEvent
71   override type ResourceEvent = MemResourceEvent
72
73   private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)
74   
75   private[this] var _userStates     = List[UserState]()
76   private[this] var _policyEntries  = List[PolicyEntry]()
77   private[this] var _resourceEvents = List[ResourceEvent]()
78
79   private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
80   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
81
82   def configure(props: Props) = {
83   }
84
85   override def toString = {
86     val map = Map(
87       "UserState"     -> _userStates.size,
88       "ResourceEvent" -> _resourceEvents.size,
89       "IMEvent"     -> imEventById.size,
90       "PolicyEntry"   -> _policyEntries.size,
91       "WalletEntry"   -> walletEntriesById.size
92     )
93
94     "MemStore(%s)" format map
95   }
96
97   //+ StoreProvider
98   def userStateStore = this
99
100   def resourceEventStore = this
101
102   def walletEntryStore = this
103
104   def imEventStore = this
105
106   def policyStore = this
107   //- StoreProvider
108
109
110   //+ UserStateStore
111   def insertUserState(userState: UserState): UserState = {
112     _userStates = userState.copy(_id = new ObjectId()) :: _userStates
113     userState
114   }
115
116   def findUserStateByUserID(userID: String) = {
117     _userStates.find(_.userID == userID)
118   }
119
120   def findLatestUserStateByUserID(userID: String) = {
121     val goodOnes = _userStates.filter(_.userID == userID)
122
123     goodOnes.sortWith {
124       case (us1, us2) ⇒
125         us1.oldestSnapshotTime > us2.oldestSnapshotTime
126     } match {
127       case head :: _ ⇒
128         Some(head)
129       case _ ⇒
130         None
131     }
132   }
133
134   def findLatestUserStateForEndOfBillingMonth(userID: String,
135                                               yearOfBillingMonth: Int,
136                                               billingMonth: Int): Option[UserState] = {
137     val goodOnes = _userStates.filter { userState ⇒
138         val f1 = userState.userID == userID
139         val f2 = userState.isFullBillingMonthState
140         val bm = userState.theFullBillingMonth
141         val f3 = (bm ne null) && {
142           bm.year == yearOfBillingMonth && bm.month == billingMonth
143         }
144
145         f1 && f2 && f3
146     }
147     
148     goodOnes.sortWith {
149       case (us1, us2) ⇒
150         us1.oldestSnapshotTime > us2.oldestSnapshotTime
151     } match {
152       case head :: _ ⇒
153         Some(head)
154       case _ ⇒
155         None
156     }
157   }
158
159   def deleteUserState(userId: String) {
160     _userStates.filterNot(_.userID == userId)
161   }
162   //- UserStateStore
163
164   //- WalletEntryStore
165   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
166     walletEntriesById.put(entry.id, entry)
167     Just(RecordID(entry.id))
168   }
169
170   def findWalletEntryById(id: String): Maybe[WalletEntry] = {
171     Maybe(walletEntriesById.apply(id))
172   }
173
174   def findUserWalletEntries(userId: String): List[WalletEntry] = {
175     walletEntriesById.valuesIterator.filter(_.userId == userId).toList
176   }
177
178   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
179     walletEntriesById.valuesIterator.filter { we ⇒
180       val receivedDate = we.receivedDate
181
182       we.userId == userId &&
183       ( (from before receivedDate) || (from == receivedDate) ) &&
184       ( (to   after  receivedDate) || (to   == receivedDate) )
185       true
186     }.toList
187   }
188
189   def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
190
191   def findPreviousEntry(userId: String,
192                         resource: String,
193                         instanceId: String,
194                         finalized: Option[Boolean]): List[WalletEntry] = Nil
195
196   def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry] = {
197     walletEntriesById.valuesIterator.filter { we ⇒
198       val occurredDate = we.occurredDate
199
200       we.userId == userId &&
201             ( (from before occurredDate) || (from == occurredDate) )
202     }.toList
203   }
204   //- WalletEntryStore
205
206   //+ ResourceEventStore
207   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
208     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
209     else {
210       import event._
211       new StdResourceEvent(
212         id,
213         occurredMillis,
214         receivedMillis,
215         userID,
216         clientID,
217         resource,
218         instanceID,
219         value,
220         eventVersion,
221         details
222       ): MemResourceEvent
223     }
224   }
225
226   override def clearResourceEvents() = {
227     _resourceEvents = Nil
228   }
229
230   def insertResourceEvent(event: ResourceEventModel) = {
231     val localEvent = createResourceEventFromOther(event)
232     _resourceEvents ::= localEvent
233     localEvent
234   }
235
236   def findResourceEventById(id: String) = {
237     _resourceEvents.find(ev ⇒ ev.id == id)
238   }
239
240   def findResourceEventsByUserId(userId: String)
241                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
242     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
243     val sorted = sortWith match {
244       case Some(sorter) ⇒
245         byUserId.sortWith(sorter)
246       case None ⇒
247         byUserId
248     }
249
250     sorted.toList
251   }
252
253   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
254     _resourceEvents.filter { ev ⇒
255       ev.userID == userId &&
256       (ev.occurredMillis > timestamp)
257     }.toList
258   }
259
260   def findResourceEventHistory(userId: String,
261                                resName: String,
262                                instid: Option[String],
263                                upTo: Long): List[ResourceEvent] = {
264     Nil
265   }
266
267   def findResourceEventsForReceivedPeriod(userId: String,
268                                           startTimeMillis: Long,
269                                           stopTimeMillis: Long): List[ResourceEvent] = {
270     _resourceEvents.filter { ev ⇒
271       ev.userID == userId &&
272       ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
273     }.toList
274   }
275
276   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
277     _resourceEvents.filter { case ev ⇒
278       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
279       // months
280       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
281     }.size.toLong
282   }
283
284   /**
285    * Finds all relevant resource events for the billing period.
286    * The relevant events are those:
287    * a) whose `occurredMillis` is within the given billing period or
288    * b) whose `receivedMillis` is within the given billing period.
289    *
290    * Order them by `occurredMillis`
291    */
292   override def findAllRelevantResourceEventsForBillingPeriod(userId: String,
293                                                              startMillis: Long,
294                                                              stopMillis: Long): List[ResourceEvent] = {
295     _resourceEvents.filter { case ev ⇒
296       ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
297     }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
298   }
299   //- ResourceEventStore
300
301   //+ IMEventStore
302   def createIMEventFromJson(json: String) = {
303     StdIMEvent.fromJsonString(json)
304   }
305
306   def createIMEventFromOther(event: IMEventModel) = {
307     StdIMEvent.fromOther(event)
308   }
309
310   def insertIMEvent(event: IMEventModel) = {
311     val localEvent = createIMEventFromOther(event)
312     imEventById += (event.id -> localEvent)
313     localEvent
314   }
315
316   def findIMEventById(id: String) = imEventById.get(id)
317   //- IMEventStore
318
319   def loadPolicyEntriesAfter(after: Long) =
320     _policyEntries.filter(p => p.validFrom > after)
321             .sortWith((a,b) => a.validFrom < b.validFrom)
322
323   def storePolicyEntry(policy: PolicyEntry) = {_policyEntries = policy :: _policyEntries; Just(RecordID(policy.id))}
324
325   def updatePolicyEntry(policy: PolicyEntry) =
326     _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
327       (acc, p) =>
328         if (p.id == policy.id)
329           policy :: acc
330         else
331           p :: acc
332   }
333
334   def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
335     case Some(x) => Just(x)
336     case None => NoVal
337   }
338 }
339
340 object MemStore {
341   final def isLocalIMEvent(event: IMEventModel) = event match {
342     case _: MemIMEvent ⇒ true
343     case _ ⇒ false
344   }
345 }