X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/04b151bace0f785da9a2ff50633fded2674ff6c7..f2e3cc2beadcdfa86fb63e6b0429d958ec2adb8f:/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index 446f0a2..1433e48 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -35,466 +35,432 @@ package gr.grnet.aquarium.store.mongodb -import gr.grnet.aquarium.util.Loggable -import com.mongodb.util.JSON -import gr.grnet.aquarium.user.UserState -import gr.grnet.aquarium.user.UserState.{JsonNames => UserStateJsonNames} -import gr.grnet.aquarium.util.displayableObjectInfo -import gr.grnet.aquarium.util.json.JsonSupport -import collection.mutable.ListBuffer -import gr.grnet.aquarium.store._ -import gr.grnet.aquarium.logic.events.ResourceEvent.{JsonNames => ResourceJsonNames} -import gr.grnet.aquarium.logic.events.UserEvent.{JsonNames => UserEventJsonNames} -import gr.grnet.aquarium.logic.events.WalletEntry.{JsonNames => WalletJsonNames} -import gr.grnet.aquarium.logic.events.PolicyEntry.{JsonNames => PolicyJsonNames} -import java.util.Date -import gr.grnet.aquarium.logic.accounting.Policy -import gr.grnet.aquarium.logic.events._ +import collection.immutable import com.mongodb._ -import com.ckkloverdos.maybe.{NoVal, Maybe} -import gr.grnet.aquarium.logic.accounting.dsl.{DSLResource, Timeslot, DSLPolicy, DSLComplexResource} +import gr.grnet.aquarium.computation.BillingMonthInfo +import gr.grnet.aquarium.converter.StdConverters +import gr.grnet.aquarium.logic.accounting.dsl.Timeslot +import gr.grnet.aquarium.message.MessageConstants +import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg} +import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers} +import gr.grnet.aquarium.store._ +import gr.grnet.aquarium.util._ +import gr.grnet.aquarium.util.Once +import gr.grnet.aquarium.util.json.JsonSupport +import gr.grnet.aquarium.{Aquarium, AquariumException} +import org.apache.avro.specific.SpecificRecord +import org.bson.types.ObjectId /** * Mongodb implementation of the various aquarium stores. * * @author Christos KK Loverdos * @author Georgios Gousios + * @author Prodromos Gerakios */ class MongoDBStore( + val aquarium: Aquarium, val mongo: Mongo, val database: String, val username: String, val password: String) extends ResourceEventStore with UserStateStore - with WalletEntryStore - with UserEventStore + with IMEventStore with PolicyStore with Loggable { - private[store] lazy val resourceEvents = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION) - private[store] lazy val userStates = getCollection(MongoDBStore.USER_STATES_COLLECTION) - private[store] lazy val userEvents = getCollection(MongoDBStore.USER_EVENTS_COLLECTION) - private[store] lazy val walletEntries = getCollection(MongoDBStore.WALLET_ENTRIES_COLLECTION) -// private[store] lazy val policies = getCollection(MongoDBStore.POLICIES_COLLECTION) - private[store] lazy val policyEntries = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION) + private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection) + private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection) + private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection) + private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection) + + private[store] lazy val indicesMap = { + val resev= new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.id,1). + add(MongoDBStore.JsonNames.userID,1). + add(MongoDBStore.JsonNames.occurredMillis,1). + add(MongoDBStore.JsonNames.receivedMillis,1).get + val imev = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID,1). + add(MongoDBStore.JsonNames.eventType,""). + add(MongoDBStore.JsonNames.occurredMillis,1).get + val policy = new BasicDBObjectBuilder(). + add("validFromMillis",1). + add("validToMillis",1).get + val user = new BasicDBObjectBuilder(). + add( "occurredMillis",1). + add("isFullBillingMonth",false). + add("billingYear",1). + add("billingMonth",1). + add("billingMonthDay",1).get + Map(MongoDBStore.ResourceEventCollection -> resev, + MongoDBStore.IMEventCollection-> imev, + MongoDBStore.PolicyCollection-> policy, + MongoDBStore.UserStateCollection-> user + ) + } + private[this] val once = new Once() + + private[this] def doAuthenticate(db: DB) { + if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { + throw new AquariumException("Could not authenticate user %s".format(username)) + } + } private[this] def getCollection(name: String): DBCollection = { val db = mongo.getDB(database) - //logger.debug("Authenticating to mongo") - if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { - throw new StoreException("Could not authenticate user %s".format(username)) + doAuthenticate(db) + once.run { /* this code is thread-safe and will run exactly once*/ + indicesMap.foreach { case (collection,obj) => + mongo.getDB(database).getCollection(collection).createIndex(obj) + } } db.getCollection(name) } - private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = { - if (one.occurredMillis > two.occurredMillis) false - else if (one.occurredMillis < two.occurredMillis) true - else true - } - - private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = { - if (one.occurredMillis < two.occurredMillis) false - else if (one.occurredMillis > two.occurredMillis) true - else true - } - //+ResourceEventStore - def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] = - MongoDBStore.storeAquariumEvent(event, resourceEvents) - - def findResourceEventById(id: String): Maybe[ResourceEvent] = - MongoDBStore.findById(id, resourceEvents, MongoDBStore.dbObjectToResourceEvent) - - def findResourceEventsByUserId(userId: String) - (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = { - val query = new BasicDBObject(ResourceJsonNames.userId, userId) - - MongoDBStore.runQuery(query, resourceEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith) - } - - def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(ResourceJsonNames.userId, userId) - query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", timestamp)) - - val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1) - - val cursor = resourceEvents.find(query).sort(sort) - - try { - val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent] + def pingResourceEventStore(): Unit = synchronized { + getCollection(MongoDBStore.ResourceEventCollection) + MongoDBStore.ping(mongo) + } + + def insertResourceEvent(event: ResourceEventMsg) = { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) + + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis). + get() + + MongoDBStore.insertDBObject(dbObject, resourceEvents) + event + } + + def findResourceEventByID(id: String): Option[ResourceEventMsg] = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id) + for { + dbObject ← dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload) + msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg) + } yield msg + } + + def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + // received within the period + add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)). + add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)). + // occurred outside the period + add("$or", { + val dbList = new BasicDBList() + dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis))) + dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis))) + dbList + }). + get() + + resourceEvents.count(query) + } + + def foreachResourceEventOccurredInPeriod( + userID: String, + startMillis: Long, + stopMillis: Long + )(f: ResourceEventMsg ⇒ Unit): Long = { + var _counter= 0L + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)). + add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)). + get() + + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1) + val cursor = resourceEvents.find(query).sort(sorter) + + withCloseable(cursor) { cursor ⇒ while(cursor.hasNext) { - buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next()) - } - buffer.toList.sortWith(_sortByTimestampAsc) - } finally { - cursor.close() - } - } + val nextDBObject = cursor.next() + val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg) - def findResourceEventHistory(userId: String, resName: String, - instid: Option[String], upTo: Long) : List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(ResourceJsonNames.userId, userId) - query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo)) - query.put(ResourceJsonNames.resource, resName) - - instid match { - case Some(id) => - Policy.policy.findResource(resName) match { - case Some(y) => query.put(ResourceJsonNames.details, - new BasicDBObject(y.descriminatorField, instid.get)) - case None => - } - case None => + f(nextEvent) + _counter += 1 + } } - val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1) - val cursor = resourceEvents.find(query).sort(sort) + _counter + } + //-ResourceEventStore - try { - val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent] - while(cursor.hasNext) { - buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next()) - } - buffer.toList.sortWith(_sortByTimestampAsc) - } finally { - cursor.close() + //+ UserStateStore + def findUserStateByUserID(userID: String) = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID) + for { + dbObject <- dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg) + } yield { + msg } } - def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(ResourceJsonNames.userId, userId) - query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis)) - query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis)) + def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.isForFullMonth, true). + add(MongoDBStore.JsonNames.billingYear, bmi.year). + add(MongoDBStore.JsonNames.billingMonth, bmi.month). + get() - // Sort them by increasing order for occurred time - val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, 1) + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1) - MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBStore.dbObjectToResourceEvent)(None) - } - - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = { - Maybe { - // FIXME: Implement - 0L + val cursor = userStates.find(query).sort(sorter) + + withCloseable(cursor) { cursor ⇒ + MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) } } - def findAllRelevantResourceEventsForBillingPeriod(userId: String, - startMillis: Long, - stopMillis: Long): List[ResourceEvent] = { - // FIXME: Implement - Nil - } - //-ResourceEventStore + def findLatestUserState(userID: String) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + get() - //+ UserStateStore - def storeUserState(userState: UserState): Maybe[RecordID] = { - MongoDBStore.storeUserState(userState, userStates) - } + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1) - def findUserStateByUserId(userId: String): Maybe[UserState] = { - Maybe { - val query = new BasicDBObject(UserStateJsonNames.userId, userId) - val cursor = userStates find query - - try { - if(cursor.hasNext) - MongoDBStore.dbObjectToUserState(cursor.next()) - else - null - } finally { - cursor.close() - } - } - } + val cursor = userStates.find(query).sort(sorter) - def findLatestUserStateForEndOfBillingMonth(userId: String, - yearOfBillingMonth: Int, - billingMonth: Int): Maybe[UserState] = { - NoVal // FIXME: implement + withCloseable(cursor) { cursor ⇒ + MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) + } } - def deleteUserState(userId: String) = { - val query = new BasicDBObject(UserStateJsonNames.userId, userId) - userStates.findAndRemove(query) + /** + * Stores a user state. + */ + def insertUserState(event: UserStateMsg)= { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) + + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth). + add(MongoDBStore.JsonNames.billingYear, event.getBillingYear). + add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth). + add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay). + get() + + MongoDBStore.insertDBObject(dbObject, userStates) + event } //- UserStateStore - //+WalletEntryStore - def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = - MongoDBStore.storeAquariumEvent(entry, walletEntries) - - def findWalletEntryById(id: String): Maybe[WalletEntry] = - MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry) - - def findUserWalletEntries(userId: String) = { - // TODO: optimize - findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue)) + //+IMEventStore + def pingIMEventStore(): Unit = { + getCollection(MongoDBStore.IMEventCollection) + MongoDBStore.ping(mongo) + } + + def insertIMEvent(event: IMEventMsg) = { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) + + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis). + get() + + MongoDBStore.insertDBObject(dbObject, imEvents) + event + } + + def findIMEventByID(id: String) = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id) + for { + dbObject ← dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + } yield { + msg + } } - def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = { - val q = new BasicDBObject() - // TODO: Is this the correct way for an AND query? - q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime)) - q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$lt", to.getTime)) - q.put(WalletJsonNames.userId, userId) - - MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) - } - def findWalletEntriesAfter(userId: String, from: Date) : List[WalletEntry] = { - val q = new BasicDBObject() - q.put(WalletJsonNames.occurredMillis, new BasicDBObject("$gt", from.getTime)) - q.put(WalletJsonNames.userId, userId) + /** + * Find the `CREATE` even for the given user. Note that there must be only one such event. + */ + def findCreateIMEventByUserID(userID: String) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get() + + // Normally one such event is allowed ... + val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)) + + val dbObjectOpt = withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) { + Some(cursor.next()) + } else { + None + } + } - MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) + for { + dbObject <- dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + } yield { + msg + } } - def findLatestUserWalletEntries(userId: String) = { - Maybe { - val orderBy = new BasicDBObject(WalletJsonNames.occurredMillis, -1) // -1 is descending order - val cursor = walletEntries.find().sort(orderBy) - - try { - val buffer = new scala.collection.mutable.ListBuffer[WalletEntry] - if(cursor.hasNext) { - val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next()) - buffer += walletEntry - - var _previousOccurredMillis = walletEntry.occurredMillis - var _ok = true - - while(cursor.hasNext && _ok) { - val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next()) - var currentOccurredMillis = walletEntry.occurredMillis - _ok = currentOccurredMillis == _previousOccurredMillis - - if(_ok) { - buffer += walletEntry - } - } - - buffer.toList - } else { - null - } - } finally { - cursor.close() + /** + * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through + * the given function `f`. + * + * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case. + */ + def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = { + val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID) + val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)) + + var _shouldContinue = true + withCloseable(cursor) { cursor ⇒ + while(_shouldContinue && cursor.hasNext) { + val dbObject = cursor.next() + val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + + _shouldContinue = f(msg) } } + + _shouldContinue } + //-IMEventStore - def findPreviousEntry(userId: String, resource: String, - instanceId: String, - finalized: Option[Boolean]): List[WalletEntry] = { - val q = new BasicDBObject() - q.put(WalletJsonNames.userId, userId) - q.put(WalletJsonNames.resource, resource) - q.put(WalletJsonNames.instanceId, instanceId) - finalized match { - case Some(x) => q.put(WalletJsonNames.finalized, x) - case None => + //+PolicyStore + def foreachPolicy[U](f: PolicyMsg ⇒ U) { + val cursor = policies.find() + withCloseable(cursor) { cursor ⇒ + while(cursor.hasNext) { + val dbObject = cursor.next() + val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg) + f(policy) + } } - - MongoDBStore.runQuery[WalletEntry](q, walletEntries)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) } - //-WalletEntryStore - - //+UserEventStore - def storeUserEvent(event: UserEvent): Maybe[RecordID] = - MongoDBStore.storeAny[UserEvent](event, userEvents, UserEventJsonNames.userID, - _.userID, MongoDBStore.jsonSupportToDBObject) - - def findUserEventById(id: String): Maybe[UserEvent] = - MongoDBStore.findById[UserEvent](id, userEvents, MongoDBStore.dbObjectToUserEvent) + def insertPolicy(policy: PolicyMsg): PolicyMsg = { + val mongoID = new ObjectId() + policy.setInStoreID(mongoID.toStringMongod) + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis). + add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)). + get() - def findUserEventsByUserId(userId: String): List[UserEvent] = { - val query = new BasicDBObject(UserEventJsonNames.userID, userId) - MongoDBStore.runQuery(query, userEvents)(MongoDBStore.dbObjectToUserEvent)(Some(_sortByTimestampAsc)) + MongoDBStore.insertDBObject(dbObject, policies) + policy } - //-UserEventStore - //+PolicyStore - def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = { - val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom, - new BasicDBObject("$gt", after)) - MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc)) + def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = { + // FIXME Inefficient + var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + foreachPolicy(_policies += _) + _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption } - def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries) - + def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = { + // FIXME Inefficient + var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + foreachPolicy(_policies += _) - def updatePolicyEntry(policy: PolicyEntry) = { - //Find the entry - val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id) - val policyObject = MongoDBStore.jsonSupportToDBObject(policy) - policyEntries.update(query, policyObject, true, false) + immutable.SortedMap(_policies. + from(MessageFactory.newDummyPolicyMsgAt(fromMillis)). + to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq. + map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _* + ) } - - def findPolicyEntry(id: String) = - MongoDBStore.findById[PolicyEntry](id, policyEntries, MongoDBStore.dbObjectToPolicyEntry) - //-PolicyStore } object MongoDBStore { - object JsonNames { - final val _id = "_id" - } + final val JsonNames = gr.grnet.aquarium.util.json.JsonNames - /** - * Collection holding the [[gr.grnet.aquarium.logic.events.ResourceEvent]]s. - * - * Resource events are coming from all systems handling billable resources. - */ - final val RESOURCE_EVENTS_COLLECTION = "resevents" + final val collections = List("resevents","userstates","imevents","policies") - /** - * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]]. - * - * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.user.actor.UserActor]]s. - */ - final val USER_STATES_COLLECTION = "userstates" + final val ResourceEventCollection = collections(0) - /** - * Collection holding [[gr.grnet.aquarium.logic.events.UserEvent]]s. - * - * User events are coming from the IM module (external). - */ - final val USER_EVENTS_COLLECTION = "userevents" + final val UserStateCollection = collections(1) - /** - * Collection holding [[gr.grnet.aquarium.logic.events.WalletEntry]]. - * - * Wallet entries are generated internally in Aquarium. - */ - final val WALLET_ENTRIES_COLLECTION = "wallets" + final val IMEventCollection = collections(2) - /** - * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]]. - */ -// final val POLICIES_COLLECTION = "policies" - - /** - * Collection holding [[gr.grnet.aquarium.logic.events.PolicyEntry]]. - */ - final val POLICY_ENTRIES_COLLECTION = "policyEntries" - - /* TODO: Some of the following methods rely on JSON (de-)serialization). - * A method based on proper object serialization would be much faster. - */ - def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = { - ResourceEvent.fromJson(JSON.serialize(dbObject)) - } + final val PolicyCollection = collections(3) - def dbObjectToUserState(dbObj: DBObject): UserState = { - UserState.fromJson(JSON.serialize(dbObj)) - } - - def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = { - WalletEntry.fromJson(JSON.serialize(dbObj)) - } - - def dbObjectToUserEvent(dbObj: DBObject): UserEvent = { - UserEvent.fromJson(JSON.serialize(dbObj)) + def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = { + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) { + Some(f(cursor.next())) + } else { + None + } + } } - def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = { - PolicyEntry.fromJson(JSON.serialize(dbObj)) + def ping(mongo: Mongo): Unit = synchronized { + // This requires a network roundtrip + mongo.isLocked } - def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe { - val query = new BasicDBObject(ResourceJsonNames.id, id) - val cursor = collection find query - - try { - if(cursor.hasNext) - deserializer apply cursor.next - else - null: A // will be transformed to NoVal by the Maybe polymorphic constructor - } finally { - cursor.close() + def findOneByAttribute( + collection: DBCollection, + attributeName: String, + attributeValue: String, + sortByOpt: Option[DBObject] = None + ): Option[DBObject] = { + val query = new BasicDBObject(attributeName, attributeValue) + val cursor = sortByOpt match { + case None ⇒ collection find query + case Some(sortBy) ⇒ collection find query sort sortBy } - } - - def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null) - (deserializer: (DBObject) => A) - (sortWith: Option[(A, A) => Boolean]): List[A] = { - val cursor0 = collection find query - val cursor = if(orderBy ne null) { - cursor0 sort orderBy - } else { - cursor0 - } // I really know that docs say that it is the same cursor. - - if(!cursor.hasNext) { - cursor.close() - Nil - } else { - val buff = new ListBuffer[A]() - - while(cursor.hasNext) { - buff += deserializer apply cursor.next - } - - cursor.close() - - sortWith match { - case Some(sorter) => buff.toList.sortWith(sorter) - case None => buff.toList - } + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) Some(cursor.next()) else None } } - def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = { - storeAny[A](event, collection, ResourceJsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject) - } - - def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = { - storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject) - } - - def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = { - storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject) + def insertDBObject(dbObj: DBObject, collection: DBCollection) { + collection.insert(dbObj, WriteConcern.JOURNAL_SAFE) } - def storeAny[A](any: A, - collection: DBCollection, - idName: String, - idValueProvider: (A) => String, - serializer: (A) => DBObject) : Maybe[RecordID] = { - import com.ckkloverdos.maybe.effect - - Maybe { - val dbObj = serializer apply any - val writeResult = collection insert dbObj - writeResult.getLastError().throwOnError() - - // Get back to retrieve unique id - val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any))) - cursor - } flatMap { cursor ⇒ - effect { - if(cursor.hasNext) - RecordID(cursor.next().get(JsonNames._id).toString) - else - throw new StoreException("Could not store %s to %s".format(any, collection)) - } {} { cursor.close() } + def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = { + for { + dbObject <- if(cursor.hasNext) Some(cursor.next()) else None + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, fresh) + } yield { + msg } } - def jsonSupportToDBObject(any: JsonSupport): DBObject = { - JSON.parse(any.toJson) match { - case dbObject: DBObject ⇒ - dbObject - case _ ⇒ - throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName)) - } + def jsonSupportToDBObject(jsonSupport: JsonSupport) = { + StdConverters.AllConverters.convertEx[DBObject](jsonSupport) } }