8120df5bd012dbfa60cfa7188af72dbd002b4411
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStoreProvider.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.Just
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
42 import collection.mutable.ConcurrentMap
43 import java.util.concurrent.ConcurrentHashMap
44 import gr.grnet.aquarium.Configurable
45 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
46 import org.bson.types.ObjectId
47 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
48 import gr.grnet.aquarium.computation.state.UserState
49 import gr.grnet.aquarium.util.Tags
50 import gr.grnet.aquarium.computation.BillingMonthInfo
51 import gr.grnet.aquarium.policy.{PolicyModel, StdPolicy}
52 import collection.immutable.SortedMap
53 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
54
55 /**
56  * An implementation of various stores that persists parts in memory.
57  *
58  * This is just for testing purposes.
59  * 
60  * @author Christos KK Loverdos <loverdos@gmail.com>
61  * @author Georgios Gousios <gousiosg@gmail.com>
62  * @author Prodromos Gerakios <grnet.gr>
63  */
64
65 class MemStoreProvider
66 extends StoreProvider
67    with UserStateStore
68    with Configurable
69    with PolicyStore
70    with ResourceEventStore
71    with IMEventStore {
72
73   override type IMEvent = MemIMEvent
74   override type ResourceEvent = MemResourceEvent
75   override type Policy = StdPolicy
76
77   private[this] var _userStates = List[UserState]()
78   private[this] var _policies  = List[Policy]()
79   private[this] var _resourceEvents = List[ResourceEvent]()
80
81   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
82
83
84   def propertyPrefix = None
85
86   def configure(props: Props) = {
87   }
88
89   override def toString = {
90     val map = Map(
91       Tags.UserStateTag     -> _userStates.size,
92       Tags.ResourceEventTag -> _resourceEvents.size,
93       Tags.IMEventTag       -> imEventById.size,
94       "PolicyEntry"         -> _policies.size
95     )
96
97     "MemStoreProvider(%s)" format map
98   }
99
100   //+ StoreProvider
101   def userStateStore = this
102
103   def resourceEventStore = this
104
105   def imEventStore = this
106
107   def policyStore = this
108   //- StoreProvider
109
110
111   //+ UserStateStore
112   def insertUserState(userState: UserState): UserState = {
113     _userStates = userState.copy(_id = new ObjectId().toString) :: _userStates
114     userState
115   }
116
117   def findUserStateByUserID(userID: String) = {
118     _userStates.find(_.userID == userID)
119   }
120
121   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
122     val goodOnes = _userStates.filter(_.theFullBillingMonth.isDefined).filter { userState ⇒
123         val f1 = userState.userID == userID
124         val f2 = userState.isFullBillingMonthState
125         val bm = userState.theFullBillingMonth.get
126         val f3 = bm == bmi
127
128         f1 && f2 && f3
129     }
130     
131     goodOnes.sortWith {
132       case (us1, us2) ⇒
133         us1.occurredMillis > us2.occurredMillis
134     } match {
135       case head :: _ ⇒
136         Some(head)
137       case _ ⇒
138         None
139     }
140   }
141   //- UserStateStore
142
143   //+ ResourceEventStore
144   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
145     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
146     else {
147       import event._
148       new StdResourceEvent(
149         id,
150         occurredMillis,
151         receivedMillis,
152         userID,
153         clientID,
154         resource,
155         instanceID,
156         value,
157         eventVersion,
158         details
159       ): MemResourceEvent
160     }
161   }
162
163   override def clearResourceEvents() = {
164     _resourceEvents = Nil
165   }
166
167   def pingResourceEventStore(): Unit = {
168     // We are always live and kicking...
169   }
170
171   def insertResourceEvent(event: ResourceEventModel) = {
172     val localEvent = createResourceEventFromOther(event)
173     _resourceEvents ::= localEvent
174     localEvent
175   }
176
177   def findResourceEventByID(id: String) = {
178     _resourceEvents.find(ev ⇒ ev.id == id)
179   }
180
181   def findResourceEventsByUserID(userId: String)
182                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
183     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
184     val sorted = sortWith match {
185       case Some(sorter) ⇒
186         byUserId.sortWith(sorter)
187       case None ⇒
188         byUserId
189     }
190
191     sorted.toList
192   }
193
194   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
195     _resourceEvents.filter { case ev ⇒
196       ev.userID == userID &&
197       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
198       // months
199       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
200     }.size.toLong
201   }
202   //- ResourceEventStore
203
204   def foreachResourceEventOccurredInPeriod(
205       userID: String,
206       startMillis: Long,
207       stopMillis: Long
208   )(f: ResourceEvent ⇒ Unit): Unit = {
209     _resourceEvents.filter { case ev ⇒
210       ev.userID == userID &&
211       ev.isOccurredWithinMillis(startMillis, stopMillis)
212     }.foreach(f)
213   }
214
215   //+ IMEventStore
216   def createIMEventFromJson(json: String) = {
217     StdIMEvent.fromJsonString(json)
218   }
219
220   def createIMEventFromOther(event: IMEventModel) = {
221     StdIMEvent.fromOther(event)
222   }
223
224   def pingIMEventStore(): Unit = {
225   }
226
227
228   def insertIMEvent(event: IMEventModel) = {
229     val localEvent = createIMEventFromOther(event)
230     imEventById += (event.id -> localEvent)
231     localEvent
232   }
233
234   def findIMEventByID(id: String) = imEventById.get(id)
235
236
237   /**
238    * Find the `CREATE` even for the given user. Note that there must be only one such event.
239    */
240   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
241     imEventById.valuesIterator.filter { e ⇒
242       e.userID == userID && e.isCreateUser
243     }.toList.sortWith { case (e1, e2) ⇒
244       e1.occurredMillis < e2.occurredMillis
245     } headOption
246   }
247
248   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
249     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
250       case (us1, us2) ⇒
251         us1.occurredMillis > us2.occurredMillis
252     } headOption
253   }
254
255   /**
256    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
257    * the given function `f`.
258    *
259    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
260    */
261   def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
262     imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith {
263       case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis
264     } foreach(f)
265   }
266   //- IMEventStore
267
268   /**
269    * Store an accounting policy.
270    */
271   def insertPolicy(policy: PolicyModel): Policy = {
272     val localPolicy = StdPolicy(
273       id = policy.id,
274       parentID = policy.parentID,
275       validityTimespan = policy.validityTimespan,
276       resourceTypes = policy.resourceTypes,
277       chargingBehaviors = policy.chargingBehaviors,
278       roleMapping = policy.roleMapping
279     )
280     _policies = localPolicy :: _policies
281
282     localPolicy
283   }
284
285   def loadValidPolicyAt(atMillis: Long): Option[Policy] = {
286     throw new UnsupportedOperationException
287   }
288
289   def loadAndSortPoliciesWithin(fromMillis: Long, toMillis: Long): SortedMap[Timeslot, Policy] = {
290     throw new UnsupportedOperationException
291   }
292 }
293
294 object MemStoreProvider {
295   final def isLocalIMEvent(event: IMEventModel) = event match {
296     case _: MemIMEvent ⇒ true
297     case _ ⇒ false
298   }
299 }