Implement the user event store
authorGeorgios Gousios <gousiosg@gmail.com>
Tue, 3 Jan 2012 13:36:20 +0000 (15:36 +0200)
committerGeorgios Gousios <gousiosg@gmail.com>
Tue, 3 Jan 2012 13:36:20 +0000 (15:36 +0200)
src/main/resources/aquarium.properties
src/main/scala/gr/grnet/aquarium/Configurator.scala
src/main/scala/gr/grnet/aquarium/logic/events/UserEvent.scala
src/main/scala/gr/grnet/aquarium/store/StoreProvider.scala
src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStoreProvider.scala

index fa02706..8edc44f 100644 (file)
@@ -54,6 +54,8 @@ store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
 #resource.event.store.class=
 # Override the WalletEntry store (if present, it will not be given by the store provider above)
 #wallet.entry.store.class=
+# Override the user event store (if present, it will not be given by the store provider above)
+#user.event.store.class=
 
 
 # The lower mark for the UserActors' LRU, managed by UserActorManager.
index 3184da0..31ea95a 100644 (file)
@@ -41,7 +41,7 @@ import com.ckkloverdos.sys.SysProp
 import com.ckkloverdos.props.Props
 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import processor.actor.{IMEventProcessorService, ResourceEventProcessorService, EventProcessorService}
+import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService}
 import store._
 import util.{Lifecycle, Loggable}
 
@@ -120,6 +120,14 @@ class Configurator(val props: Props) extends Loggable {
     }
   }
 
+  private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
+    props.get(Keys.user_event_store_class) map { className ⇒
+      val instance = newInstance[UserEventStore](className)
+      logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
+      instance
+    }
+  }
+
   private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
     // If there is a specific `IMStore` implementation specified in the
     // properties, then this implementation overrides the event store given by
@@ -133,7 +141,7 @@ class Configurator(val props: Props) extends Loggable {
 
   private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
 
-  private[this] lazy val _imEventProc: EventProcessorService = new IMEventProcessorService
+  private[this] lazy val _imEventProc: EventProcessorService = new UserEventProcessorService
 
   def get(key: String, default: String = ""): String = props.getOr(key, default)
 
@@ -183,6 +191,13 @@ class Configurator(val props: Props) extends Loggable {
     }
   }
 
+  def userEventStore = {
+    _userEventStoreM match {
+      case Just(es) ⇒ es
+      case _        ⇒ storeProvider.userEventStore
+    }
+  }
+
   def storeProvider = _storeProvider
 }
 
@@ -311,7 +326,7 @@ object Configurator {
     /**
      * The class that implements the IM event store
      */
-    final val im_eventstore_class = "imevent.store.class"
+    final val user_event_store_class = "user.event.store.class"
 
     /**
      * The class that implements the wallet entries store
index e343488..bf6b304 100644 (file)
@@ -68,9 +68,13 @@ case class UserEvent(
 }
 
 object UserEvent {
-  def fromJson(json: String): ResourceEvent = {
+  def fromJson(json: String): UserEvent = {
     implicit val formats = JsonHelpers.DefaultJsonFormats
     val jsonAST = parseJson(json)
-    Extraction.extract[ResourceEvent](jsonAST)
+    Extraction.extract[UserEvent](jsonAST)
+  }
+
+  def fromBytes(bytes: Array[Byte]): UserEvent = {
+    JsonHelpers.jsonBytesToObject[UserEvent](bytes)
   }
 }
index dca6e7f..00c6536 100644 (file)
@@ -44,4 +44,5 @@ trait StoreProvider {
   def userStateStore: UserStateStore
   def resourceEventStore: ResourceEventStore
   def walletEntryStore: WalletEntryStore
+  def userEventStore: UserEventStore
 }
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala b/src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala
new file mode 100644 (file)
index 0000000..db34450
--- /dev/null
@@ -0,0 +1,19 @@
+package gr.grnet.aquarium.store
+
+import com.ckkloverdos.maybe.Maybe
+import gr.grnet.aquarium.logic.events.{ResourceEvent, UserEvent}
+
+/**
+ * * An abstraction for Aquarium user event stores.
+ *
+ * @author Georgios Gousios <gousiosg@gmail.com>
+ */
+trait UserEventStore {
+
+  def storeUserEvent(event: UserEvent): Maybe[RecordID]
+
+  def findUserEventById(id: String): Maybe[UserEvent]
+
+  def findUserEventsByUserId(userId: String)
+                            (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent]
+}
\ No newline at end of file
index c7c008f..4991550 100644 (file)
@@ -42,11 +42,11 @@ import gr.grnet.aquarium.util.displayableObjectInfo
 import gr.grnet.aquarium.util.json.JsonSupport
 import collection.mutable.ListBuffer
 import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
 import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
 import java.util.Date
 import com.ckkloverdos.maybe.Maybe
 import com.mongodb._
+import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent}
 
 /**
  * Mongodb implementation of the various aquarium stores.
@@ -59,7 +59,9 @@ class MongoDBStore(
     val database: String,
     val username: String,
     val password: String)
-  extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable {
+  extends ResourceEventStore with UserStateStore
+  with WalletEntryStore with UserEventStore
+  with Loggable {
 
   private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
   private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
@@ -68,16 +70,12 @@ class MongoDBStore(
 
   private[this] def getCollection(name: String): DBCollection = {
     val db = mongo.getDB(database)
-    if(!db.authenticate(username, password.toCharArray)) {
+    if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
       throw new StoreException("Could not authenticate user %s".format(username))
     }
     db.getCollection(name)
   }
 
-  /* TODO: Some of the following methods rely on JSON (de-)serialization).
-  * A method based on proper object serialization would be much faster.
-  */
-
   private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
     if (one.occurredMillis > two.occurredMillis) false
     else if (one.occurredMillis < two.occurredMillis) true
