X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/c11b8ebcbf64910d7c6725b3cdc8b733363c2eda..f2e3cc2beadcdfa86fb63e6b0429d958ec2adb8f:/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala?ds=sidebyside diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index c27fe2c..1433e48 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -35,32 +35,31 @@ package gr.grnet.aquarium.store.mongodb -import com.mongodb.util.JSON -import gr.grnet.aquarium.computation.UserState.{JsonNames ⇒ UserStateJsonNames} -import gr.grnet.aquarium.util.json.JsonSupport -import collection.mutable.ListBuffer -import gr.grnet.aquarium.event.model.im.IMEventModel -import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames} -import gr.grnet.aquarium.event.model.resource.ResourceEventModel -import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames} -import gr.grnet.aquarium.store._ -import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames} -import gr.grnet.aquarium.logic.accounting.Policy +import collection.immutable import com.mongodb._ -import org.bson.types.ObjectId -import com.ckkloverdos.maybe.Maybe +import gr.grnet.aquarium.computation.BillingMonthInfo +import gr.grnet.aquarium.converter.StdConverters +import gr.grnet.aquarium.logic.accounting.dsl.Timeslot +import gr.grnet.aquarium.message.MessageConstants +import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg} +import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers} +import gr.grnet.aquarium.store._ import gr.grnet.aquarium.util._ -import gr.grnet.aquarium.converter.Conversions -import gr.grnet.aquarium.computation.UserState -import gr.grnet.aquarium.event.model.{ExternalEventModel, PolicyEntry} +import gr.grnet.aquarium.util.Once +import gr.grnet.aquarium.util.json.JsonSupport +import gr.grnet.aquarium.{Aquarium, AquariumException} +import org.apache.avro.specific.SpecificRecord +import org.bson.types.ObjectId /** * Mongodb implementation of the various aquarium stores. * * @author Christos KK Loverdos * @author Georgios Gousios + * @author Prodromos Gerakios */ class MongoDBStore( + val aquarium: Aquarium, val mongo: Mongo, val database: String, val username: String, @@ -71,222 +70,263 @@ class MongoDBStore( with PolicyStore with Loggable { - override type IMEvent = MongoDBIMEvent - override type ResourceEvent = MongoDBResourceEvent - - private[store] lazy val resourceEvents = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION) - private[store] lazy val userStates = getCollection(MongoDBStore.USER_STATES_COLLECTION) - private[store] lazy val imEvents = getCollection(MongoDBStore.IM_EVENTS_COLLECTION) - private[store] lazy val unparsedIMEvents = getCollection(MongoDBStore.UNPARSED_IM_EVENTS_COLLECTION) - private[store] lazy val policyEntries = getCollection(MongoDBStore.POLICY_ENTRIES_COLLECTION) + private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection) + private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection) + private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection) + private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection) + + private[store] lazy val indicesMap = { + val resev= new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.id,1). + add(MongoDBStore.JsonNames.userID,1). + add(MongoDBStore.JsonNames.occurredMillis,1). + add(MongoDBStore.JsonNames.receivedMillis,1).get + val imev = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID,1). + add(MongoDBStore.JsonNames.eventType,""). + add(MongoDBStore.JsonNames.occurredMillis,1).get + val policy = new BasicDBObjectBuilder(). + add("validFromMillis",1). + add("validToMillis",1).get + val user = new BasicDBObjectBuilder(). + add( "occurredMillis",1). + add("isFullBillingMonth",false). + add("billingYear",1). + add("billingMonth",1). + add("billingMonthDay",1).get + Map(MongoDBStore.ResourceEventCollection -> resev, + MongoDBStore.IMEventCollection-> imev, + MongoDBStore.PolicyCollection-> policy, + MongoDBStore.UserStateCollection-> user + ) + } + private[this] val once = new Once() + + private[this] def doAuthenticate(db: DB) { + if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { + throw new AquariumException("Could not authenticate user %s".format(username)) + } + } private[this] def getCollection(name: String): DBCollection = { val db = mongo.getDB(database) - //logger.debug("Authenticating to mongo") - if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { - throw new StoreException("Could not authenticate user %s".format(username)) + doAuthenticate(db) + once.run { /* this code is thread-safe and will run exactly once*/ + indicesMap.foreach { case (collection,obj) => + mongo.getDB(database).getCollection(collection).createIndex(obj) + } } db.getCollection(name) } - private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = { - if (one.occurredMillis > two.occurredMillis) false - else if (one.occurredMillis < two.occurredMillis) true - else true - } - //+ResourceEventStore - def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = { - MongoDBResourceEvent.fromOther(event, null) - } - - def pingResourceEventStore(): Unit = { + def pingResourceEventStore(): Unit = synchronized { + getCollection(MongoDBStore.ResourceEventCollection) MongoDBStore.ping(mongo) } - def insertResourceEvent(event: ResourceEventModel) = { - val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod) - MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject) - localEvent - } + def insertResourceEvent(event: ResourceEventMsg) = { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) - def findResourceEventById(id: String): Option[ResourceEvent] = { - MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject) - } + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis). + get() - def findResourceEventsByUserId(userId: String) - (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = { - val query = new BasicDBObject(ResourceEventNames.userID, userId) - - MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith) + MongoDBStore.insertDBObject(dbObject, resourceEvents) + event } - def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(ResourceEventNames.userID, userId) - query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp)) - - val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1) - - val cursor = resourceEvents.find(query).sort(sort) - - try { - val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent] - while(cursor.hasNext) { - buffer += MongoDBResourceEvent.fromDBObject(cursor.next()) - } - buffer.toList.sortWith(_sortByTimestampAsc) - } finally { - cursor.close() - } + def findResourceEventByID(id: String): Option[ResourceEventMsg] = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id) + for { + dbObject ← dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload) + msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg) + } yield msg } - def findResourceEventHistory(userId: String, resName: String, - instid: Option[String], upTo: Long) : List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(ResourceEventNames.userID, userId) - query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo)) - query.put(ResourceEventNames.resource, resName) - - instid match { - case Some(id) => - Policy.policy.findResource(resName) match { - case Some(y) => query.put(ResourceEventNames.details, - new BasicDBObject(y.descriminatorField, instid.get)) - case None => - } - case None => - } + def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + // received within the period + add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)). + add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)). + // occurred outside the period + add("$or", { + val dbList = new BasicDBList() + dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis))) + dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis))) + dbList + }). + get() + + resourceEvents.count(query) + } + + def foreachResourceEventOccurredInPeriod( + userID: String, + startMillis: Long, + stopMillis: Long + )(f: ResourceEventMsg ⇒ Unit): Long = { + var _counter= 0L + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)). + add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)). + get() - val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1) - val cursor = resourceEvents.find(query).sort(sort) + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1) + val cursor = resourceEvents.find(query).sort(sorter) - try { - val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent] + withCloseable(cursor) { cursor ⇒ while(cursor.hasNext) { - buffer += MongoDBResourceEvent.fromDBObject(cursor.next()) + val nextDBObject = cursor.next() + val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg) + + f(nextEvent) + _counter += 1 } - buffer.toList.sortWith(_sortByTimestampAsc) - } finally { - cursor.close() } - } - - def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(ResourceEventNames.userID, userId) - query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis)) - query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis)) - - // Sort them by increasing order for occurred time - val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1) - - MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None) - } - - def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = { - // FIXME: Implement - 0L - } - def findAllRelevantResourceEventsForBillingPeriod(userId: String, - startMillis: Long, - stopMillis: Long): List[ResourceEvent] = { - // FIXME: Implement - Nil + _counter } //-ResourceEventStore //+ UserStateStore - def insertUserState(userState: UserState) = { - MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject) + def findUserStateByUserID(userID: String) = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID) + for { + dbObject <- dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg) + } yield { + msg + } } - def findUserStateByUserID(userID: String): Option[UserState] = { - val query = new BasicDBObject(UserStateJsonNames.userID, userID) - val cursor = userStates find query + def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.isForFullMonth, true). + add(MongoDBStore.JsonNames.billingYear, bmi.year). + add(MongoDBStore.JsonNames.billingMonth, bmi.month). + get() + + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1) + + val cursor = userStates.find(query).sort(sorter) withCloseable(cursor) { cursor ⇒ - if(cursor.hasNext) - Some(MongoDBStore.dbObjectToUserState(cursor.next())) - else - None + MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) } } + def findLatestUserState(userID: String) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + get() - def findLatestUserStateByUserID(userID: String) = { - // FIXME: implement - null - } + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1) - def findLatestUserStateForEndOfBillingMonth(userId: String, - yearOfBillingMonth: Int, - billingMonth: Int): Option[UserState] = { - None // FIXME: implement + val cursor = userStates.find(query).sort(sorter) + + withCloseable(cursor) { cursor ⇒ + MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) + } } - def deleteUserState(userId: String) = { - val query = new BasicDBObject(UserStateJsonNames.userID, userId) - userStates.findAndRemove(query) + /** + * Stores a user state. + */ + def insertUserState(event: UserStateMsg)= { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) + + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth). + add(MongoDBStore.JsonNames.billingYear, event.getBillingYear). + add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth). + add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay). + get() + + MongoDBStore.insertDBObject(dbObject, userStates) + event } //- UserStateStore //+IMEventStore - def createIMEventFromJson(json: String) = { - MongoDBStore.createIMEventFromJson(json) - } - - def createIMEventFromOther(event: IMEventModel) = { - MongoDBStore.createIMEventFromOther(event) - } - def pingIMEventStore(): Unit = { + getCollection(MongoDBStore.IMEventCollection) MongoDBStore.ping(mongo) } - def insertIMEvent(event: IMEventModel): IMEvent = { - val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod) - MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject) - localEvent - } + def insertIMEvent(event: IMEventMsg) = { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) - def findIMEventById(id: String): Option[IMEvent] = { - MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject) - } + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis). + get() - def findLatestIMEventByUserID(userID: String): Option[IMEvent] = { - val query = new BasicDBObject(IMEventNames.userID, userID) - val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, -1)) + MongoDBStore.insertDBObject(dbObject, imEvents) + event + } - withCloseable(cursor) { cursor ⇒ - if(cursor.hasNext) { - Some(MongoDBIMEvent.fromDBObject(cursor.next())) - } else { - None - } + def findIMEventByID(id: String) = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id) + for { + dbObject ← dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + } yield { + msg } - } + /** - * Find the very first activation event for a particular user. - * + * Find the `CREATE` even for the given user. Note that there must be only one such event. */ - def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent] = { + def findCreateIMEventByUserID(userID: String) = { val query = new BasicDBObjectBuilder(). - add(IMEventNames.userID, userID). - add(IMEventNames.isActive, true).get() + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get() - val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1)) + // Normally one such event is allowed ... + val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)) - withCloseable(cursor) { cursor ⇒ + val dbObjectOpt = withCloseable(cursor) { cursor ⇒ if(cursor.hasNext) { - Some(MongoDBIMEvent.fromDBObject(cursor.next())) + Some(cursor.next()) } else { None } - } + } + + for { + dbObject <- dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + } yield { + msg + } } /** @@ -295,225 +335,132 @@ class MongoDBStore( * * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case. */ - def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = { - val query = new BasicDBObject(IMEventNames.userID, userID) - val cursor = imEvents.find(query).sort(new BasicDBObject(IMEventNames.occurredMillis, 1)) + def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = { + val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID) + val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)) + var _shouldContinue = true withCloseable(cursor) { cursor ⇒ - while(cursor.hasNext) { - val model = MongoDBIMEvent.fromDBObject(cursor.next()) - f(model) + while(_shouldContinue && cursor.hasNext) { + val dbObject = cursor.next() + val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + + _shouldContinue = f(msg) } } + + _shouldContinue } //-IMEventStore - - //+PolicyStore - def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = { - val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom, - new BasicDBObject("$gt", after)) - MongoDBStore.runQuery(query, policyEntries)(MongoDBStore.dbObjectToPolicyEntry)(Some(_sortByTimestampAsc)) + def foreachPolicy[U](f: PolicyMsg ⇒ U) { + val cursor = policies.find() + withCloseable(cursor) { cursor ⇒ + while(cursor.hasNext) { + val dbObject = cursor.next() + val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg) + f(policy) + } + } } - def storePolicyEntry(policy: PolicyEntry): Maybe[RecordID] = MongoDBStore.storePolicyEntry(policy, policyEntries) + def insertPolicy(policy: PolicyMsg): PolicyMsg = { + val mongoID = new ObjectId() + policy.setInStoreID(mongoID.toStringMongod) + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis). + add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)). + get() - - def updatePolicyEntry(policy: PolicyEntry) = { - //Find the entry - val query = new BasicDBObject(PolicyEntry.JsonNames.id, policy.id) - val policyObject = MongoDBStore.jsonSupportToDBObject(policy) - policyEntries.update(query, policyObject, true, false) + MongoDBStore.insertDBObject(dbObject, policies) + policy } - - def findPolicyEntry(id: String) = { - MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry) + + def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = { + // FIXME Inefficient + var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + foreachPolicy(_policies += _) + _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption } + def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = { + // FIXME Inefficient + var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + foreachPolicy(_policies += _) + + immutable.SortedMap(_policies. + from(MessageFactory.newDummyPolicyMsgAt(fromMillis)). + to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq. + map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _* + ) + } //-PolicyStore } object MongoDBStore { - object JsonNames { - final val _id = "_id" - } + final val JsonNames = gr.grnet.aquarium.util.json.JsonNames - /** - * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s. - * - * Resource events are coming from all systems handling billable resources. - */ - final val RESOURCE_EVENTS_COLLECTION = "resevents" + final val collections = List("resevents","userstates","imevents","policies") - /** - * Collection holding the snapshots of [[gr.grnet.aquarium.computation.UserState]]. - * - * [[gr.grnet.aquarium.computation.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user.UserActor]]s. - */ - final val USER_STATES_COLLECTION = "userstates" + final val ResourceEventCollection = collections(0) - /** - * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s. - * - * User events are coming from the IM module (external). - */ - final val IM_EVENTS_COLLECTION = "imevents" - - /** - * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s that could not be parsed to normal objects. - * - * We of course assume at least a valid JSON representation. - * - * User events are coming from the IM module (external). - */ - final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents" - - /** - * Collection holding [[gr.grnet.aquarium.logic.accounting.dsl.DSLPolicy]]. - */ -// final val POLICIES_COLLECTION = "policies" + final val UserStateCollection = collections(1) - /** - * Collection holding [[gr.grnet.aquarium.event.model.PolicyEntry]]. - */ - final val POLICY_ENTRIES_COLLECTION = "policyEntries" - - def dbObjectToUserState(dbObj: DBObject): UserState = { - UserState.fromJson(JSON.serialize(dbObj)) - } - - def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = { - PolicyEntry.fromJson(JSON.serialize(dbObj)) - } - - def ping(mongo: Mongo): Unit = { - // This requires a network roundtrip - mongo.isLocked - } + final val IMEventCollection = collections(2) - def findBy[A >: Null <: AnyRef](name: String, - value: String, - collection: DBCollection, - deserializer: (DBObject) => A) : Option[A] = { - val query = new BasicDBObject(name, value) - val cursor = collection find query + final val PolicyCollection = collections(3) + def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = { withCloseable(cursor) { cursor ⇒ - if(cursor.hasNext) - Some(deserializer apply cursor.next) - else + if(cursor.hasNext) { + Some(f(cursor.next())) + } else { None - } - } - - def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null) - (deserializer: (DBObject) => A) - (sortWith: Option[(A, A) => Boolean]): List[A] = { - val cursor0 = collection find query - val cursor = if(orderBy ne null) { - cursor0 sort orderBy - } else { - cursor0 - } // I really know that docs say that it is the same cursor. - - if(!cursor.hasNext) { - cursor.close() - Nil - } else { - val buff = new ListBuffer[A]() - - while(cursor.hasNext) { - buff += deserializer apply cursor.next - } - - cursor.close() - - sortWith match { - case Some(sorter) => buff.toList.sortWith(sorter) - case None => buff.toList } } } - def storeUserState(userState: UserState, collection: DBCollection) = { - storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject) - } - - def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = { - Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject)) - } - - def storeAny[A](any: A, - collection: DBCollection, - idName: String, - idValueProvider: (A) => String, - serializer: (A) => DBObject) : RecordID = { - - val dbObject = serializer apply any - val _id = new ObjectId() - dbObject.put("_id", _id) - val writeResult = collection.insert(dbObject, WriteConcern.JOURNAL_SAFE) - writeResult.getLastError().throwOnError() - - RecordID(dbObject.get("_id").toString) + def ping(mongo: Mongo): Unit = synchronized { + // This requires a network roundtrip + mongo.isLocked } - // FIXME: consolidate - def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = { - val dbObject = serializer apply obj - val objectId = obj._id match { - case null ⇒ - val _id = new ObjectId() - dbObject.put("_id", _id) - _id - - case _id ⇒ - _id + def findOneByAttribute( + collection: DBCollection, + attributeName: String, + attributeValue: String, + sortByOpt: Option[DBObject] = None + ): Option[DBObject] = { + val query = new BasicDBObject(attributeName, attributeValue) + val cursor = sortByOpt match { + case None ⇒ collection find query + case Some(sortBy) ⇒ collection find query sort sortBy } - - dbObject.put(JsonNames._id, objectId) - - collection.insert(dbObject, WriteConcern.JOURNAL_SAFE) - - obj - } - - def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = { - val dbObject = serializer apply obj - val objectId = obj._id match { - case null ⇒ - val _id = new ObjectId() - dbObject.put("_id", _id) - _id - - case _id ⇒ - _id + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) Some(cursor.next()) else None } - - dbObject.put(JsonNames._id, objectId) - - collection.insert(dbObject, WriteConcern.JOURNAL_SAFE) - } - - def jsonSupportToDBObject(jsonSupport: JsonSupport) = { - Conversions.jsonSupportToDBObject(jsonSupport) } - final def isLocalIMEvent(event: IMEventModel) = event match { - case _: MongoDBIMEvent ⇒ true - case _ ⇒ false + def insertDBObject(dbObj: DBObject, collection: DBCollection) { + collection.insert(dbObj, WriteConcern.JOURNAL_SAFE) } - final def createIMEventFromJson(json: String) = { - MongoDBIMEvent.fromJsonString(json) - } - - final def createIMEventFromOther(event: IMEventModel) = { - MongoDBIMEvent.fromOther(event, null) + def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = { + for { + dbObject <- if(cursor.hasNext) Some(cursor.next()) else None + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, fresh) + } yield { + msg + } } - final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = { - MongoDBIMEvent.fromJsonBytes(jsonBytes) + def jsonSupportToDBObject(jsonSupport: JsonSupport) = { + StdConverters.AllConverters.convertEx[DBObject](jsonSupport) } }