2330c45b0244ad4bff0de644ed5ee230ea4d6ab4
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStoreProvider.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import collection.immutable
39 import collection.immutable.SortedMap
40 import com.ckkloverdos.props.Props
41 import gr.grnet.aquarium.Configurable
42 import gr.grnet.aquarium.computation.BillingMonthInfo
43 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
44 import gr.grnet.aquarium.message.avro.gen.{UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg}
45 import gr.grnet.aquarium.message.avro.{MessageFactory, MessageHelpers, OrderingHelpers}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.util.{Loggable, Tags}
48
49 /**
50  * An implementation of various stores that persists parts in memory.
51  *
52  * This is just for testing purposes.
53  * 
54  * @author Christos KK Loverdos <loverdos@gmail.com>
55  * @author Georgios Gousios <gousiosg@gmail.com>
56  * @author Prodromos Gerakios <grnet.gr>
57  */
58
59 class MemStoreProvider
60 extends StoreProvider
61    with UserStateStore
62    with Configurable
63    with PolicyStore
64    with ResourceEventStore
65    with IMEventStore
66    with Loggable {
67
68   private[this] var _userStates = immutable.TreeSet[UserStateMsg]()(OrderingHelpers.DefaultUserStateMsgOrdering)
69   private[this] var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering)
70   private[this] var _resourceEvents = immutable.TreeSet[ResourceEventMsg]()(OrderingHelpers.DefaultResourceEventMsgOrdering)
71   private[this] var _imEvents = immutable.TreeSet[IMEventMsg]()(OrderingHelpers.DefaultIMEventMsgOrdering)
72
73   def propertyPrefix = None
74
75   def configure(props: Props) = {
76   }
77
78   override def toString = {
79     val map = Map(
80       Tags.UserStateTag     -> _userStates.size,
81       Tags.ResourceEventTag -> _resourceEvents.size,
82       Tags.IMEventTag       -> _imEvents.size,
83       "PolicyEntry"         -> _policies.size
84     )
85
86     "MemStoreProvider(%s)" format map
87   }
88
89   //+ StoreProvider
90   def userStateStore = this
91
92   def resourceEventStore = this
93
94   def imEventStore = this
95
96   def policyStore = this
97   //- StoreProvider
98
99
100   //+ UserStateStore
101   def insertUserState(event: UserStateMsg) = {
102     event.setInStoreID(event.getOriginalID)
103     _userStates += event
104     event
105   }
106
107   def findUserStateByUserID(userID: String) = {
108     _userStates.find(_.getUserID == userID)
109   }
110
111   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = {
112     _userStates.filter { userState ⇒
113       userState.getUserID == userID &&
114       userState.getIsFullBillingMonth &&
115       userState.getBillingYear == bmi.year &&
116       userState.getBillingMonth == bmi.month
117     }.lastOption
118   }
119   //- UserStateStore
120
121   //+ ResourceEventStore
122   def pingResourceEventStore(): Unit = {
123     // We are always live and kicking...
124   }
125
126   def insertResourceEvent(event: ResourceEventMsg) = {
127     event.setInStoreID(event.getOriginalID)
128     _resourceEvents += event
129     event
130   }
131
132   def findResourceEventByID(id: String) = {
133     _resourceEvents.find(_.getOriginalID == id)
134   }
135
136   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
137     _resourceEvents.filter { case ev ⇒
138       ev.getUserID == userID &&
139       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
140       // months
141       MessageHelpers.isOutOfSyncForBillingPeriod(ev, startMillis, stopMillis)
142     }.size.toLong
143   }
144   //- ResourceEventStore
145
146   def foreachResourceEventOccurredInPeriod(
147       userID: String,
148       startMillis: Long,
149       stopMillis: Long
150   )(f: ResourceEventMsg ⇒ Unit): Unit = {
151     _resourceEvents.filter { case ev ⇒
152       ev.getUserID == userID &&
153       MessageHelpers.isOccurredWithinMillis(ev, startMillis, stopMillis)
154     }.foreach(f)
155   }
156
157   //+ IMEventStore
158   def pingIMEventStore(): Unit = {
159   }
160
161
162   def insertIMEvent(event: IMEventMsg) = {
163     event.setInStoreID(event.getOriginalID)
164     _imEvents += event
165     event
166   }
167
168   def findIMEventByID(id: String) = {
169     _imEvents.find(_.getOriginalID == id)
170   }
171
172
173   /**
174    * Find the `CREATE` even for the given user. Note that there must be only one such event.
175    */
176   def findCreateIMEventByUserID(userID: String) = {
177     _imEvents.find { event ⇒
178       event.getUserID() == userID && MessageHelpers.isIMEventCreate(event)
179     }
180   }
181
182   /**
183    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
184    * the given function `f`.
185    *
186    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
187    */
188   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = {
189     var _shouldContinue = true
190     for {
191       msg <- _imEvents if _shouldContinue
192     } {
193       _shouldContinue = f(msg)
194     }
195     _shouldContinue
196   }
197   //- IMEventStore
198
199   //+ PolicyStore
200   def insertPolicy(policy: PolicyMsg): PolicyMsg = synchronized {
201     policy.setInStoreID(policy.getOriginalID)
202     _policies += policy
203     policy
204   }
205
206   def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = synchronized {
207     _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption
208   }
209
210   def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, PolicyMsg] = {
211     immutable.SortedMap(_policies.
212       from(MessageFactory.newDummyPolicyMsgAt(fromMillis)).
213       to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq.
214       map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _*
215     )
216   }
217
218   def foreachPolicy[U](f: (PolicyMsg) ⇒ U) {
219     _policies.foreach(f)
220   }
221   //- PolicyStore
222 }