@@ -168,8 +166,6 @@ class MongoDBStore(
     MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
   }
 
-
-  //-WalletEntryStore
   def findLatestUserWalletEntries(userId: String) = {
     Maybe {
       val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
@@ -203,15 +199,35 @@ class MongoDBStore(
       }
     }
   }
+  //-WalletEntryStore
+
+  //+UserEventStore
+  def storeUserEvent(event: UserEvent): Maybe[RecordID] =
+    MongoDBStore.storeAny[UserEvent](event, imevents, JsonNames.userId,
+      _.userId, MongoDBStore.jsonSupportToDBObject)
+
+
+  def findUserEventById(id: String): Maybe[UserEvent] =
+    MongoDBStore.findById[UserEvent](id, wallets, MongoDBStore.dbObjectToUserEvent)
+
+  def findUserEventsByUserId(userId: String)
+                            (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent] = {
+    val query = new BasicDBObject(JsonNames.userId, userId)
+    MongoDBStore.runQuery(query, rcevents)(MongoDBStore.dbObjectToUserEvent)(sortWith)
+  }
+  //-UserEventStore
 }
 
 object MongoDBStore {
   final val RESOURCE_EVENTS_COLLECTION = "resevents"
-  final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
+  //final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
   final val USERS_COLLECTION = "users"
   final val IM_EVENTS_COLLECTION = "imevents"
   final val IM_WALLETS = "wallets"
 
+  /* TODO: Some of the following methods rely on JSON (de-)serialization).
+  * A method based on proper object serialization would be much faster.
+  */
   def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
     ResourceEvent.fromJson(JSON.serialize(dbObject))
   }
@@ -224,6 +240,10 @@ object MongoDBStore {
     WalletEntry.fromJson(JSON.serialize(dbObj))
   }
 
+  def dbObjectToUserEvent(dbObj: DBObject): UserEvent = {
+    UserEvent.fromJson(JSON.serialize(dbObj))
+  }
+
   def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
     val query = new BasicDBObject(JsonNames.id, id)
     val cursor = collection find query
index 95b8d54..1524588 100644 (file)
@@ -55,6 +55,7 @@ class MongoDBStoreProvider extends StoreProvider with Configurable {
   private[this] var _eventStore: ResourceEventStore = _
   private[this] var _userStore: UserStateStore = _
   private[this] var _walletStore: WalletEntryStore = _
+  private[this] var _userEventStore: UserEventStore = _
 
   def configure(props: Props) = {
     this._database = props.getEx(Keys.persistence_db)
@@ -72,6 +73,7 @@ class MongoDBStoreProvider extends StoreProvider with Configurable {
       this._eventStore = mongoStore
       this._userStore  = mongoStore
       this._walletStore  = mongoStore
+      this._userEventStore  = mongoStore
     } catch {
       case e: MongoException =>
         throw new Exception("Cannot connect to mongo at %s:%s".format(host, port), e)
@@ -83,4 +85,6 @@ class MongoDBStoreProvider extends StoreProvider with Configurable {
   def resourceEventStore = _eventStore
 
   def walletEntryStore = _walletStore
+
+  def userEventStore = _userEventStore
 }
\ No newline at end of file