X-Git-Url: https://code.grnet.gr/git/aquarium/blobdiff_plain/ee343d6670de15600afb4b954a6eec810090ab48..f2e3cc2beadcdfa86fb63e6b0429d958ec2adb8f:/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index 4991550..1433e48 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -1,5 +1,5 @@ /* - * Copyright 2011 GRNET S.A. All rights reserved. + * Copyright 2011-2012 GRNET S.A. All rights reserved. * * Redistribution and use in source and binary forms, with or * without modification, are permitted provided that the following @@ -35,296 +35,432 @@ package gr.grnet.aquarium.store.mongodb -import gr.grnet.aquarium.util.Loggable -import com.mongodb.util.JSON -import gr.grnet.aquarium.user.UserState -import gr.grnet.aquarium.util.displayableObjectInfo -import gr.grnet.aquarium.util.json.JsonSupport -import collection.mutable.ListBuffer -import gr.grnet.aquarium.store._ -import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames -import java.util.Date -import com.ckkloverdos.maybe.Maybe +import collection.immutable import com.mongodb._ -import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent} +import gr.grnet.aquarium.computation.BillingMonthInfo +import gr.grnet.aquarium.converter.StdConverters +import gr.grnet.aquarium.logic.accounting.dsl.Timeslot +import gr.grnet.aquarium.message.MessageConstants +import gr.grnet.aquarium.message.avro.gen.{UserAgreementHistoryMsg, UserStateMsg, IMEventMsg, ResourceEventMsg, PolicyMsg} +import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, OrderingHelpers, AvroHelpers} +import gr.grnet.aquarium.store._ +import gr.grnet.aquarium.util._ +import gr.grnet.aquarium.util.Once +import gr.grnet.aquarium.util.json.JsonSupport +import gr.grnet.aquarium.{Aquarium, AquariumException} +import org.apache.avro.specific.SpecificRecord +import org.bson.types.ObjectId /** * Mongodb implementation of the various aquarium stores. * * @author Christos KK Loverdos * @author Georgios Gousios + * @author Prodromos Gerakios */ class MongoDBStore( + val aquarium: Aquarium, val mongo: Mongo, val database: String, val username: String, val password: String) - extends ResourceEventStore with UserStateStore - with WalletEntryStore with UserEventStore + extends ResourceEventStore + with UserStateStore + with IMEventStore + with PolicyStore with Loggable { - private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION) - private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION) - private[store] lazy val imevents: DBCollection = getCollection(MongoDBStore.IM_EVENTS_COLLECTION) - private[store] lazy val wallets: DBCollection = getCollection(MongoDBStore.IM_WALLETS) + private[store] lazy val resourceEvents = getCollection(MongoDBStore.ResourceEventCollection) + private[store] lazy val userStates = getCollection(MongoDBStore.UserStateCollection) + private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection) + private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection) + + private[store] lazy val indicesMap = { + val resev= new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.id,1). + add(MongoDBStore.JsonNames.userID,1). + add(MongoDBStore.JsonNames.occurredMillis,1). + add(MongoDBStore.JsonNames.receivedMillis,1).get + val imev = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID,1). + add(MongoDBStore.JsonNames.eventType,""). + add(MongoDBStore.JsonNames.occurredMillis,1).get + val policy = new BasicDBObjectBuilder(). + add("validFromMillis",1). + add("validToMillis",1).get + val user = new BasicDBObjectBuilder(). + add( "occurredMillis",1). + add("isFullBillingMonth",false). + add("billingYear",1). + add("billingMonth",1). + add("billingMonthDay",1).get + Map(MongoDBStore.ResourceEventCollection -> resev, + MongoDBStore.IMEventCollection-> imev, + MongoDBStore.PolicyCollection-> policy, + MongoDBStore.UserStateCollection-> user + ) + } + private[this] val once = new Once() - private[this] def getCollection(name: String): DBCollection = { - val db = mongo.getDB(database) + private[this] def doAuthenticate(db: DB) { if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { - throw new StoreException("Could not authenticate user %s".format(username)) + throw new AquariumException("Could not authenticate user %s".format(username)) } - db.getCollection(name) } - private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = { - if (one.occurredMillis > two.occurredMillis) false - else if (one.occurredMillis < two.occurredMillis) true - else true - } - - private[this] def _sortByTimestampDesc[A <: AquariumEvent](one: A, two: A): Boolean = { - if (one.occurredMillis < two.occurredMillis) false - else if (one.occurredMillis > two.occurredMillis) true - else true + private[this] def getCollection(name: String): DBCollection = { + val db = mongo.getDB(database) + doAuthenticate(db) + once.run { /* this code is thread-safe and will run exactly once*/ + indicesMap.foreach { case (collection,obj) => + mongo.getDB(database).getCollection(collection).createIndex(obj) + } + } + db.getCollection(name) } //+ResourceEventStore - def storeResourceEvent(event: ResourceEvent): Maybe[RecordID] = - MongoDBStore.storeAquariumEvent(event, rcevents) + def pingResourceEventStore(): Unit = synchronized { + getCollection(MongoDBStore.ResourceEventCollection) + MongoDBStore.ping(mongo) + } - def findResourceEventById(id: String): Maybe[ResourceEvent] = - MongoDBStore.findById(id, rcevents, MongoDBStore.dbObjectToResourceEvent) + def insertResourceEvent(event: ResourceEventMsg) = { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) - def findResourceEventsByUserId(userId: String) - (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = { - val query = new BasicDBObject(JsonNames.userId, userId) + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis). + get() - MongoDBStore.runQuery(query, rcevents)(MongoDBStore.dbObjectToResourceEvent)(sortWith) + MongoDBStore.insertDBObject(dbObject, resourceEvents) + event } - def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = { - val query = new BasicDBObject() - query.put(JsonNames.userId, userId) - query.put(JsonNames.timestamp, new BasicDBObject("$gte", timestamp)) - - val sort = new BasicDBObject(JsonNames.timestamp, 1) + def findResourceEventByID(id: String): Option[ResourceEventMsg] = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(resourceEvents, MongoDBStore.JsonNames.id, id) + for { + dbObject ← dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload) + msg = AvroHelpers.specificRecordOfBytes(payload.asInstanceOf[Array[Byte]], new ResourceEventMsg) + } yield msg + } - val cursor = rcevents.find(query).sort(sort) + def countOutOfSyncResourceEventsForBillingPeriod(userID: String, startMillis: Long, stopMillis: Long): Long = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + // received within the period + add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$gte", startMillis)). + add(MongoDBStore.JsonNames.receivedMillis, new BasicDBObject("$lte", stopMillis)). + // occurred outside the period + add("$or", { + val dbList = new BasicDBList() + dbList.add(0, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lt", startMillis))) + dbList.add(1, new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gt", stopMillis))) + dbList + }). + get() + + resourceEvents.count(query) + } - try { - val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent] + def foreachResourceEventOccurredInPeriod( + userID: String, + startMillis: Long, + stopMillis: Long + )(f: ResourceEventMsg ⇒ Unit): Long = { + var _counter= 0L + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$gte", startMillis)). + add(MongoDBStore.JsonNames.occurredMillis, new BasicDBObject("$lte", stopMillis)). + get() + + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1) + val cursor = resourceEvents.find(query).sort(sorter) + + withCloseable(cursor) { cursor ⇒ while(cursor.hasNext) { - buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next()) + val nextDBObject = cursor.next() + val payload = nextDBObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val nextEvent = AvroHelpers.specificRecordOfBytes(payload, new ResourceEventMsg) + + f(nextEvent) + _counter += 1 } - buffer.toList - } finally { - cursor.close() } + + _counter } //-ResourceEventStore - //+UserStateStore - def storeUserState(userState: UserState): Maybe[RecordID] = - MongoDBStore.storeUserState(userState, users) - - def findUserStateByUserId(userId: String): Maybe[UserState] = { - Maybe { - val query = new BasicDBObject(JsonNames.userId, userId) - val cursor = rcevents find query - - try { - if(cursor.hasNext) - MongoDBStore.dbObjectToUserState(cursor.next()) - else - null - } finally { - cursor.close() - } + //+ UserStateStore + def findUserStateByUserID(userID: String) = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(userStates, MongoDBStore.JsonNames.userID, userID) + for { + dbObject <- dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new UserStateMsg) + } yield { + msg } } - //-UserStateStore - //+WalletEntryStore - def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = - MongoDBStore.storeAquariumEvent(entry, wallets) + def findLatestUserStateForFullMonthBilling(userID: String, bmi: BillingMonthInfo) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.isForFullMonth, true). + add(MongoDBStore.JsonNames.billingYear, bmi.year). + add(MongoDBStore.JsonNames.billingMonth, bmi.month). + get() + + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1) - def findWalletEntryById(id: String): Maybe[WalletEntry] = - MongoDBStore.findById[WalletEntry](id, wallets, MongoDBStore.dbObjectToWalletEntry) + val cursor = userStates.find(query).sort(sorter) - def findUserWalletEntries(userId: String) = { - // TODO: optimize - findUserWalletEntriesFromTo(userId, new Date(0), new Date(Int.MaxValue)) + withCloseable(cursor) { cursor ⇒ + MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) + } } - def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date) : List[WalletEntry] = { - val q = new BasicDBObject() - // TODO: Is this the correct way for an AND query? - q.put(JsonNames.timestamp, new BasicDBObject("$gt", from.getTime)) - q.put(JsonNames.timestamp, new BasicDBObject("$lt", to.getTime)) - q.put(JsonNames.userId, userId) + def findLatestUserState(userID: String) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + get() + + // Descending order, so that the latest comes first + val sorter = new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, -1) - MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) + val cursor = userStates.find(query).sort(sorter) + + withCloseable(cursor) { cursor ⇒ + MongoDBStore.findNextPayloadRecord(cursor, new UserStateMsg) + } } - def findLatestUserWalletEntries(userId: String) = { - Maybe { - val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order - val cursor = wallets.find().sort(orderBy) - - try { - val buffer = new scala.collection.mutable.ListBuffer[WalletEntry] - if(cursor.hasNext) { - val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next()) - buffer += walletEntry - - var _previousOccurredMillis = walletEntry.occurredMillis - var _ok = true - - while(cursor.hasNext && _ok) { - val walletEntry = MongoDBStore.dbObjectToWalletEntry(cursor.next()) - var currentOccurredMillis = walletEntry.occurredMillis - _ok = currentOccurredMillis == _previousOccurredMillis - - if(_ok) { - buffer += walletEntry - } - } - - buffer.toList - } else { - null - } - } finally { - cursor.close() - } + /** + * Stores a user state. + */ + def insertUserState(event: UserStateMsg)= { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) + + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.isForFullMonth, event.getIsForFullMonth). + add(MongoDBStore.JsonNames.billingYear, event.getBillingYear). + add(MongoDBStore.JsonNames.billingMonth, event.getBillingMonth). + add(MongoDBStore.JsonNames.billingMonthDay, event.getBillingMonthDay). + get() + + MongoDBStore.insertDBObject(dbObject, userStates) + event + } + //- UserStateStore + + //+IMEventStore + def pingIMEventStore(): Unit = { + getCollection(MongoDBStore.IMEventCollection) + MongoDBStore.ping(mongo) + } + + def insertIMEvent(event: IMEventMsg) = { + val mongoID = new ObjectId() + event.setInStoreID(mongoID.toStringMongod) + + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(event)). + add(MongoDBStore.JsonNames.userID, event.getUserID). + add(MongoDBStore.JsonNames.eventType, event.getEventType().toLowerCase). + add(MongoDBStore.JsonNames.occurredMillis, event.getOccurredMillis). + add(MongoDBStore.JsonNames.receivedMillis, event.getReceivedMillis). + get() + + MongoDBStore.insertDBObject(dbObject, imEvents) + event + } + + def findIMEventByID(id: String) = { + val dbObjectOpt = MongoDBStore.findOneByAttribute(imEvents, MongoDBStore.JsonNames.id, id) + for { + dbObject ← dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + } yield { + msg } } - //-WalletEntryStore - //+UserEventStore - def storeUserEvent(event: UserEvent): Maybe[RecordID] = - MongoDBStore.storeAny[UserEvent](event, imevents, JsonNames.userId, - _.userId, MongoDBStore.jsonSupportToDBObject) + /** + * Find the `CREATE` even for the given user. Note that there must be only one such event. + */ + def findCreateIMEventByUserID(userID: String) = { + val query = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames.userID, userID). + add(MongoDBStore.JsonNames.eventType, MessageConstants.IMEventMsg.EventTypes.create).get() - def findUserEventById(id: String): Maybe[UserEvent] = - MongoDBStore.findById[UserEvent](id, wallets, MongoDBStore.dbObjectToUserEvent) + // Normally one such event is allowed ... + val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)) + + val dbObjectOpt = withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) { + Some(cursor.next()) + } else { + None + } + } - def findUserEventsByUserId(userId: String) - (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent] = { - val query = new BasicDBObject(JsonNames.userId, userId) - MongoDBStore.runQuery(query, rcevents)(MongoDBStore.dbObjectToUserEvent)(sortWith) + for { + dbObject <- dbObjectOpt + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + } yield { + msg + } } - //-UserEventStore -} -object MongoDBStore { - final val RESOURCE_EVENTS_COLLECTION = "resevents" - //final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents" - final val USERS_COLLECTION = "users" - final val IM_EVENTS_COLLECTION = "imevents" - final val IM_WALLETS = "wallets" - - /* TODO: Some of the following methods rely on JSON (de-)serialization). - * A method based on proper object serialization would be much faster. - */ - def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = { - ResourceEvent.fromJson(JSON.serialize(dbObject)) + /** + * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through + * the given function `f`. + * + * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case. + */ + def foreachIMEventInOccurrenceOrder(userID: String)(f: (IMEventMsg) ⇒ Boolean) = { + val query = new BasicDBObject(MongoDBStore.JsonNames.userID, userID) + val cursor = imEvents.find(query).sort(new BasicDBObject(MongoDBStore.JsonNames.occurredMillis, 1)) + + var _shouldContinue = true + withCloseable(cursor) { cursor ⇒ + while(_shouldContinue && cursor.hasNext) { + val dbObject = cursor.next() + val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val msg = AvroHelpers.specificRecordOfBytes(payload, new IMEventMsg) + + _shouldContinue = f(msg) + } + } + + _shouldContinue } + //-IMEventStore - def dbObjectToUserState(dbObj: DBObject): UserState = { - UserState.fromJson(JSON.serialize(dbObj)) + //+PolicyStore + def foreachPolicy[U](f: PolicyMsg ⇒ U) { + val cursor = policies.find() + withCloseable(cursor) { cursor ⇒ + while(cursor.hasNext) { + val dbObject = cursor.next() + val payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + val policy = AvroHelpers.specificRecordOfBytes(payload, new PolicyMsg) + f(policy) + } + } } - def dbObjectToWalletEntry(dbObj: DBObject): WalletEntry = { - WalletEntry.fromJson(JSON.serialize(dbObj)) + def insertPolicy(policy: PolicyMsg): PolicyMsg = { + val mongoID = new ObjectId() + policy.setInStoreID(mongoID.toStringMongod) + val dbObject = new BasicDBObjectBuilder(). + add(MongoDBStore.JsonNames._id, mongoID). + add(MongoDBStore.JsonNames.validFromMillis, policy.getValidFromMillis). + add(MongoDBStore.JsonNames.validToMillis, policy.getValidToMillis). + add(MongoDBStore.JsonNames.payload, AvroHelpers.bytesOfSpecificRecord(policy)). + get() + + MongoDBStore.insertDBObject(dbObject, policies) + policy } - def dbObjectToUserEvent(dbObj: DBObject): UserEvent = { - UserEvent.fromJson(JSON.serialize(dbObj)) + def loadPolicyAt(atMillis: Long): Option[PolicyMsg] = { + // FIXME Inefficient + var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + foreachPolicy(_policies += _) + _policies.to(MessageFactory.newDummyPolicyMsgAt(atMillis)).lastOption } - def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe { - val query = new BasicDBObject(JsonNames.id, id) - val cursor = collection find query - - try { - if(cursor.hasNext) - deserializer apply cursor.next - else - null: A // will be transformed to NoVal by the Maybe polymorphic constructor - } finally { - cursor.close() - } + def loadSortedPoliciesWithin(fromMillis: Long, toMillis: Long): immutable.SortedMap[Timeslot, PolicyMsg] = { + // FIXME Inefficient + var _policies = immutable.TreeSet[PolicyMsg]()(OrderingHelpers.DefaultPolicyMsgOrdering) + foreachPolicy(_policies += _) + + immutable.SortedMap(_policies. + from(MessageFactory.newDummyPolicyMsgAt(fromMillis)). + to(MessageFactory.newDummyPolicyMsgAt(toMillis)).toSeq. + map(p ⇒ (Timeslot(p.getValidFromMillis, p.getValidToMillis), p)): _* + ) } + //-PolicyStore +} - def runQuery[A <: AquariumEvent](query: DBObject, collection: DBCollection, orderBy: DBObject = null) - (deserializer: (DBObject) => A) - (sortWith: Option[(A, A) => Boolean]): List[A] = { - val cursor0 = collection find query - val cursor = if(orderBy ne null) { - cursor0 sort orderBy - } else { - cursor0 - } // I really know that docs say that it is the same cursor. - - if(!cursor.hasNext) { - cursor.close() - Nil - } else { - val buff = new ListBuffer[A]() +object MongoDBStore { + final val JsonNames = gr.grnet.aquarium.util.json.JsonNames - while(cursor.hasNext) { - buff += deserializer apply cursor.next - } + final val collections = List("resevents","userstates","imevents","policies") + + final val ResourceEventCollection = collections(0) + + final val UserStateCollection = collections(1) + + final val IMEventCollection = collections(2) - cursor.close() + final val PolicyCollection = collections(3) - sortWith match { - case Some(sorter) => buff.toList.sortWith(sorter) - case None => buff.toList + def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = { + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) { + Some(f(cursor.next())) + } else { + None } } } - def storeAquariumEvent[A <: AquariumEvent](event: A, collection: DBCollection) : Maybe[RecordID] = { - storeAny[A](event, collection, JsonNames.id, (e) => e.id, MongoDBStore.jsonSupportToDBObject) + def ping(mongo: Mongo): Unit = synchronized { + // This requires a network roundtrip + mongo.isLocked } - def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = { - storeAny[UserState](userState, collection, JsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject) + def findOneByAttribute( + collection: DBCollection, + attributeName: String, + attributeValue: String, + sortByOpt: Option[DBObject] = None + ): Option[DBObject] = { + val query = new BasicDBObject(attributeName, attributeValue) + val cursor = sortByOpt match { + case None ⇒ collection find query + case Some(sortBy) ⇒ collection find query sort sortBy + } + withCloseable(cursor) { cursor ⇒ + if(cursor.hasNext) Some(cursor.next()) else None + } } - def storeAny[A](any: A, - collection: DBCollection, - idName: String, - idValueProvider: (A) => String, - serializer: (A) => DBObject) : Maybe[RecordID] = Maybe { - // Store - val dbObj = serializer apply any - val writeResult = collection insert dbObj - writeResult.getLastError().throwOnError() - - // Get back to retrieve unique id - val cursor = collection.find(new BasicDBObject(idName, idValueProvider(any))) - - try { - // TODO: better way to get _id? - if(cursor.hasNext) - RecordID(cursor.next().get(JsonNames._id).toString) - else - throw new StoreException("Could not store %s to %s".format(any, collection)) - } finally { - cursor.close() - } - } - - def jsonSupportToDBObject(any: JsonSupport): DBObject = { - JSON.parse(any.toJson) match { - case dbObject: DBObject ⇒ - dbObject - case _ ⇒ - throw new StoreException("Could not transform %s -> %s".format(displayableObjectInfo(any), classOf[DBObject].getName)) + def insertDBObject(dbObj: DBObject, collection: DBCollection) { + collection.insert(dbObj, WriteConcern.JOURNAL_SAFE) + } + + def findNextPayloadRecord[R <: SpecificRecord](cursor: DBCursor, fresh: R): Option[R] = { + for { + dbObject <- if(cursor.hasNext) Some(cursor.next()) else None + payload = dbObject.get(MongoDBStore.JsonNames.payload).asInstanceOf[Array[Byte]] + msg = AvroHelpers.specificRecordOfBytes(payload, fresh) + } yield { + msg } } + + def jsonSupportToDBObject(jsonSupport: JsonSupport) = { + StdConverters.AllConverters.convertEx[DBObject](jsonSupport) + } }