WIP Resource event handling
[aquarium] / src / main / scala / gr / grnet / aquarium / store / memory / MemStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store.memory
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.maybe.Just
40 import gr.grnet.aquarium.store._
41 import scala.collection.JavaConversions._
42 import collection.mutable.ConcurrentMap
43 import java.util.concurrent.ConcurrentHashMap
44 import gr.grnet.aquarium.Configurable
45 import gr.grnet.aquarium.event.model.PolicyEntry
46 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
47 import org.bson.types.ObjectId
48 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
49 import gr.grnet.aquarium.computation.state.UserState
50 import gr.grnet.aquarium.util.Tags
51 import gr.grnet.aquarium.computation.BillingMonthInfo
52
53 /**
54  * An implementation of various stores that persists parts in memory.
55  *
56  * This is just for testing purposes.
57  * 
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  * @author Georgios Gousios <gousiosg@gmail.com>
60  */
61
62 class MemStore extends UserStateStore
63   with Configurable with PolicyStore
64   with ResourceEventStore with IMEventStore
65   with StoreProvider {
66
67   override type IMEvent = MemIMEvent
68   override type ResourceEvent = MemResourceEvent
69
70   private[this] var _userStates     = List[UserState]()
71   private[this] var _policyEntries  = List[PolicyEntry]()
72   private[this] var _resourceEvents = List[ResourceEvent]()
73
74   private[this] val imEventById: ConcurrentMap[String, MemIMEvent] = new ConcurrentHashMap[String, MemIMEvent]()
75
76
77   def propertyPrefix = None
78
79   def configure(props: Props) = {
80   }
81
82   override def toString = {
83     val map = Map(
84       Tags.UserStateTag     -> _userStates.size,
85       Tags.ResourceEventTag -> _resourceEvents.size,
86       Tags.IMEventTag       -> imEventById.size,
87       "PolicyEntry"         -> _policyEntries.size
88     )
89
90     "MemStore(%s)" format map
91   }
92
93   //+ StoreProvider
94   def userStateStore = this
95
96   def resourceEventStore = this
97
98   def imEventStore = this
99
100   def policyStore = this
101   //- StoreProvider
102
103
104   //+ UserStateStore
105   def insertUserState(userState: UserState): UserState = {
106     _userStates = userState.copy(_id = new ObjectId().toString) :: _userStates
107     userState
108   }
109
110   def findUserStateByUserID(userID: String) = {
111     _userStates.find(_.userID == userID)
112   }
113
114   def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo): Option[UserState] = {
115     val goodOnes = _userStates.filter(_.theFullBillingMonth.isDefined).filter { userState ⇒
116         val f1 = userState.userID == userID
117         val f2 = userState.isFullBillingMonthState
118         val bm = userState.theFullBillingMonth.get
119         val f3 = bm == bmi
120
121         f1 && f2 && f3
122     }
123     
124     goodOnes.sortWith {
125       case (us1, us2) ⇒
126         us1.occurredMillis > us2.occurredMillis
127     } match {
128       case head :: _ ⇒
129         Some(head)
130       case _ ⇒
131         None
132     }
133   }
134   //- UserStateStore
135
136   //+ ResourceEventStore
137   def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
138     if(event.isInstanceOf[MemResourceEvent]) event.asInstanceOf[MemResourceEvent]
139     else {
140       import event._
141       new StdResourceEvent(
142         id,
143         occurredMillis,
144         receivedMillis,
145         userID,
146         clientID,
147         resource,
148         instanceID,
149         value,
150         eventVersion,
151         details
152       ): MemResourceEvent
153     }
154   }
155
156   override def clearResourceEvents() = {
157     _resourceEvents = Nil
158   }
159
160   def pingResourceEventStore(): Unit = {
161     // We are always live and kicking...
162   }
163
164   def insertResourceEvent(event: ResourceEventModel) = {
165     val localEvent = createResourceEventFromOther(event)
166     _resourceEvents ::= localEvent
167     localEvent
168   }
169
170   def findResourceEventById(id: String) = {
171     _resourceEvents.find(ev ⇒ ev.id == id)
172   }
173
174   def findResourceEventsByUserId(userId: String)
175                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
176     val byUserId = _resourceEvents.filter(_.userID == userId).toArray
177     val sorted = sortWith match {
178       case Some(sorter) ⇒
179         byUserId.sortWith(sorter)
180       case None ⇒
181         byUserId
182     }
183
184     sorted.toList
185   }
186
187   def findResourceEventsByUserIdAfterTimestamp(userID: String, timestamp: Long): List[ResourceEvent] = {
188     _resourceEvents.filter { ev ⇒
189       ev.userID == userID &&
190       (ev.occurredMillis > timestamp)
191     }.toList
192   }
193
194   def findResourceEventHistory(userId: String,
195                                resName: String,
196                                instid: Option[String],
197                                upTo: Long): List[ResourceEvent] = {
198     Nil
199   }
200
201   def findResourceEventsForReceivedPeriod(userID: String,
202                                           startTimeMillis: Long,
203                                           stopTimeMillis: Long): List[ResourceEvent] = {
204     _resourceEvents.filter { ev ⇒
205       ev.userID == userID &&
206       ev.isReceivedWithinMillis(startTimeMillis, stopTimeMillis)
207     }.toList
208   }
209
210   def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = {
211     _resourceEvents.filter { case ev ⇒
212       ev.userID == userID &&
213       // out of sync events are those that were received in the billing month but occurred in previous (or next?)
214       // months
215       ev.isOutOfSyncForBillingPeriod(startMillis, stopMillis)
216     }.size.toLong
217   }
218
219   /**
220    * Finds all relevant resource events for the billing period.
221    * The relevant events are those:
222    * a) whose `occurredMillis` is within the given billing period or
223    * b) whose `receivedMillis` is within the given billing period.
224    *
225    * Order them by `occurredMillis`
226    */
227   override def findAllRelevantResourceEventsForBillingPeriod(userID: String,
228                                                              startMillis: Long,
229                                                              stopMillis: Long): List[ResourceEvent] = {
230     _resourceEvents.filter { case ev ⇒
231       ev.userID == userID &&
232       ev.isOccurredOrReceivedWithinMillis(startMillis, stopMillis)
233     }.toList sortWith { case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis }
234   }
235   //- ResourceEventStore
236
237   //+ IMEventStore
238   def createIMEventFromJson(json: String) = {
239     StdIMEvent.fromJsonString(json)
240   }
241
242   def createIMEventFromOther(event: IMEventModel) = {
243     StdIMEvent.fromOther(event)
244   }
245
246   def pingIMEventStore(): Unit = {
247   }
248
249
250   def insertIMEvent(event: IMEventModel) = {
251     val localEvent = createIMEventFromOther(event)
252     imEventById += (event.id -> localEvent)
253     localEvent
254   }
255
256   def findIMEventById(id: String) = imEventById.get(id)
257
258
259   /**
260    * Find the `CREATE` even for the given user. Note that there must be only one such event.
261    */
262   def findCreateIMEventByUserID(userID: String): Option[IMEvent] = {
263     imEventById.valuesIterator.filter { e ⇒
264       e.userID == userID && e.isCreateUser
265     }.toList.sortWith { case (e1, e2) ⇒
266       e1.occurredMillis < e2.occurredMillis
267     } headOption
268   }
269
270   def findLatestIMEventByUserID(userID: String): Option[IMEvent] = {
271     imEventById.valuesIterator.filter(_.userID == userID).toList.sortWith {
272       case (us1, us2) ⇒
273         us1.occurredMillis > us2.occurredMillis
274     } headOption
275   }
276
277   def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = {
278     imEventById.valuesIterator.filter { case ev ⇒
279       ev.userID == userID && ev.isActive
280     }.toList.sortWith { case (ev1, ev2) ⇒
281       ev1.occurredMillis <= ev2.occurredMillis
282     } match {
283       case head :: _ ⇒
284         Some(head)
285
286       case _ ⇒
287         None
288     }
289   }
290
291   /**
292    * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
293    * the given function `f`.
294    *
295    * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
296    */
297   def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
298     imEventById.valuesIterator.filter(_.userID == userID).toSeq.sortWith {
299       case (ev1, ev2) ⇒ ev1.occurredMillis <= ev2.occurredMillis
300     } foreach(f)
301   }
302   //- IMEventStore
303
304   def loadPolicyEntriesAfter(after: Long) =
305     _policyEntries.filter(p => p.validFrom > after)
306             .sortWith((a,b) => a.validFrom < b.validFrom)
307
308   def storePolicyEntry(policy: PolicyEntry) = {_policyEntries = policy :: _policyEntries; Just(RecordID(policy.id))}
309
310   def updatePolicyEntry(policy: PolicyEntry) =
311     _policyEntries = _policyEntries.foldLeft(List[PolicyEntry]()){
312       (acc, p) =>
313         if (p.id == policy.id)
314           policy :: acc
315         else
316           p :: acc
317   }
318
319   def findPolicyEntry(id: String) = {
320     _policyEntries.find(p => p.id == id)
321   }
322 }
323
324 object MemStore {
325   final def isLocalIMEvent(event: IMEventModel) = event match {
326     case _: MemIMEvent ⇒ true
327     case _ ⇒ false
328   }
329 }