Simplify the types of stored things
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStoreProvider.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.store._
40 import scala.collection.JavaConversions._
41 import collection.mutable.ConcurrentMap
42 import java.util.concurrent.ConcurrentHashMap
43 import gr.grnet.aquarium.Configurable
44 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
45 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
46 import gr.grnet.aquarium.util.{Loggable, Tags}
47 import gr.grnet.aquarium.computation.BillingMonthInfo
48 import gr.grnet.aquarium.policy.{PolicyModel, StdPolicy}
49 import collection.immutable.SortedMap
50 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
51 import collection.immutable
52 import java.util.Date
53 import gr.grnet.aquarium.charging.state.{UserStateModel, StdUserState}
54
55 /**
56  * An implementation of various stores that persists parts in memory.
57  *
58  * This is just for testing purposes.
59  * 
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  * @author Georgios Gousios <gousiosg@gmail.com>
62  * @author Prodromos Gerakios <grnet.gr>
63  */
64
65 class MemStoreProvider
66 extends StoreProvider
67    with UserStateStore
68    with Configurable
69    with PolicyStore
70    with ResourceEventStore
71    with IMEventStore
72    with Loggable {
73
74   private[this] var _userStates = List[UserStateModel]()
75   private[this] var _policies  = List[PolicyModel]()
76   private[this] var _resourceEvents = List[ResourceEventModel]()
77
78   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
79
80
81   def propertyPrefix = None
82
83   def configure(props: Props) = {
84   }
85
86   override def toString = {
87     val map = Map(
88       Tags.UserStateTag     -> _userStates.size,
89       Tags.ResourceEventTag -> _resourceEvents.size,
90       Tags.IMEventTag       -> imEventById.size,
91       "PolicyEntry"         -> _policies.size
92     )
93
94     "MemStoreProvider(%s)" format map
95   }
96
97   //+ StoreProvider
98   def userStateStore = this
99
100   def resourceEventStore = this
101
102   def imEventStore = this
103
104   def policyStore = this
105   //- StoreProvider
106
107
108   //+ UserStateStore
109   def createUserStateFromOther(model: UserStateModel): UserStateModel = {
110     logger.info("createUserStateFromOther(%s)".format(model))
111
112     if(model.isInstanceOf[StdUserState]) {
113       model.asInstanceOf[StdUserState]
114     }
115     else {
116       new StdUserState(
117         model.id,
118         model.parentIDInStore,
119         model.userID,
120         model.occurredMillis,
121         model.latestResourceEventOccurredMillis,
122         model.totalCredits,
123         model.isFullBillingMonth,
124         model.billingYear,
125         model.billingMonth,
126         model.stateOfResources,
127         model.billingPeriodOutOfSyncResourceEventsCounter,
128         model.agreementHistory,
129         model.walletEntries
130       )
131     }
132   }
133
134   def insertUserState(userState: UserStateModel): UserStateModel = {
135     val localUserState = createUserStateFromOther(userState)
136     _userStates ::= localUserState
137     localUserState
138   }
139
140   def findUserStateByUserID(userID: String) = {
141     _userStates.find(_.userID == userID)
142   }
143
144   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserStateModel] = {
145     val goodOnes = _userStates.filter { userState ⇒
146       userState.userID == userID &&
147       userState.isFullBillingMonth &&
148       userState.billingYear == bmi.year &&
149       userState.billingMonth == bmi.month
150     }
151     
152     goodOnes.sortWith {
153       case (us1, us2) ⇒
154         us1.occurredMillis > us2.occurredMillis
155     } match {
156       case head :: _ ⇒
157         Some(head)
158       case _ ⇒
159         None
160     }
161   }
162   //- UserStateStore
163
164   //+ ResourceEventStore
165   def createResourceEventFromOther(event: ResourceEventModel): ResourceEventModel = {
166     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
167     else {
168       import event._
169       new StdResourceEvent(
170         id,
171         occurredMillis,
172         receivedMillis,
173         userID,
174         clientID,
175         resource,
176         instanceID,
177         value,
178         eventVersion,
179         details
180       ): MemResourceEvent
181     }
182   }
183
184   override def clearResourceEvents() = {
185     _resourceEvents = Nil
186   }
187
188   def pingResourceEventStore(): Unit = {
189     // We are always live and kicking...
190   }
191
192   def insertResourceEvent(event: ResourceEventModel) = {
193     val localEvent = createResourceEventFromOther(event)
194     _resourceEvents ::= localEvent
195     localEvent
196   }
197
198   def findResourceEventByID(id: String) = {
199     _resourceEvents.find(ev ⇒ ev.id == id)
200   }
201
202   def findResourceEventsByUserID(userId: String)
203                                 (sortWith: Option[(ResourceEventModel, ResourceEventModel) => Boolean]): List[ResourceEventModel] = {
204     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
205     val sorted = sortWith match {
206       case Some(sorter) ⇒
207         byUserId.sortWith(sorter)
208       case None ⇒
209         byUserId
210     }
211
212     sorted.toList
213   }
214
215   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
216     _resourceEvents.filter { case ev ⇒
217       ev.userID == userID &&
218       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
219       // months
220       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
221     }.size.toLong
222   }
223   //- ResourceEventStore
224
225   def foreachResourceEventOccurredInPeriod(
226       userID: String,
227       startMillis: Long,
228       stopMillis: Long
229   )(f: ResourceEventModel ⇒ Unit): Unit = {
230     _resourceEvents.filter { case ev ⇒
231       ev.userID == userID &&
232       ev.isOccurredWithinMillis(startMillis, stopMillis)
233     }.foreach(f)
234   }
235
236   //+ IMEventStore
237   def createIMEventFromJson(json: String) = {
238     StdIMEvent.fromJsonString(json)
239   }
240
241   def createIMEventFromOther(event: IMEventModel) = {
242     StdIMEvent.fromOther(event)
243   }
244
245   def pingIMEventStore(): Unit = {
246   }
247
248
249   def insertIMEvent(event: IMEventModel) = {
250     val localEvent = createIMEventFromOther(event)
251     imEventById += (event.id -> localEvent)
252     localEvent
253   }
254
255   def findIMEventByID(id: String) = imEventById.get(id)
256
257
258   /**
259    * Find the `CREATE` even for the given user. Note that there must be only one such event.
260    */
261   def findCreateIMEventByUserID(userID: String): Option[IMEventModel] = {
262     imEventById.valuesIterator.filter { e ⇒
263       e.userID == userID && e.isCreateUser
264     }.toList.sortWith { case (e1, e2) ⇒
265       e1.occurredMillis < e2.occurredMillis
266     } headOption
267   }
268
269   def findLatestIMEventByUserID(userID: String): Option[IMEventModel] = {
270     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
271       case (us1, us2) ⇒
272         us1.occurredMillis > us2.occurredMillis
273     } headOption
274   }
275
276   /**
277    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
278    * the given function `f`.
279    *
280    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
281    */
282   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventModel) => Unit) = {
283     imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith {
284       case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis
285     } foreach(f)
286   }
287   //- IMEventStore
288
289   /**
290    * Store an accounting policy.
291    */
292   def insertPolicy(policy: PolicyModel): PolicyModel = {
293     val localPolicy = StdPolicy(
294       id = policy.id,
295       parentID = policy.parentID,
296       validityTimespan = policy.validityTimespan,
297       resourceTypes = policy.resourceTypes,
298       chargingBehaviors = policy.chargingBehaviors,
299       roleMapping = policy.roleMapping
300     )
301     _policies = localPolicy :: _policies
302
303     localPolicy
304   }
305
306   def loadValidPolicyAt(atMillis: Long): Option[PolicyModel] = {
307     var d = new Date(atMillis)
308     /* sort in reverse order  and return the first that includes this date*/
309     _policies.sortWith({(x,y)=> y.validFrom < x.validFrom}).collectFirst({
310       case t if(t.validityTimespan.toTimeslot.includes(d)) => t
311     })
312   }
313
314   private def emptyMap = immutable.SortedMap[Timeslot, PolicyModel]()
315
316   def loadAndSortPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, PolicyModel] = {
317     val range = Timeslot(fromMillis,toMillis)
318     _policies.foldLeft (emptyMap) { (map,p) =>
319       if(range.overlaps(p.validityTimespan.toTimeslot))
320         map + ((p.validityTimespan.toTimeslot,p))
321       else
322         map
323     }
324   }
325 }
326
327 object MemStoreProvider {
328   final def isLocalIMEvent(event: IMEventModel) = event match {
329     case _: MemIMEvent ⇒ true
330     case _ ⇒ false
331   }
332 }