Reorg initialization seq
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStoreProvider.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import collection.immutable
39 import collection.immutable.SortedMap
40 import com.ckkloverdos.props.Props
41 import gr.grnet.aquarium.Configurable
42 import gr.grnet.aquarium.computation.BillingMonthInfo
43 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
44 import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
45 import gr.grnet.aquarium.message.avro.{MessageFactory, MessageHelpers, OrderingHelpers}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.util.{Loggable, Tags}
48
49 /**
50  * An implementation of various stores that persists parts in memory.
51  *
52  * This is just for testing purposes.
53  * 
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  * @author Prodromos Gerakios <grnet.gr>
57  */
58
59 class MemStoreProvider
60 extends StoreProvider
61    with UserStateStore
62    with Configurable
63    with PolicyStore
64    with ResourceEventStore
65    with IMEventStore
66    with Loggable {
67
68   private[this] var _userStates = immutable.TreeSet[UserStateMsg]()(OrderingHelpers.DefaultUserStateMsgOrdering)
69   private[this] var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
70   private[this] var _resourceEvents = immutable.TreeSet[ResourceEventMsg]()(OrderingHelpers.DefaultResourceEventMsgOrdering)
71   private[this] var _imEvents = immutable.TreeSet[IMEventMsg]()(OrderingHelpers.DefaultIMEventMsgOrdering)
72
73   def propertyPrefix = None
74
75   def configure(props: Props) = {
76   }
77
78   override def toString = {
79     val map = Map(
80       Tags.UserStateTag     -> _userStates.size,
81       Tags.ResourceEventTag -> _resourceEvents.size,
82       Tags.IMEventTag       -> _imEvents.size,
83       "PolicyEntry"         -> _policies.size
84     )
85
86     "MemStoreProvider(%s)" format map
87   }
88
89   //+ StoreProvider
90   def userStateStore = this
91
92   def resourceEventStore = this
93
94   def imEventStore = this
95
96   def policyStore = this
97   //- StoreProvider
98
99
100
101   //+ UserStateStore
102   def insertUserState(event: UserStateMsg) = {
103     event.setInStoreID(event.getOriginalID)
104     _userStates += event
105     event
106   }
107
108   def findUserStateByUserID(userID: String) = {
109     _userStates.find(_.getUserID == userID)
110   }
111
112   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
113     _userStates.filter { userState ⇒
114       userState.getUserID == userID &&
115       userState.getIsForFullMonth &&
116       userState.getBillingYear == bmi.year &&
117       userState.getBillingMonth == bmi.month
118     }.lastOption
119   }
120
121   def findLatestUserState(userID: String) = {
122     _userStates.filter(_.getUserID == userID).lastOption
123   }
124   //- UserStateStore
125
126   //+ ResourceEventStore
127   def pingResourceEventStore(): Unit = {
128     // We are always live and kicking...
129   }
130
131   def insertResourceEvent(event: ResourceEventMsg) = {
132     event.setInStoreID(event.getOriginalID)
133     _resourceEvents += event
134     event
135   }
136
137   def findResourceEventByID(id: String) = {
138     _resourceEvents.find(_.getOriginalID == id)
139   }
140
141   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
142     _resourceEvents.filter { case ev ⇒
143       ev.getUserID == userID &&
144       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
145       // months
146       MessageHelpers.isOutOfSyncForBillingPeriod(ev, startMillis, stopMillis)
147     }.size.toLong
148   }
149   //- ResourceEventStore
150
151   def foreachResourceEventOccurredInPeriod(
152       userID: String,
153       startMillis: Long,
154       stopMillis: Long
155   )(f: ResourceEventMsg ⇒ Unit): Unit = {
156     _resourceEvents.filter { case ev ⇒
157       ev.getUserID == userID &&
158       MessageHelpers.isOccurredWithinMillis(ev, startMillis, stopMillis)
159     }.foreach(f)
160   }
161
162   //+ IMEventStore
163   def pingIMEventStore(): Unit = {
164   }
165
166
167   def insertIMEvent(event: IMEventMsg) = {
168     event.setInStoreID(event.getOriginalID)
169     _imEvents += event
170     event
171   }
172
173   def findIMEventByID(id: String) = {
174     _imEvents.find(_.getOriginalID == id)
175   }
176
177
178   /**
179    * Find the `CREATE` even for the given user. Note that there must be only one such event.
180    */
181   def findCreateIMEventByUserID(userID: String) = {
182     _imEvents.find { event ⇒
183       event.getUserID() == userID && MessageHelpers.isUserCreationIMEvent(event)
184     }
185   }
186
187   /**
188    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
189    * the given function `f`.
190    *
191    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
192    */
193   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
194     var _shouldContinue = true
195     for {
196       msg <- _imEvents if _shouldContinue
197     } {
198       _shouldContinue = f(msg)
199     }
200     _shouldContinue
201   }
202   //- IMEventStore
203
204   //+ PolicyStore
205   def insertPolicy(policy: PolicyMsg): PolicyMsg = synchronized {
206     policy.setInStoreID(policy.getOriginalID)
207     _policies += policy
208     policy
209   }
210
211   def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = synchronized {
212     _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption
213   }
214
215   def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, PolicyMsg] = {
216     immutable.SortedMap(_policies.
217       from(MessageFactory.newDummyPolicyMsgAt(fromMillis)).
218       to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq.
219       map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _*
220     )
221   }
222
223   def foreachPolicy[U](f: (PolicyMsg) ⇒ U) {
224     _policies.foreach(f)
225   }
226   //- PolicyStore
227 }