Better implement one query on resource event store
[aquarium] / src / main / scala / gr / grnet / aquarium / user / UserStateComputations.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.user
37
38 import scala.collection.mutable
39
40 import gr.grnet.aquarium.store.ResourceEventStore
41 import com.ckkloverdos.maybe.{Failed, NoVal, Just, Maybe}
42 import gr.grnet.aquarium.logic.accounting.Accounting
43 import gr.grnet.aquarium.util.date.{TimeHelpers, DateCalculator}
44 import gr.grnet.aquarium.logic.accounting.dsl.{DSLResourcesMap, DSLCostPolicy, DSLPolicy, DSLAgreement}
45 import gr.grnet.aquarium.util.Loggable
46 import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent}
47
48 sealed abstract class CalculationType(_name: String) {
49   def name = _name
50 }
51
52 /**
53  * Normal calculations that are part of the bill generation procedure
54  */
55 case object PeriodicCalculation extends CalculationType("periodic")
56
57 /**
58  * Adhoc calculations, e.g. when computing the state in realtime.
59  */
60 case object AdhocCalculation extends CalculationType("adhoc")
61
62 trait UserPolicyFinder {
63   def findUserPolicyAt(userId: String, whenMillis: Long): DSLPolicy
64 }
65
66 trait FullStateFinder {
67   def findFullState(userId: String, whenMillis: Long): Any
68 }
69
70 trait UserStateCache {
71   def findUserStateAtEndOfPeriod(userId: String, year: Int, month: Int): Maybe[UserState]
72
73   /**
74    * Find the most up-to-date user state for the particular billing period.
75    */
76   def findLatestUserStateForBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[UserState]
77 }
78
79 /**
80  * Use this to keep track of implicit OFFs at the end of the billing period.
81  *
82  * The use case is this: A VM may have been started (ON state) before the end of the billing period
83  * and ended (OFF state) after the beginning of the next billing period. In order to bill this, we must assume
84  * an implicit OFF even right at the end of the billing period and an implicit ON event with the beginning of the
85  * next billing period.
86  *
87  * @author Christos KK Loverdos <loverdos@gmail.com>
88  *
89  * @param onEvents The `ON` events that need to be implicitly terminated.
90  */
91 case class ImplicitOffEvents(onEvents: List[ResourceEvent])
92
93 case class OutOfSyncWalletEntries(entries: List[WalletEntry])
94
95 /**
96  * Full user state at the end of a billing month.
97  *
98  * @param userState
99  * @param implicitOffs
100  */
101 case class EndOfBillingState(userState: UserState, implicitOffs: ImplicitOffEvents, outOfSyncWalletEntries: OutOfSyncWalletEntries)
102
103 /**
104  *
105  * @author Christos KK Loverdos <loverdos@gmail.com>
106  */
107 class UserStateComputations extends Loggable {
108   def createFirstUserState(userId: String, agreementName: String = "default") = {
109     val now = 0L
110     UserState(
111       userId,
112       now,
113       0L,
114       false,
115       null,
116       0L,
117       ActiveSuspendedSnapshot(false, now),
118       CreditSnapshot(0, now),
119       AgreementSnapshot(Agreement(agreementName, now, -1) :: Nil, now),
120       RolesSnapshot(List(), now),
121       PaymentOrdersSnapshot(Nil, now),
122       OwnedGroupsSnapshot(Nil, now),
123       GroupMembershipsSnapshot(Nil, now),
124       OwnedResourcesSnapshot(List(), now)
125     )
126   }
127
128   def createFirstUserState(userId: String, agreementName: String, resourcesMap: DSLResourcesMap) = {
129       val now = 0L
130       UserState(
131         userId,
132         now,
133         0L,
134         false,
135         null,
136         0L,
137         ActiveSuspendedSnapshot(false, now),
138         CreditSnapshot(0, now),
139         AgreementSnapshot(Agreement(agreementName, now, - 1) :: Nil, now),
140         RolesSnapshot(List(), now),
141         PaymentOrdersSnapshot(Nil, now),
142         OwnedGroupsSnapshot(Nil, now),
143         GroupMembershipsSnapshot(Nil, now),
144         OwnedResourcesSnapshot(List(), now)
145       )
146     }
147
148   /**
149    * Get the user state as computed up to (and not including) the start of the new billing period.
150    *
151    * Always compute, taking into account any "out of sync" resource events
152    */
153   def computeUserStateAtStartOfBillingPeriod(billingYear: Int,
154                                              billingMonth: Int,
155                                              knownUserState: UserState,
156                                              accounting: Accounting): Maybe[EndOfBillingState] = {
157
158     val billingDate = new DateCalculator(billingYear, billingMonth, 1)
159     val billingDateMillis = billingDate.toMillis
160
161 //    if(billingDateMillis < knownUserState.startDateMillis) {
162 //      val userId = knownUserState.userId
163 //      val agreementName = knownUserState.agreement match {
164 //        case null      ⇒ "default"
165 //        case agreement ⇒ agreement.data
166 //      }
167 //      createFirstUserState(userId, agreementName)
168 //    } else {
169       // We really need to compute the user state here
170
171       // get all events that
172       // FIXME: Implement
173     Just(EndOfBillingState(knownUserState, ImplicitOffEvents(Nil), OutOfSyncWalletEntries(Nil)))
174
175 //    }
176   }
177
178   /**
179    * Find the previous resource event, if needed by the event's cost policy,
180    * in order to use it for any credit calculations.
181    */
182   def findPreviousRCEventOf(rcEvent: ResourceEvent,
183                             costPolicy: DSLCostPolicy,
184                             previousRCEventsMap: mutable.Map[ResourceEvent.FullResourceType, ResourceEvent]): Maybe[ResourceEvent] = {
185
186     if(costPolicy.needsPreviousEventForCreditCalculation) {
187       // Get a previous resource only if this is needed by the policy
188       previousRCEventsMap.get(rcEvent.fullResourceInfo) match {
189         case Some(previousRCEvent) ⇒
190           Just(previousRCEvent)
191         case None ⇒
192           queryForPreviousRCEvent(rcEvent)
193       }
194     } else {
195       // No need for previous event. Will return NoVal
196       NoVal
197     }
198   }
199
200   /**
201    * FIXME: implement
202    */
203   def queryForPreviousRCEvent(rcEvent: ResourceEvent): Maybe[ResourceEvent] = {
204     NoVal
205   }
206
207   def updatePreviousRCEventWith(previousRCEventsMap: mutable.Map[ResourceEvent.FullResourceType, ResourceEvent],
208                                 newRCEvent: ResourceEvent): Unit = {
209     previousRCEventsMap(newRCEvent.fullResourceInfo) = newRCEvent
210   }
211
212   /**
213    * Do a full month billing.
214    *
215    * Takes into account "out of sync events".
216    * 
217    */
218   def computeFullMonthlyBilling(yearOfBillingMonth: Int,
219                                 billingMonth: Int,
220                                 userId: String,
221                                 policyFinder: UserPolicyFinder,
222                                 fullStateFinder: FullStateFinder,
223                                 userStateCache: UserStateCache,
224                                 rcEventStore: ResourceEventStore,
225                                 currentUserState: UserState,
226                                 otherStuff: Traversable[Any],
227                                 defaultPolicy: DSLPolicy, // Policy.policy
228                                 defaultResourcesMap: DSLResourcesMap,
229                                 accounting: Accounting): Maybe[EndOfBillingState] = Maybe {
230
231     val billingMonthStartDate = new DateCalculator(yearOfBillingMonth, billingMonth, 1)
232     val billingMonthStopDate = billingMonthStartDate.copy.goEndOfThisMonth
233
234     logger.debug("billingMonthStartDate = %s".format(billingMonthStartDate))
235     logger.debug("billingMonthStopDate  = %s".format(billingMonthStopDate))
236
237     val prevBillingMonthStartDate = billingMonthStartDate.copy.goPreviousMonth
238     val yearOfPrevBillingMonth = prevBillingMonthStartDate.getYear
239     val prevBillingMonth = prevBillingMonthStartDate.getMonthOfYear
240
241     // Check if this value is already cached and valid, otherwise compute the value
242     // TODO : cache it in case of new computation
243     val cachedStartUserStateM = userStateCache.findLatestUserStateForBillingMonth(
244       userId,
245       yearOfPrevBillingMonth,
246       prevBillingMonth)
247
248     val (previousStartUserState, newStartUserState) = cachedStartUserStateM match {
249       case Just(cachedStartUserState) ⇒
250         // So, we do have a cached user state but must check if this is still valid
251         logger.debug("Found cachedStartUserState = %s".format(cachedStartUserState))
252
253         // Check how many resource events were used to produce this user state
254         val cachedHowmanyRCEvents = cachedStartUserState.resourceEventsCounter
255
256         // Ask resource event store to see if we had any "out of sync" events for the particular (== previous)
257         // billing period.
258         val prevHowmanyOutOfSyncRCEvents = rcEventStore.countOutOfSyncEventsForBillingMonth(
259           userId,
260           yearOfPrevBillingMonth,
261           prevBillingMonth)
262         logger.debug("prevHowmanyOutOfSyncRCEvents = %s".format(prevHowmanyOutOfSyncRCEvents))
263         
264         val recomputedStartUserState = if(prevHowmanyOutOfSyncRCEvents == 0) {
265         logger.debug("Not necessary to recompute start user state, using cachedStartUserState")
266           // This is good, there were no "out of sync" resource events, so we can use the cached value
267           cachedStartUserState
268         } else {
269           // Oops, there are "out of sync" resource event. Must compute (potentially recursively)
270           logger.debug("Recompute start user state...")
271           val computedUserStateAtStartOfBillingPeriod = computeUserStateAtStartOfBillingPeriod(
272             yearOfPrevBillingMonth,
273             prevBillingMonth,
274             cachedStartUserState,
275             accounting)
276           logger.debug("computedUserStateAtStartOfiingPeriodllB = %s".format(computedUserStateAtStartOfBillingPeriod))
277           val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
278           logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
279           recomputedStartUserState
280         }
281
282         (cachedStartUserState, recomputedStartUserState)
283       case NoVal ⇒
284         // We do not even have a cached value, so compute one!
285         logger.debug("Do not have a cachedStartUserState, computing one...")
286         val computedUserStateAtStartOfBillingPeriod = computeUserStateAtStartOfBillingPeriod(
287           yearOfPrevBillingMonth,
288           prevBillingMonth,
289           currentUserState,
290           accounting)
291         logger.debug("computedUserStateAtStartOfBillingPeriod = %s".format(computedUserStateAtStartOfBillingPeriod))
292         val recomputedStartUserState = computedUserStateAtStartOfBillingPeriod.asInstanceOf[Just[EndOfBillingState]].get.userState // FIXME
293         logger.debug("recomputedStartUserState = %s".format(recomputedStartUserState))
294
295         (recomputedStartUserState, recomputedStartUserState)
296       case Failed(e, m) ⇒
297         logger.error("[Could not find latest user state for billing month %s-%s] %s".format(yearOfPrevBillingMonth, prevBillingMonth, m), e)
298         throw new Exception(m, e)
299     }
300
301     // OK. Now that we have a user state to start with (= start of billing period reference point),
302     // let us deal with the events themselves.
303     val billingStartMillis = billingMonthStartDate.toMillis
304     val billingStopMillis  = billingMonthStopDate.toMillis
305     val allBillingPeriodRelevantRCEvents = rcEventStore.findAllRelevantResourceEventsForBillingPeriod(userId, billingStartMillis, billingStopMillis)
306     logger.debug("allBillingPeriodRelevantRCEvents [%s] = %s".format(allBillingPeriodRelevantRCEvents.size, allBillingPeriodRelevantRCEvents))
307
308     type FullResourceType = ResourceEvent.FullResourceType
309     val previousRCEventsMap = mutable.Map[FullResourceType, ResourceEvent]()
310     val impliedRCEventsMap  = mutable.Map[FullResourceType, ResourceEvent]() // those which do not exists but are
311     // implied in order to do billing calculations (e.g. the "off" vmtime resource event)
312
313     // Our temporary state holder.
314     var _workingUserState = newStartUserState
315     val nowMillis = TimeHelpers.nowMillis
316
317     for(currentResourceEvent <- allBillingPeriodRelevantRCEvents) {
318       val resource = currentResourceEvent.resource
319       val instanceId = currentResourceEvent.instanceId
320
321       logger.debug("Processing %s".format(currentResourceEvent))
322
323       // We need to do these kinds of calculations:
324       // 1. Credit state calculations
325       // 2. Resource state calculations
326
327       // How credits are computed:
328       // - "onoff" events (think "vmtime"):
329       //   - need to be considered in on/off pairs
330       //   - just use the time difference of this event to the previous one for the credit computation
331       // - "discrete" events (think "bandwidth"):
332       //   - just use their value, which is a difference already for the credit computation
333       // - "continuous" events (think "bandwidth"):
334       //   - need the previous absolute value
335       //   - need the time difference of this event to the previous one
336       //   - use both the above (previous absolute value, time difference) for the credit computation
337       //
338       // BUT ALL THE ABOVE SHOULD NOT BE CONSIDERED HERE; RATHER THEY ARE POLYMORPHIC BEHAVIOURS
339
340       // What we need to do is:
341       // A. Update user state with new resource instance amount
342       // B. Update user state with new credit
343       // C. Update ??? state with wallet entries
344
345       // The DSLCostPolicy for the resource does not change, so it is safe to use the default DSLPolicy to obtain it.
346       val costPolicyOpt = currentResourceEvent.findCostPolicy(defaultResourcesMap)
347       costPolicyOpt match {
348         case Some(costPolicy) ⇒
349           ///////////////////////////////////////
350           // A. Update user state with new resource instance amount
351           // TODO: Check if we are at beginning of billing period, so as to use
352           //       costPolicy.computeResourceInstanceAmountForNewBillingPeriod
353           val DefaultResourceInstanceAmount = costPolicy.getResourceInstanceInitialAmount
354
355           val previousAmount = currentUserState.getResourceInstanceAmount(resource, instanceId, DefaultResourceInstanceAmount)
356           val newAmount = costPolicy.computeNewResourceInstanceAmount(previousAmount, currentResourceEvent.value)
357
358           _workingUserState = _workingUserState.copyForResourcesSnapshotUpdate(resource, instanceId, newAmount, nowMillis)
359           // A. Update user state with new resource instance amount
360           ///////////////////////////////////////
361
362
363           ///////////////////////////////////////
364           // B. Update user state with new credit
365           val previousRCEventM = findPreviousRCEventOf(currentResourceEvent, costPolicy, previousRCEventsMap)
366           _workingUserState.findResourceInstanceSnapshot(resource, instanceId)
367           // B. Update user state with new credit
368           ///////////////////////////////////////
369
370
371           ///////////////////////////////////////
372           // C. Update ??? state with wallet entries
373
374           // C. Update ??? state with wallet entries
375           ///////////////////////////////////////
376
377         case None ⇒
378           () // ERROR
379       }
380
381
382       updatePreviousRCEventWith(previousRCEventsMap, currentResourceEvent)
383     } // for(newResourceEvent <- allBillingPeriodRelevantRCEvents)
384
385
386     null
387   }
388
389
390   /**
391   * Runs the billing algorithm on the specified period.
392   * By default, a billing period is monthly.
393   * The start of the billing period is midnight of the first day of the month we compute the bill for.
394   *
395   */
396    def doPartialMonthlyBilling(startBillingYear: Int,
397                                startBillingMonth: Int,
398                                stopBillingMillis: Long,
399                                userId: String,
400                                policyFinder: UserPolicyFinder,
401                                fullStateFinder: FullStateFinder,
402                                userStateFinder: UserStateCache,
403                                rcEventStore: ResourceEventStore,
404                                currentUserState: UserState,
405                                otherStuff: Traversable[Any],
406                                accounting: Accounting): Maybe[UserState] = Maybe {
407   
408
409      null.asInstanceOf[UserState]
410    }
411 }
412
413 object DefaultUserStateComputations extends UserStateComputations