#resource.event.store.class=
# Override the WalletEntry store (if present, it will not be given by the store provider above)
#wallet.entry.store.class=
+# Override the user event store (if present, it will not be given by the store provider above)
+#user.event.store.class=
# The lower mark for the UserActors' LRU, managed by UserActorManager.
import com.ckkloverdos.props.Props
import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import processor.actor.{IMEventProcessorService, ResourceEventProcessorService, EventProcessorService}
+import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService}
import store._
import util.{Lifecycle, Loggable}
}
}
+ // If the properties name a specific `UserEventStore` class, instantiate it here;
+ // the resulting instance takes precedence over the one from the store provider.
+ private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
+   for (storeClassName ← props.get(Keys.user_event_store_class)) yield {
+     val overridingStore = newInstance[UserEventStore](storeClassName)
+     logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(overridingStore.getClass))
+     overridingStore
+   }
+ }
+
private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
// If there is a specific `IMStore` implementation specified in the
// properties, then this implementation overrides the event store given by
private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
- private[this] lazy val _imEventProc: EventProcessorService = new IMEventProcessorService
+ private[this] lazy val _imEventProc: EventProcessorService = new UserEventProcessorService
def get(key: String, default: String = ""): String = props.getOr(key, default)
}
}
+ /**
+  * The user event store in effect: the implementation overridden through the
+  * properties when there is one, otherwise the one given by the store provider.
+  */
+ def userEventStore = _userEventStoreM match {
+   case Just(overridingStore) ⇒ overridingStore
+   case _ ⇒ storeProvider.userEventStore
+ }
+
def storeProvider = _storeProvider
}
/**
- * The class that implements the IM event store
+ * The class that implements the user event store
*/
- final val im_eventstore_class = "imevent.store.class"
+ final val user_event_store_class = "user.event.store.class"
/**
* The class that implements the wallet entries store
}
object UserEvent {
+ /** Parses the given JSON string and extracts a `UserEvent` from it, using `JsonHelpers.DefaultJsonFormats`. */
- def fromJson(json: String): ResourceEvent = {
+ def fromJson(json: String): UserEvent = {
implicit val formats = JsonHelpers.DefaultJsonFormats
val jsonAST = parseJson(json)
- Extraction.extract[ResourceEvent](jsonAST)
+ Extraction.extract[UserEvent](jsonAST)
+ }
+
+ /** Builds a `UserEvent` from raw bytes by delegating to `JsonHelpers.jsonBytesToObject`. */
+ def fromBytes(bytes: Array[Byte]): UserEvent = {
+ JsonHelpers.jsonBytesToObject[UserEvent](bytes)
}
}
def userStateStore: UserStateStore
def resourceEventStore: ResourceEventStore
def walletEntryStore: WalletEntryStore
+ def userEventStore: UserEventStore
}
\ No newline at end of file
--- /dev/null
+package gr.grnet.aquarium.store
+
+import com.ckkloverdos.maybe.Maybe
+import gr.grnet.aquarium.logic.events.{ResourceEvent, UserEvent}
+
+/**
+ * An abstraction for Aquarium user event stores.
+ *
+ * @author Georgios Gousios <gousiosg@gmail.com>
+ */
+trait UserEventStore {
+
+ /**
+  * Stores the given user event.
+  *
+  * @return a `Maybe` wrapping a `RecordID` (presumably that of the stored record;
+  *         how failures are reported is up to the implementation)
+  */
+ def storeUserEvent(event: UserEvent): Maybe[RecordID]
+
+ /**
+  * Looks up a single user event by its id.
+  *
+  * @return a `Maybe` wrapping the event (presumably empty or failed when no event matches; confirm per implementation)
+  */
+ def findUserEventById(id: String): Maybe[UserEvent]
+
+ /**
+  * Finds the user events that belong to the given user.
+  *
+  * @param userId   the user whose events are requested
+  * @param sortWith optional comparison function over two events (presumably a
+  *                 "less than" predicate used to order the result; verify against
+  *                 the implementation)
+  */
+ def findUserEventsByUserId(userId: String)
+ (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent]
+}
\ No newline at end of file
import gr.grnet.aquarium.util.json.JsonSupport
import collection.mutable.ListBuffer
import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, AquariumEvent}
import gr.grnet.aquarium.logic.events.ResourceEvent.JsonNames
import java.util.Date
import com.ckkloverdos.maybe.Maybe
import com.mongodb._
+import gr.grnet.aquarium.logic.events.{UserEvent, WalletEntry, ResourceEvent, AquariumEvent}
/**
* Mongodb implementation of the various aquarium stores.
val database: String,
val username: String,
val password: String)
- extends ResourceEventStore with UserStateStore with WalletEntryStore with Loggable {
+ extends ResourceEventStore with UserStateStore
+ with WalletEntryStore with UserEventStore
+ with Loggable {
private[store] lazy val rcevents: DBCollection = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
private[store] lazy val users: DBCollection = getCollection(MongoDBStore.USERS_COLLECTION)
+ /**
+  * Fetches the collection with the given name from `database`.
+  *
+  * Throws a `StoreException` when the database handle is not authenticated
+  * and authenticating with `username`/`password` fails.
+  */
private[this] def getCollection(name: String): DBCollection = {
val db = mongo.getDB(database)
- if(!db.authenticate(username, password.toCharArray)) {
+ // Only call authenticate() when the handle does not report itself as authenticated already.
+ if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
throw new StoreException("Could not authenticate user %s".format(username))
}
db.getCollection(name)
}
- /* TODO: Some of the following methods rely on JSON (de-)serialization).
- * A method based on proper object serialization would be much faster.
- */
-
private[this] def _sortByTimestampAsc[A <: AquariumEvent](one: A, two: A): Boolean = {
if (one.occurredMillis > two.occurredMillis) false
else if (one.occurredMillis < two.occurredMillis) true
MongoDBStore.runQuery[WalletEntry](q, wallets)(MongoDBStore.dbObjectToWalletEntry)(Some(_sortByTimestampAsc))
}
-
- //-WalletEntryStore
def findLatestUserWalletEntries(userId: String) = {
Maybe {
val orderBy = new BasicDBObject(JsonNames.occurredMillis, -1) // -1 is descending order
}
}
}
+ //-WalletEntryStore
+
+ //+UserEventStore
+ /**
+  * Saves a user event in the `imevents` collection through `MongoDBStore.storeAny`.
+  *
+  * NOTE(review): the id name/value pair handed to `storeAny` is
+  * `JsonNames.userId` / `event.userId`, whereas `MongoDBStore.findById` queries
+  * on `JsonNames.id` -- confirm that `userId` is the intended key here.
+  */
+ def storeUserEvent(event: UserEvent): Maybe[RecordID] =
+   MongoDBStore.storeAny[UserEvent](
+     event,
+     imevents,
+     JsonNames.userId,
+     (userEvent: UserEvent) => userEvent.userId,
+     MongoDBStore.jsonSupportToDBObject)
+
+
+ /**
+  * Looks up a single user event by its id.
+  *
+  * The lookup runs against `imevents`, the collection `storeUserEvent` writes
+  * to; querying `wallets` would never find a stored user event.
+  *
+  * @param id the id of the requested user event
+  * @return the result of `MongoDBStore.findById`, deserialized with
+  *         `MongoDBStore.dbObjectToUserEvent`
+  */
+ def findUserEventById(id: String): Maybe[UserEvent] =
+   MongoDBStore.findById[UserEvent](id, imevents, MongoDBStore.dbObjectToUserEvent)
+
+ /**
+  * Finds the user events whose `userId` field equals the given user id.
+  *
+  * The query runs against `imevents`, the collection `storeUserEvent` writes
+  * to; running it against `rcevents` would deserialize resource event
+  * documents as user events.
+  *
+  * @param userId   the user whose events are requested
+  * @param sortWith optional comparison function, handed over to `MongoDBStore.runQuery`
+  */
+ def findUserEventsByUserId(userId: String)
+                           (sortWith: Option[(UserEvent, UserEvent) => Boolean]): List[UserEvent] = {
+   val query = new BasicDBObject(JsonNames.userId, userId)
+   MongoDBStore.runQuery[UserEvent](query, imevents)(MongoDBStore.dbObjectToUserEvent)(sortWith)
+ }
+ //-UserEventStore
}
object MongoDBStore {
final val RESOURCE_EVENTS_COLLECTION = "resevents"
- final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
+ //final val PROCESSED_RESOURCE_EVENTS_COLLECTION = "procresevents"
final val USERS_COLLECTION = "users"
final val IM_EVENTS_COLLECTION = "imevents"
final val IM_WALLETS = "wallets"
+ /* TODO: Some of the following methods rely on JSON (de-)serialization.
+ * A method based on proper object serialization would be much faster.
+ */
+ /** Converts a Mongo `DBObject` to a `ResourceEvent`: the object is serialized to JSON, which `ResourceEvent.fromJson` then parses. */
def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
ResourceEvent.fromJson(JSON.serialize(dbObject))
}
WalletEntry.fromJson(JSON.serialize(dbObj))
}
+ /** Converts a Mongo `DBObject` to a `UserEvent`: the object is serialized to JSON, which `UserEvent.fromJson` then parses. */
+ def dbObjectToUserEvent(dbObj: DBObject): UserEvent = {
+   val serializedJson = JSON.serialize(dbObj)
+   UserEvent.fromJson(serializedJson)
+ }
+
def findById[A >: Null <: AquariumEvent](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] = Maybe {
val query = new BasicDBObject(JsonNames.id, id)
val cursor = collection find query
private[this] var _eventStore: ResourceEventStore = _
private[this] var _userStore: UserStateStore = _
private[this] var _walletStore: WalletEntryStore = _
+ private[this] var _userEventStore: UserEventStore = _
def configure(props: Props) = {
this._database = props.getEx(Keys.persistence_db)
this._eventStore = mongoStore
this._userStore = mongoStore
this._walletStore = mongoStore
+ this._userEventStore = mongoStore
} catch {
case e: MongoException =>
throw new Exception("Cannot connect to mongo at %s:%s".format(host, port), e)
+ // Accessors for the store fields that configure() assigns.
def resourceEventStore = _eventStore
def walletEntryStore = _walletStore
+
+ def userEventStore = _userEventStore
}
\ No newline at end of file