Make Aquarium a service. Move startup functionality to it.
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
42 import java.util.Date
43 import collection.mutable.ConcurrentMap
44 import java.util.concurrent.ConcurrentHashMap
45 import gr.grnet.aquarium.uid.ConcurrentVMLocalUIDGenerator
46 import gr.grnet.aquarium.Configurable
47 import gr.grnet.aquarium.event.model.{WalletEntry, PolicyEntry}
48 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
49 import org.bson.types.ObjectId
50 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
51 import gr.grnet.aquarium.computation.UserState
52
53 /**
54  * An implementation of various stores that persists data in memory.
55  *
56  * This is just for testing purposes.
57  * 
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  * @author Georgios Gousios <gousiosg@gmail.com>
60  */
61
62 class MemStore extends UserStateStore
63   with Configurable with PolicyStore
64   with ResourceEventStore with IMEventStore
65   with WalletEntryStore
66   with StoreProvider {
67
68   override type IMEvent = MemIMEvent
69   override type ResourceEvent = MemResourceEvent
70
71   private[this] val idGen = new ConcurrentVMLocalUIDGenerator(1000)
72   
73   private[this] var _userStates     = List[UserState]()
74   private[this] var _policyEntries  = List[PolicyEntry]()
75   private[this] var _resourceEvents = List[ResourceEvent]()
76
77   private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
78   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
79
80
81   def propertyPrefix = None
82
83   def configure(props: Props) = {
84   }
85
86   override def toString = {
87     val map = Map(
88       "UserState"     -> _userStates.size,
89       "ResourceEvent" -> _resourceEvents.size,
90       "IMEvent"     -> imEventById.size,
91       "PolicyEntry"   -> _policyEntries.size,
92       "WalletEntry"   -> walletEntriesById.size
93     )
94
95     "MemStore(%s)" format map
96   }
97
98   //+ StoreProvider
99   def userStateStore = this
100
101   def resourceEventStore = this
102
103   def walletEntryStore = this
104
105   def imEventStore = this
106
107   def policyStore = this
108   //- StoreProvider
109
110
111   //+ UserStateStore
112   def insertUserState(userState: UserState): UserState = {
113     _userStates = userState.copy(_id = new ObjectId()) :: _userStates
114     userState
115   }
116
117   def findUserStateByUserID(userID: String) = {
118     _userStates.find(_.userID == userID)
119   }
120
121   def findLatestUserStateByUserID(userID: String) = {
122     val goodOnes = _userStates.filter(_.userID == userID)
123
124     goodOnes.sortWith {
125       case (us1, us2) ⇒
126         us1.occurredMillis > us2.occurredMillis
127     } match {
128       case head :: _ ⇒
129         Some(head)
130       case _ ⇒
131         None
132     }
133   }
134
135   def findLatestUserStateForEndOfBillingMonth(userID: String,
136                                               yearOfBillingMonth: Int,
137                                               billingMonth: Int): Option[UserState] = {
138     val goodOnes = _userStates.filter { userState ⇒
139         val f1 = userState.userID == userID
140         val f2 = userState.isFullBillingMonthState
141         val bm = userState.theFullBillingMonth
142         val f3 = (bm ne null) && {
143           bm.year == yearOfBillingMonth && bm.month == billingMonth
144         }
145
146         f1 && f2 && f3
147     }
148     
149     goodOnes.sortWith {
150       case (us1, us2) ⇒
151         us1.occurredMillis > us2.occurredMillis
152     } match {
153       case head :: _ ⇒
154         Some(head)
155       case _ ⇒
156         None
157     }
158   }
159
160   def deleteUserState(userId: String) {
161     _userStates.filterNot(_.userID == userId)
162   }
163   //- UserStateStore
164
165   //- WalletEntryStore
166   def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
167     walletEntriesById.put(entry.id, entry)
168     Just(RecordID(entry.id))
169   }
170
171   def findWalletEntryById(id: String): Maybe[WalletEntry] = {
172     Maybe(walletEntriesById.apply(id))
173   }
174
175   def findUserWalletEntries(userId: String): List[WalletEntry] = {
176     walletEntriesById.valuesIterator.filter(_.userId == userId).toList
177   }
178
179   def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
180     walletEntriesById.valuesIterator.filter { we ⇒
181       val receivedDate = we.receivedDate
182
183       we.userId == userId &&
184       ( (from before receivedDate) || (from == receivedDate) ) &&
185       ( (to   after  receivedDate) || (to   == receivedDate) )
186       true
187     }.toList
188   }
189
190   def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
191
192   def findPreviousEntry(userId: String,
193                         resource: String,
194                         instanceId: String,
195                         finalized: Option[Boolean]): List[WalletEntry] = Nil
196
197   def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry] = {
198     walletEntriesById.valuesIterator.filter { we ⇒
199       val occurredDate = we.occurredDate
200
201       we.userId == userId &&
202             ( (from before occurredDate) || (from == occurredDate) )
203     }.toList
204   }
205   //- WalletEntryStore
206
207   //+ ResourceEventStore
208   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
209     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
210     else {
211       import event._
212       new StdResourceEvent(
213         id,
214         occurredMillis,
215         receivedMillis,
216         userID,
217         clientID,
218         resource,
219         instanceID,
220         value,
221         eventVersion,
222         details
223       ): MemResourceEvent
224     }
225   }
226
227   override def clearResourceEvents() = {
228     _resourceEvents = Nil
229   }
230
231   def insertResourceEvent(event: ResourceEventModel) = {
232     val localEvent = createResourceEventFromOther(event)
233     _resourceEvents ::= localEvent
234     localEvent
235   }
236
237   def findResourceEventById(id: String) = {
238     _resourceEvents.find(ev ⇒ ev.id == id)
239   }
240
241   def findResourceEventsByUserId(userId: String)
242                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
243     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
244     val sorted = sortWith match {
245       case Some(sorter) ⇒
246         byUserId.sortWith(sorter)
247       case None ⇒
248         byUserId
249     }
250
251     sorted.toList
252   }
253
254   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
255     _resourceEvents.filter { ev ⇒
256       ev.userID == userId &&
257       (ev.occurredMillis > timestamp)
258     }.toList
259   }
260
261   def findResourceEventHistory(userId: String,
262                                resName: String,
263                                instid: Option[String],
264                                upTo: Long): List[ResourceEvent] = {
265     Nil
266   }
267
268   def findResourceEventsForReceivedPeriod(userId: String,
269                                           startTimeMillis: Long,
270                                           stopTimeMillis: Long): List[ResourceEvent] = {
271     _resourceEvents.filter { ev ⇒
272       ev.userID == userId &&
273       ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
274     }.toList
275   }
276
277   def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
278     _resourceEvents.filter { case ev ⇒
279       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
280       // months
281       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
282     }.size.toLong
283   }
284
285   /**
286    * Finds all relevant resource events for the billing period.
287    * The relevant events are those:
288    * a) whose `occurredMillis` is within the given billing period or
289    * b) whose `receivedMillis` is within the given billing period.
290    *
291    * Order them by `occurredMillis`
292    */
293   override def findAllRelevantResourceEventsForBillingPeriod(userId: String,
294                                                              startMillis: Long,
295                                                              stopMillis: Long): List[ResourceEvent] = {
296     _resourceEvents.filter { case ev ⇒
297       ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
298     }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
299   }
300   //- ResourceEventStore
301
302   //+ IMEventStore
303   def createIMEventFromJson(json: String) = {
304     StdIMEvent.fromJsonString(json)
305   }
306
307   def createIMEventFromOther(event: IMEventModel) = {
308     StdIMEvent.fromOther(event)
309   }
310
311   def insertIMEvent(event: IMEventModel) = {
312     val localEvent = createIMEventFromOther(event)
313     imEventById += (event.id -> localEvent)
314     localEvent
315   }
316
317   def findIMEventById(id: String) = imEventById.get(id)
318   //- IMEventStore
319
320   def loadPolicyEntriesAfter(after: Long) =
321     _policyEntries.filter(p => p.validFrom > after)
322             .sortWith((a,b) => a.validFrom < b.validFrom)
323
324   def storePolicyEntry(policy: PolicyEntry) = {_policyEntries = policy :: _policyEntries; Just(RecordID(policy.id))}
325
326   def updatePolicyEntry(policy: PolicyEntry) =
327     _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
328       (acc, p) =>
329         if (p.id == policy.id)
330           policy :: acc
331         else
332           p :: acc
333   }
334
335   def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match {
336     case Some(x) => Just(x)
337     case None => NoVal
338   }
339 }
340
341 object MemStore {
342   final def isLocalIMEvent(event: IMEventModel) = event match {
343     case _: MemIMEvent ⇒ true
344     case _ ⇒ false
345   }
346 }