From: Georgios Gousios Date: Tue, 3 Jan 2012 13:36:20 +0000 (+0200) Subject: Implement the user event store X-Git-Tag: aquarium-0.1~108 X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/ee343d6670de15600afb4b954a6eec810090ab48 Implement the user event store --- diff --git a/src/main/resources/aquarium.properties b/src/main/resources/aquarium.properties index fa02706..8edc44f 100644 --- a/src/main/resources/aquarium.properties +++ b/src/main/resources/aquarium.properties @@ -54,6 +54,8 @@ store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider #resource.event.store.class= # Override the WalletEntry store (if present, it will not be given by the store provider above) #wallet.entry.store.class= +# Override the user event store (if present, it will not be given by the store provider above) +#user.event.store.class= # The lower mark for the UserActors' LRU, managed by UserActorManager. diff --git a/src/main/scala/gr/grnet/aquarium/Configurator.scala b/src/main/scala/gr/grnet/aquarium/Configurator.scala index 3184da0..31ea95a 100644 --- a/src/main/scala/gr/grnet/aquarium/Configurator.scala +++ b/src/main/scala/gr/grnet/aquarium/Configurator.scala @@ -41,7 +41,7 @@ import com.ckkloverdos.sys.SysProp import com.ckkloverdos.props.Props import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal} import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters} -import processor.actor.{IMEventProcessorService, ResourceEventProcessorService, EventProcessorService} +import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService} import store._ import util.{Lifecycle, Loggable} @@ -120,6 +120,14 @@ class Configurator(val props: Props) extends Loggable { } } + private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = { + props.get(Keys.user_event_store_class) map { className ⇒ + val instance = newInstance[UserEventStore](className) + logger.info("Overriding UserEventStore 
provisioning. Implementation given by: %s".format(instance.getClass)) + instance + } + } + private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = { // If there is a specific `IMStore` implementation specified in the // properties, then this implementation overrides the event store given by @@ -133,7 +141,7 @@ class Configurator(val props: Props) extends Loggable { private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService - private[this] lazy val _imEventProc: EventProcessorService = new IMEventProcessorService + private[this] lazy val _imEventProc: EventProcessorService = new UserEventProcessorService def get(key: String, default: String = ""): String = props.getOr(key, default) @@ -183,6 +191,13 @@ class Configurator(val props: Props) extends Loggable { } } + def userEventStore = { + _userEventStoreM match { + case Just(es) ⇒ es + case _ ⇒ storeProvider.userEventStore + } + } + def storeProvider = _storeProvider } @@ -311,7 +326,7 @@ object Configurator { /** * The class that implements the IM event store */ - final val im_eventstore_class = "imevent.store.class" + final val user_event_store_class = "user.event.store.class" /** * The class that implements the wallet entries store diff --git a/src/main/scala/gr/grnet/aquarium/logic/events/UserEvent.scala b/src/main/scala/gr/grnet/aquarium/logic/events/UserEvent.scala index e343488..bf6b304 100644 --- a/src/main/scala/gr/grnet/aquarium/logic/events/UserEvent.scala +++ b/src/main/scala/gr/grnet/aquarium/logic/events/UserEvent.scala @@ -68,9 +68,13 @@ case class UserEvent( } object UserEvent { - def fromJson(json: String): ResourceEvent = { + def fromJson(json: String): UserEvent = { implicit val formats = JsonHelpers.DefaultJsonFormats val jsonAST = parseJson(json) - Extraction.extract[ResourceEvent](jsonAST) + Extraction.extract[UserEvent](jsonAST) + } + + def fromBytes(bytes: Array[Byte]): UserEvent = { + JsonHelpers.jsonBytesToObject[UserEvent](bytes) } } 
diff --git a/src/main/scala/gr/grnet/aquarium/store/StoreProvider.scala b/src/main/scala/gr/grnet/aquarium/store/StoreProvider.scala index dca6e7f..00c6536 100644 --- a/src/main/scala/gr/grnet/aquarium/store/StoreProvider.scala +++ b/src/main/scala/gr/grnet/aquarium/store/StoreProvider.scala @@ -44,4 +44,5 @@ trait StoreProvider { def userStateStore: UserStateStore def resourceEventStore: ResourceEventStore def walletEntryStore: WalletEntryStore + def userEventStore: UserEventStore } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala b/src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala new file mode 100644 index 0000000..db34450 --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala @@ -0,0 +1,19 @@ +package gr.grnet.aquarium.store + +import com.ckkloverdos.maybe.Maybe +import gr.grnet.aquarium.logic.events.UserEvent + +/** + * An abstraction for Aquarium user event stores: persists a user event and looks events up by event id or by user id. + * + * @author Georgios Gousios + */ +trait UserEventStore { + + def storeUserEvent(event: UserEvent): Maybe[RecordID] + + def findUserEventById(id: String): Maybe[UserEvent] + + def findUserEventsByUserId(userId: String) + (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent] +} \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala index c7c008f..4991550 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala @@ -42,11 +42,11 @@ import gr.grnet.aquarium.util.displayableObjectInfo import gr.grnet.aquarium.util.json.JsonSupport import collection.mutable.ListBuffer import gr.grnet.aquarium.store._ -import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent} import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames import java.util.Date 
import com.ckkloverdos.maybe.Maybe import com.mongodb._ +import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent} /** * Mongodb implementation of the various aquarium stores. @@ -59,7 +59,9 @@ class MongoDBStore( val database: String, val username: String, val password: String) - extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable { + extends ResourceEventStore with UserStateStore + with WalletEntryStore with UserEventStore + with Loggable { private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION) private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION) @@ -68,16 +70,12 @@ class MongoDBStore( private[this] def getCollection(name: String): DBCollection = { val db = mongo.getDB(database) - if(!db.authenticate(username, password.toCharArray)) { + if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) { throw new StoreException("Could not authenticate user %s".format(username)) } db.getCollection(name) } - /* TODO: Some of the following methods rely on JSON (de-)serialization). - * A method based on proper object serialization would be much faster. 
- */ - private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = { if (one.occurredMillis > two.occurredMillis) false else if (one.occurredMillis < two.occurredMillis) true @@ -168,8 +166,6 @@ class MongoDBStore( MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc)) } - - //-WalletEntryStore def findLatestUserWalletEntries(userId: String) = { Maybe { val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order @@ -203,15 +199,35 @@ class MongoDBStore( } } } + //-WalletEntryStore + + //+UserEventStore + def storeUserEvent(event: UserEvent): Maybe[RecordID] = + MongoDBStore.storeAny[UserEvent](event, imevents, JsonNames.userId, + _.userId, MongoDBStore.jsonSupportToDBObject) + + + def findUserEventById(id: String): Maybe[UserEvent] = + MongoDBStore.findById[UserEvent](id, imevents, MongoDBStore.dbObjectToUserEvent) // look up in the collection storeUserEvent writes to + + def findUserEventsByUserId(userId: String) + (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent] = { + val query = new BasicDBObject(JsonNames.userId, userId) + MongoDBStore.runQuery(query, imevents)(MongoDBStore.dbObjectToUserEvent)(sortWith) // user events are stored in imevents, not rcevents + } + //-UserEventStore } object MongoDBStore { final val RESOURCE_EVENTS_COLLECTION = "resevents" - final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents" + //final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents" final val USERS_COLLECTION = "users" final val IM_EVENTS_COLLECTION = "imevents" final val IM_WALLETS = "wallets" + /* TODO: Some of the following methods rely on JSON (de-)serialization). + * A method based on proper object serialization would be much faster. 
+ */ def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = { ResourceEvent.fromJson(JSON.serialize(dbObject)) } @@ -224,6 +240,10 @@ object MongoDBStore { WalletEntry.fromJson(JSON.serialize(dbObj)) } + def dbObjectToUserEvent(dbObj: DBObject): UserEvent = { + UserEvent.fromJson(JSON.serialize(dbObj)) + } + def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe { val query = new BasicDBObject(JsonNames.id, id) val cursor = collection find query diff --git a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStoreProvider.scala b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStoreProvider.scala index 95b8d54..1524588 100644 --- a/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStoreProvider.scala +++ b/src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStoreProvider.scala @@ -55,6 +55,7 @@ class MongoDBStoreProvider extends StoreProvider with Configurable { private[this] var _eventStore: ResourceEventStore = _ private[this] var _userStore: UserStateStore = _ private[this] var _walletStore: WalletEntryStore = _ + private[this] var _userEventStore: UserEventStore = _ def configure(props: Props) = { this._database = props.getEx(Keys.persistence_db) @@ -72,6 +73,7 @@ class MongoDBStoreProvider extends StoreProvider with Configurable { this._eventStore = mongoStore this._userStore = mongoStore this._walletStore = mongoStore + this._userEventStore = mongoStore } catch { case e: MongoException => throw new Exception("Cannot connect to mongo at %s:%s".format(host, port), e) @@ -83,4 +85,6 @@ class MongoDBStoreProvider extends StoreProvider with Configurable { def resourceEventStore = _eventStore def walletEntryStore = _walletStore + + def userEventStore = _userEventStore } \ No newline at end of file