From: Christos KK Loverdos Date: Wed, 6 Jun 2012 15:28:53 +0000 (+0300) Subject: WIP Resource event handling X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/370ff8176509af16b20e8d047ad1a865d545e71b WIP Resource event handling --- diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserState.scala b/src/main/scala/gr/grnet/aquarium/computation/UserState.scala index 4a5b56c..df3c431 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/UserState.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/UserState.scala @@ -141,7 +141,8 @@ case class UserState( ownedResourcesSnapshot: OwnedResourcesSnapshot, newWalletEntries: List[NewWalletEntry], - occurredMillis: Long, // The time fro which this state is relevant + + occurredMillis: Long, // When this user state was computed // The last known change reason for this userState lastChangeReason: UserStateChangeReason = NoSpecificChangeReason(), @@ -223,8 +224,17 @@ object UserState { } object JsonNames { - final val _id = "_id" + final val _id = "_id" final val userID = "userID" + final val isFullBillingMonthState = "isFullBillingMonthState" + final val occurredMillis = "occurredMillis" + + object theFullBillingMonth { + final val year = "year" + final val month = "month" + final val monthStartMillis = "monthStartMillis" + final val monthStopMillis = "monthStopMillis" + } } def createInitialUserState(userID: String, diff --git a/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala b/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala index a3a1d84..c5b5925 100644 --- a/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala +++ b/src/main/scala/gr/grnet/aquarium/computation/UserStateComputations.scala @@ -72,7 +72,7 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { "findUserStateAtEndOfBillingMonth(%s)", billingMonthInfo.toShortDebugString) clog.begin() - def doCompute: UserState = { + def computeFullMonthlyBilling(): UserState = { doFullMonthlyBilling( userStateBootstrap, billingMonthInfo, @@ -98,53 +98,53 @@ class UserStateComputations(_aquarium: () ⇒ Aquarium) extends Loggable { clog.debug("Returning INITIAL state [_id=%s] %s".format(initialUserState1._id, initialUserState1)) clog.end() - initialUserState1 - } else { - // Ask DB cache for the latest known user state for this billing period - val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth( - userID, - billingMonthInfo.year, - billingMonthInfo.month) - - latestUserStateOpt match { - case None ⇒ - // Not found, must compute - clog.debug("No user state found from cache, will have to (re)compute") - val result = doCompute - clog.end() - result - - case Some(latestUserState) ⇒ - // Found a "latest" user state but need to see if it is indeed the true and one latest. - // For this reason, we must count the events again. - val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter - val actualOOSEventsCounter = resourceEventStore.countOutOfSyncEventsForBillingPeriod( - userID, - billingMonthStartMillis, - billingMonthStopMillis) - - val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter - counterDiff match { - // ZERO, we are OK! - case 0 ⇒ - // NOTE: Keep the caller's calculation reason - latestUserState.copyForChangeReason(calculationReason) - - // We had more, so must recompute - case n if n > 0 ⇒ - clog.debug( - "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n) - val result = doCompute - clog.end() - result - - // We had less???? - case n if n < 0 ⇒ - val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n) - clog.warn(errMsg) - throw new AquariumException(errMsg) - } - } + return initialUserState1 + } + + // Ask DB cache for the latest known user state for this billing period + val latestUserStateOpt = userStateStore.findLatestUserStateForEndOfBillingMonth( + userID, + billingMonthInfo.year, + billingMonthInfo.month) + + latestUserStateOpt match { + case None ⇒ + // Not found, must compute + clog.debug("No user state found from cache, will have to (re)compute") + val result = computeFullMonthlyBilling + clog.end() + result + + case Some(latestUserState) ⇒ + // Found a "latest" user state but need to see if it is indeed the true and one latest. + // For this reason, we must count the events again. + val latestStateOOSEventsCounter = latestUserState.billingPeriodOutOfSyncResourceEventsCounter + val actualOOSEventsCounter = resourceEventStore.countOutOfSyncResourceEventsForBillingPeriod( + userID, + billingMonthStartMillis, + billingMonthStopMillis) + + val counterDiff = actualOOSEventsCounter - latestStateOOSEventsCounter + counterDiff match { + // ZERO, we are OK! + case 0 ⇒ + // NOTE: Keep the caller's calculation reason + latestUserState.copyForChangeReason(calculationReason) + + // We had more, so must recompute + case n if n > 0 ⇒ + clog.debug( + "Found %s out of sync events (%s more), will have to (re)compute user state", actualOOSEventsCounter, n) + val result = computeFullMonthlyBilling + clog.end() + result + + // We had less???? + case n if n < 0 ⇒ + val errMsg = "Found %s out of sync events (%s less). DB must be inconsistent".format(actualOOSEventsCounter, n) + clog.warn(errMsg) + throw new AquariumInternalError(errMsg) + } } } diff --git a/src/main/scala/gr/grnet/aquarium/event/model/resource/ResourceEventModel.scala b/src/main/scala/gr/grnet/aquarium/event/model/resource/ResourceEventModel.scala index fb11aa7..af51ede 100644 --- a/src/main/scala/gr/grnet/aquarium/event/model/resource/ResourceEventModel.scala +++ b/src/main/scala/gr/grnet/aquarium/event/model/resource/ResourceEventModel.scala @@ -104,8 +104,10 @@ trait ResourceEventModel extends ExternalEventModel { } def isOutOfSyncForBillingPeriod(billingStartMillis: Long, billingStopMillis: Long): Boolean = { - isReceivedWithinMillis(billingStartMillis, billingStopMillis) && - (occurredMillis < billingStartMillis || occurredMillis > billingStopMillis) + // Out of sync events are those that were received within the billing period + // but actually occurred outside the billing period. + isReceivedWithinMillis(billingStartMillis, billingStopMillis) && + !isOccurredWithinMillis(billingStartMillis, billingStopMillis) } def toDebugString(useOnlyInstanceId: Boolean = false): String = { diff --git a/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala b/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala index e1c5143..f79afcb 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala @@ -196,9 +196,9 @@ object Policy extends DSL with Loggable { receivedMillis = ts, validFrom = ts) config.policyStore.findPolicyEntry(newPolicy.id) match { - case Just(x) => + case Some(x) => logger.warn("Policy file contents not modified") - case NoVal => + case None => if (!pol.isEmpty) { val toUpdate = pol.last.copy(validTo = ts - 1) config.policyStore.updatePolicyEntry(toUpdate) @@ -206,8 +206,6 @@ object Policy extends DSL with Loggable { } else { config.policyStore.storePolicyEntry(newPolicy) } - case failed @ Failed(e) => - failed.throwMe } } diff --git a/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala b/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala index f587271..572f6fa 100644 --- a/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala @@ -95,5 +95,5 @@ trait PolicyStore { /** * Find a policy by its unique id */ - def findPolicyEntry(id: String): Maybe[PolicyEntry] + def findPolicyEntry(id: String): Option[PolicyEntry] } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala index 787a657..f5da67c 100644 --- a/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/ResourceEventStore.scala @@ -76,7 +76,7 @@ trait ResourceEventStore { /** * Count and return the number of "out of sync" events for a billing month. */ - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long + def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long /** * Finds all relevant resource events for the billing period. diff --git a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala index add6394..2ffd1e2 100644 --- a/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala @@ -57,12 +57,10 @@ trait UserStateStore { */ def findUserStateByUserID(userID: String): Option[UserState] - def findLatestUserStateByUserID(userID: String): Option[UserState] - /** * Find the most up-to-date user state for the particular billing period. */ - def findLatestUserStateForEndOfBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState] + def findLatestUserStateForEndOfBillingMonth(userID: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState] /** * Delete a state for a user diff --git a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala index 26e4bb4..0795355 100644 --- a/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/memory/MemStore.scala @@ -111,20 +111,6 @@ class MemStore extends UserStateStore _userStates.find(_.userID == userID) } - def findLatestUserStateByUserID(userID: String) = { - val goodOnes = _userStates.filter(_.userID == userID) - - goodOnes.sortWith { - case (us1, us2) ⇒ - us1.occurredMillis > us2.occurredMillis - } match { - case head :: _ ⇒ - Some(head) - case _ ⇒ - None - } - } - def findLatestUserStateForEndOfBillingMonth(userID: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState] = { @@ -229,7 +215,7 @@ class MemStore extends UserStateStore }.toList } - def countOutOfSyncEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { + def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { _resourceEvents.filter { case ev ⇒ ev.userID == userID && // out of sync events are those that were received in the billing month but occurred in previous (or next?) @@ -338,9 +324,8 @@ class MemStore extends UserStateStore p :: acc } - def findPolicyEntry(id: String) = _policyEntries.find(p => p.id == id) match { - case Some(x) => Just(x) - case None => NoVal + def findPolicyEntry(id: String) = { + _policyEntries.find(p => p.id == id) } } diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index 0b8d209..617471f 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -184,9 +184,22 @@ class MongoDBStore( MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None) } - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = { - // FIXME: Implement - 0L + def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { + val query = new BasicDBObjectBuilder(). + add(ResourceEventModel.Names.userID, userID). + // received within the period + add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$gte", startMillis)). + add(ResourceEventModel.Names.receivedMillis, new BasicDBObject("$lte", stopMillis)). + // occurred outside the period + add("$or", { + val dbList = new BasicDBList() + dbList.add(0, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$lt", startMillis))) + dbList.add(1, new BasicDBObject(ResourceEventModel.Names.occurredMillis, new BasicDBObject("$gt", stopMillis))) + dbList + }). + get() + + resourceEvents.count(query) } def findAllRelevantResourceEventsForBillingPeriod(userId: String, @@ -210,24 +223,25 @@ class MongoDBStore( val query = new BasicDBObject(UserStateJsonNames.userID, userID) val cursor = userStates find query - withCloseable(cursor) { cursor ⇒ - if(cursor.hasNext) - Some(MongoDBStore.dbObjectToUserState(cursor.next())) - else - None - } - } - - - def findLatestUserStateByUserID(userID: String) = { - // FIXME: implement - null + MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState) } - def findLatestUserStateForEndOfBillingMonth(userId: String, + def findLatestUserStateForEndOfBillingMonth(userID: String, yearOfBillingMonth: Int, billingMonth: Int): Option[UserState] = { - None // FIXME: implement + val query = new BasicDBObjectBuilder(). + add(UserState.JsonNames.userID, userID). + add(UserState.JsonNames.isFullBillingMonthState, true). + add(UserState.JsonNames.theFullBillingMonth.year, yearOfBillingMonth). + add(UserState.JsonNames.theFullBillingMonth.month, billingMonth). + get() + + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(UserState.JsonNames.occurredMillis, -1) + + val cursor = userStates.find(query).sort(sorter) + + MongoDBStore.firstResultIfExists(cursor, MongoDBStore.dbObjectToUserState) } def deleteUserState(userId: String) = { @@ -271,27 +285,14 @@ class MongoDBStore( // Normally one such event is allowed ... val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1)) - withCloseable(cursor) { cursor ⇒ - if(cursor.hasNext) { - Some(MongoDBIMEvent.fromDBObject(cursor.next())) - } else { - None - } - } + MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject) } def findLatestIMEventByUserID(userID: String): Option[IMEvent] = { val query = new BasicDBObject(IMEventNames.userID, userID) val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1)) - withCloseable(cursor) { cursor ⇒ - if(cursor.hasNext) { - Some(MongoDBIMEvent.fromDBObject(cursor.next())) - } else { - None - } - } - + MongoDBStore.firstResultIfExists(cursor, MongoDBIMEvent.fromDBObject) } /** @@ -408,6 +409,16 @@ object MongoDBStore { UserState.fromJson(JSON.serialize(dbObj)) } + def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = { + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) { + Some(f(cursor.next())) + } else { + None + } + } + } + def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = { PolicyEntry.fromJson(JSON.serialize(dbObj)) }