Better handling of billing month info
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStoreProvider.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import gr.grnet.aquarium.store._
40 import scala.collection.JavaConversions._
41 import collection.mutable.ConcurrentMap
42 import java.util.concurrent.ConcurrentHashMap
43 import gr.grnet.aquarium.Configurable
44 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
45 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
46 import gr.grnet.aquarium.util.{Loggable, Tags}
47 import gr.grnet.aquarium.computation.BillingMonthInfo
48 import gr.grnet.aquarium.policy.{PolicyModel, StdPolicy}
49 import collection.immutable.SortedMap
50 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
51 import collection.immutable
52 import java.util.Date
53 import gr.grnet.aquarium.charging.state.{UserStateModel, StdUserState}
54
55 /**
56  * An implementation of various stores that persists parts in memory.
57  *
58  * This is just for testing purposes.
59  * 
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  * @author Georgios Gousios <gousiosg@gmail.com>
62  * @author Prodromos Gerakios <grnet.gr>
63  */
64
65 class MemStoreProvider
66 extends StoreProvider
67    with UserStateStore
68    with Configurable
69    with PolicyStore
70    with ResourceEventStore
71    with IMEventStore
72    with Loggable {
73
74   override type IMEvent = MemIMEvent
75   override type ResourceEvent = MemResourceEvent
76   override type Policy = StdPolicy
77   override type UserState = StdUserState
78
79   private[this] var _userStates = List[UserState]()
80   private[this] var _policies  = List[Policy]()
81   private[this] var _resourceEvents = List[ResourceEvent]()
82
83   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
84
85
86   def propertyPrefix = None
87
88   def configure(props: Props) = {
89   }
90
91   override def toString = {
92     val map = Map(
93       Tags.UserStateTag     -> _userStates.size,
94       Tags.ResourceEventTag -> _resourceEvents.size,
95       Tags.IMEventTag       -> imEventById.size,
96       "PolicyEntry"         -> _policies.size
97     )
98
99     "MemStoreProvider(%s)" format map
100   }
101
102   //+ StoreProvider
103   def userStateStore = this
104
105   def resourceEventStore = this
106
107   def imEventStore = this
108
109   def policyStore = this
110   //- StoreProvider
111
112
113   //+ UserStateStore
114   def createUserStateFromOther(model: UserStateModel): UserState = {
115     logger.info("createUserStateFromOther(%s)".format(model))
116
117     if(model.isInstanceOf[StdUserState]) {
118       model.asInstanceOf[StdUserState]
119     }
120     else {
121       new StdUserState(
122         model.id,
123         model.parentIDInStore,
124         model.userID,
125         model.occurredMillis,
126         model.totalCredits,
127         model.isFullBillingMonth,
128         model.billingYear,
129         model.billingMonth,
130         model.chargingReason,
131         model.previousResourceEvents,
132         model.implicitlyIssuedStartEvents,
133         model.accumulatingAmountOfResourceInstance,
134         model.chargingDataOfResourceInstance,
135         model.billingPeriodOutOfSyncResourceEventsCounter,
136         model.agreementHistory,
137         model.walletEntries
138       )
139     }
140   }
141
142   def insertUserState(userState: UserStateModel): UserState = {
143     val localUserState = createUserStateFromOther(userState)
144     _userStates ::= localUserState
145     localUserState
146   }
147
148   def findUserStateByUserID(userID: String) = {
149     _userStates.find(_.userID == userID)
150   }
151
152   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
153     val goodOnes = _userStates.filter { userState ⇒
154       userState.userID == userID &&
155       userState.isFullBillingMonth &&
156       userState.billingYear == bmi.year &&
157       userState.billingMonth == bmi.month
158     }
159     
160     goodOnes.sortWith {
161       case (us1, us2) ⇒
162         us1.occurredMillis > us2.occurredMillis
163     } match {
164       case head :: _ ⇒
165         Some(head)
166       case _ ⇒
167         None
168     }
169   }
170   //- UserStateStore
171
172   //+ ResourceEventStore
173   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
174     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
175     else {
176       import event._
177       new StdResourceEvent(
178         id,
179         occurredMillis,
180         receivedMillis,
181         userID,
182         clientID,
183         resource,
184         instanceID,
185         value,
186         eventVersion,
187         details
188       ): MemResourceEvent
189     }
190   }
191
192   override def clearResourceEvents() = {
193     _resourceEvents = Nil
194   }
195
196   def pingResourceEventStore(): Unit = {
197     // We are always live and kicking...
198   }
199
200   def insertResourceEvent(event: ResourceEventModel) = {
201     val localEvent = createResourceEventFromOther(event)
202     _resourceEvents ::= localEvent
203     localEvent
204   }
205
206   def findResourceEventByID(id: String) = {
207     _resourceEvents.find(ev ⇒ ev.id == id)
208   }
209
210   def findResourceEventsByUserID(userId: String)
211                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
212     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
213     val sorted = sortWith match {
214       case Some(sorter) ⇒
215         byUserId.sortWith(sorter)
216       case None ⇒
217         byUserId
218     }
219
220     sorted.toList
221   }
222
223   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
224     _resourceEvents.filter { case ev ⇒
225       ev.userID == userID &&
226       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
227       // months
228       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
229     }.size.toLong
230   }
231   //- ResourceEventStore
232
233   def foreachResourceEventOccurredInPeriod(
234       userID: String,
235       startMillis: Long,
236       stopMillis: Long
237   )(f: ResourceEvent ⇒ Unit): Unit = {
238     _resourceEvents.filter { case ev ⇒
239       ev.userID == userID &&
240       ev.isOccurredWithinMillis(startMillis, stopMillis)
241     }.foreach(f)
242   }
243
244   //+ IMEventStore
245   def createIMEventFromJson(json: String) = {
246     StdIMEvent.fromJsonString(json)
247   }
248
249   def createIMEventFromOther(event: IMEventModel) = {
250     StdIMEvent.fromOther(event)
251   }
252
253   def pingIMEventStore(): Unit = {
254   }
255
256
257   def insertIMEvent(event: IMEventModel) = {
258     val localEvent = createIMEventFromOther(event)
259     imEventById += (event.id -> localEvent)
260     localEvent
261   }
262
263   def findIMEventByID(id: String) = imEventById.get(id)
264
265
266   /**
267    * Find the `CREATE` even for the given user. Note that there must be only one such event.
268    */
269   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
270     imEventById.valuesIterator.filter { e ⇒
271       e.userID == userID && e.isCreateUser
272     }.toList.sortWith { case (e1, e2) ⇒
273       e1.occurredMillis < e2.occurredMillis
274     } headOption
275   }
276
277   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
278     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
279       case (us1, us2) ⇒
280         us1.occurredMillis > us2.occurredMillis
281     } headOption
282   }
283
284   /**
285    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
286    * the given function `f`.
287    *
288    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
289    */
290   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
291     imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith {
292       case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis
293     } foreach(f)
294   }
295   //- IMEventStore
296
297   /**
298    * Store an accounting policy.
299    */
300   def insertPolicy(policy: PolicyModel): Policy = {
301     val localPolicy = StdPolicy(
302       id = policy.id,
303       parentID = policy.parentID,
304       validityTimespan = policy.validityTimespan,
305       resourceTypes = policy.resourceTypes,
306       chargingBehaviors = policy.chargingBehaviors,
307       roleMapping = policy.roleMapping
308     )
309     _policies = localPolicy :: _policies
310
311     localPolicy
312   }
313
314   def loadValidPolicyAt(atMillis: Long): Option[Policy] = {
315     var d = new Date(atMillis)
316     /* sort in reverse order  and return the first that includes this date*/
317     _policies.sortWith({(x,y)=> y.validFrom < x.validFrom}).collectFirst({
318       case t if(t.validityTimespan.toTimeslot.includes(d)) => t
319     })
320   }
321
322   private def emptyMap = immutable.SortedMap[Timeslot,Policy]()
323
324   def loadAndSortPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, Policy] = {
325     val range = Timeslot(fromMillis,toMillis)
326     _policies.foldLeft (emptyMap) { (map,p) =>
327       if(range.overlaps(p.validityTimespan.toTimeslot))
328         map + ((p.validityTimespan.toTimeslot,p))
329       else
330         map
331     }
332   }
333 }
334
335 object MemStoreProvider {
336   final def isLocalIMEvent(event: IMEventModel) = event match {
337     case _: MemIMEvent ⇒ true
338     case _ ⇒ false
339   }
340 }