Implement Once behavior with the new scheme. Refactor in the process
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStoreProvider.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.store._
40 import scala.collection.JavaConversions._
41 import collection.mutable.ConcurrentMap
42 import java.util.concurrent.ConcurrentHashMap
43 import gr.grnet.aquarium.Configurable
44 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
45 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
46 import gr.grnet.aquarium.util.{Loggable, Tags}
47 import gr.grnet.aquarium.computation.BillingMonthInfo
48 import gr.grnet.aquarium.policy.{PolicyModel, StdPolicy}
49 import collection.immutable.SortedMap
50 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
51 import collection.immutable
52 import java.util.Date
53 import gr.grnet.aquarium.charging.state.{UserStateModel, StdUserState}
54
55 /**
56  * An implementation of various stores that persists parts in memory.
57  *
58  * This is just for testing purposes.
59  * 
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  * @author Georgios Gousios <gousiosg@gmail.com>
62  * @author Prodromos Gerakios <grnet.gr>
63  */
64
65 class MemStoreProvider
66 extends StoreProvider
67    with UserStateStore
68    with Configurable
69    with PolicyStore
70    with ResourceEventStore
71    with IMEventStore
72    with Loggable {
73
74   override type IMEvent = MemIMEvent
75   override type ResourceEvent = MemResourceEvent
76   override type Policy = StdPolicy
77   override type UserState = StdUserState
78
79   private[this] var _userStates = List[UserState]()
80   private[this] var _policies  = List[Policy]()
81   private[this] var _resourceEvents = List[ResourceEvent]()
82
83   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
84
85
86   def propertyPrefix = None
87
88   def configure(props: Props) = {
89   }
90
91   override def toString = {
92     val map = Map(
93       Tags.UserStateTag     -> _userStates.size,
94       Tags.ResourceEventTag -> _resourceEvents.size,
95       Tags.IMEventTag       -> imEventById.size,
96       "PolicyEntry"         -> _policies.size
97     )
98
99     "MemStoreProvider(%s)" format map
100   }
101
102   //+ StoreProvider
103   def userStateStore = this
104
105   def resourceEventStore = this
106
107   def imEventStore = this
108
109   def policyStore = this
110   //- StoreProvider
111
112
113   //+ UserStateStore
114   def createUserStateFromOther(model: UserStateModel): UserState = {
115     logger.info("createUserStateFromOther(%s)".format(model))
116
117     if(model.isInstanceOf[StdUserState]) {
118       model.asInstanceOf[StdUserState]
119     }
120     else {
121       new StdUserState(
122         model.id,
123         model.parentIDInStore,
124         model.userID,
125         model.occurredMillis,
126         model.latestResourceEventOccurredMillis,
127         model.totalCredits,
128         model.isFullBillingMonth,
129         model.billingYear,
130         model.billingMonth,
131         model.chargingReason,
132         model.stateOfResources,
133         model.billingPeriodOutOfSyncResourceEventsCounter,
134         model.agreementHistory,
135         model.walletEntries
136       )
137     }
138   }
139
140   def insertUserState(userState: UserStateModel): UserState = {
141     val localUserState = createUserStateFromOther(userState)
142     _userStates ::= localUserState
143     localUserState
144   }
145
146   def findUserStateByUserID(userID: String) = {
147     _userStates.find(_.userID == userID)
148   }
149
150   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
151     val goodOnes = _userStates.filter { userState ⇒
152       userState.userID == userID &&
153       userState.isFullBillingMonth &&
154       userState.billingYear == bmi.year &&
155       userState.billingMonth == bmi.month
156     }
157     
158     goodOnes.sortWith {
159       case (us1, us2) ⇒
160         us1.occurredMillis > us2.occurredMillis
161     } match {
162       case head :: _ ⇒
163         Some(head)
164       case _ ⇒
165         None
166     }
167   }
168   //- UserStateStore
169
170   //+ ResourceEventStore
171   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
172     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
173     else {
174       import event._
175       new StdResourceEvent(
176         id,
177         occurredMillis,
178         receivedMillis,
179         userID,
180         clientID,
181         resource,
182         instanceID,
183         value,
184         eventVersion,
185         details
186       ): MemResourceEvent
187     }
188   }
189
190   override def clearResourceEvents() = {
191     _resourceEvents = Nil
192   }
193
194   def pingResourceEventStore(): Unit = {
195     // We are always live and kicking...
196   }
197
198   def insertResourceEvent(event: ResourceEventModel) = {
199     val localEvent = createResourceEventFromOther(event)
200     _resourceEvents ::= localEvent
201     localEvent
202   }
203
204   def findResourceEventByID(id: String) = {
205     _resourceEvents.find(ev ⇒ ev.id == id)
206   }
207
208   def findResourceEventsByUserID(userId: String)
209                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
210     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
211     val sorted = sortWith match {
212       case Some(sorter) ⇒
213         byUserId.sortWith(sorter)
214       case None ⇒
215         byUserId
216     }
217
218     sorted.toList
219   }
220
221   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
222     _resourceEvents.filter { case ev ⇒
223       ev.userID == userID &&
224       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
225       // months
226       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
227     }.size.toLong
228   }
229   //- ResourceEventStore
230
231   def foreachResourceEventOccurredInPeriod(
232       userID: String,
233       startMillis: Long,
234       stopMillis: Long
235   )(f: ResourceEvent ⇒ Unit): Unit = {
236     _resourceEvents.filter { case ev ⇒
237       ev.userID == userID &&
238       ev.isOccurredWithinMillis(startMillis, stopMillis)
239     }.foreach(f)
240   }
241
242   //+ IMEventStore
243   def createIMEventFromJson(json: String) = {
244     StdIMEvent.fromJsonString(json)
245   }
246
247   def createIMEventFromOther(event: IMEventModel) = {
248     StdIMEvent.fromOther(event)
249   }
250
251   def pingIMEventStore(): Unit = {
252   }
253
254
255   def insertIMEvent(event: IMEventModel) = {
256     val localEvent = createIMEventFromOther(event)
257     imEventById += (event.id -> localEvent)
258     localEvent
259   }
260
261   def findIMEventByID(id: String) = imEventById.get(id)
262
263
264   /**
265    * Find the `CREATE` even for the given user. Note that there must be only one such event.
266    */
267   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
268     imEventById.valuesIterator.filter { e ⇒
269       e.userID == userID && e.isCreateUser
270     }.toList.sortWith { case (e1, e2) ⇒
271       e1.occurredMillis < e2.occurredMillis
272     } headOption
273   }
274
275   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
276     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
277       case (us1, us2) ⇒
278         us1.occurredMillis > us2.occurredMillis
279     } headOption
280   }
281
282   /**
283    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
284    * the given function `f`.
285    *
286    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
287    */
288   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
289     imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith {
290       case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis
291     } foreach(f)
292   }
293   //- IMEventStore
294
295   /**
296    * Store an accounting policy.
297    */
298   def insertPolicy(policy: PolicyModel): Policy = {
299     val localPolicy = StdPolicy(
300       id = policy.id,
301       parentID = policy.parentID,
302       validityTimespan = policy.validityTimespan,
303       resourceTypes = policy.resourceTypes,
304       chargingBehaviors = policy.chargingBehaviors,
305       roleMapping = policy.roleMapping
306     )
307     _policies = localPolicy :: _policies
308
309     localPolicy
310   }
311
312   def loadValidPolicyAt(atMillis: Long): Option[Policy] = {
313     var d = new Date(atMillis)
314     /* sort in reverse order  and return the first that includes this date*/
315     _policies.sortWith({(x,y)=> y.validFrom < x.validFrom}).collectFirst({
316       case t if(t.validityTimespan.toTimeslot.includes(d)) => t
317     })
318   }
319
320   private def emptyMap = immutable.SortedMap[Timeslot,Policy]()
321
322   def loadAndSortPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, Policy] = {
323     val range = Timeslot(fromMillis,toMillis)
324     _policies.foldLeft (emptyMap) { (map,p) =>
325       if(range.overlaps(p.validityTimespan.toTimeslot))
326         map + ((p.validityTimespan.toTimeslot,p))
327       else
328         map
329     }
330   }
331 }
332
333 object MemStoreProvider {
334   final def isLocalIMEvent(event: IMEventModel) = event match {
335     case _: MemIMEvent ⇒ true
336     case _ ⇒ false
337   }
338 }