Add ping functionality to two of the stores
[aquarium] / src / main / scala / gr / grnet / aquarium / store / mongodb / MongoDBStore.scala
index 70a25ee..1171045 100644 (file)
 package gr.grnet.aquarium.store.mongodb
 
 import com.mongodb.util.JSON
-import gr.grnet.aquarium.user.UserState
-import gr.grnet.aquarium.user.UserState.{JsonNames => UserStateJsonNames}
+import gr.grnet.aquarium.computation.UserState.{JsonNames ⇒ UserStateJsonNames}
 import gr.grnet.aquarium.util.json.JsonSupport
 import collection.mutable.ListBuffer
-import gr.grnet.aquarium.events.im.IMEventModel.{Names => IMEventNames}
+import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.event.model.im.IMEventModel.{Names ⇒ IMEventNames}
+import gr.grnet.aquarium.event.model.resource.ResourceEventModel
+import gr.grnet.aquarium.event.model.resource.ResourceEventModel.{Names ⇒ ResourceEventNames}
 import gr.grnet.aquarium.store._
-import gr.grnet.aquarium.events.ResourceEvent.{JsonNames => ResourceJsonNames}
-import gr.grnet.aquarium.events.WalletEntry.{JsonNames => WalletJsonNames}
-import gr.grnet.aquarium.events.PolicyEntry.{JsonNames => PolicyJsonNames}
+import gr.grnet.aquarium.event.model.WalletEntry.{JsonNames ⇒ WalletJsonNames}
+import gr.grnet.aquarium.event.model.PolicyEntry.{JsonNames ⇒ PolicyJsonNames}
 import java.util.Date
 import gr.grnet.aquarium.logic.accounting.Policy
 import com.mongodb._
 import org.bson.types.ObjectId
-import gr.grnet.aquarium.events._
-import com.ckkloverdos.maybe.{NoVal, Maybe}
-import im.IMEventModel
+import com.ckkloverdos.maybe.Maybe
 import gr.grnet.aquarium.util._
-import gr.grnet.aquarium.converter.StdConverters
+import gr.grnet.aquarium.converter.Conversions
+import gr.grnet.aquarium.computation.UserState
+import gr.grnet.aquarium.event.model.{ExternalEventModel, WalletEntry, PolicyEntry}
 
 /**
  * Mongodb implementation of the various aquarium stores.
@@ -74,6 +75,7 @@ class MongoDBStore(
   with Loggable {
 
   override type IMEvent = MongoDBIMEvent
+  override type ResourceEvent = MongoDBResourceEvent
 
   private[store] lazy val resourceEvents   = getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
   private[store] lazy val userStates       = getCollection(MongoDBStore.USER_STATES_COLLECTION)
@@ -91,51 +93,51 @@ class MongoDBStore(
     db.getCollection(name)
   }
 
-  private[this] def _sortByTimestampAsc[A <: AquariumEventModel](one: A, two: A): Boolean = {
+  private[this] def _sortByTimestampAsc[A <: ExternalEventModel](one: A, two: A): Boolean = {
     if (one.occurredMillis > two.occurredMillis) false
     else if (one.occurredMillis < two.occurredMillis) true
     else true
   }
 
-  private[this] def _sortByTimestampDesc[A <: AquariumEventSkeleton](one: A, two: A): Boolean = {
-    if (one.occurredMillis < two.occurredMillis) false
-    else if (one.occurredMillis > two.occurredMillis) true
-    else true
+  //+ResourceEventStore
+  def createResourceEventFromOther(event: ResourceEventModel): ResourceEvent = {
+    MongoDBResourceEvent.fromOther(event, null)
   }
 
-  //+ResourceEventStore
-  def storeResourceEvent(event: ResourceEvent) = {
-    MongoDBStore.storeAny[ResourceEvent](
-      event,
-      resourceEvents,
-      ResourceJsonNames.id,
-      (e) => e.id,
-      MongoDBStore.jsonSupportToDBObject)
+  def pingResourceEventStore(): Unit = {
+    MongoDBStore.ping(mongo)
   }
 
-  def findResourceEventById(id: String): Maybe[ResourceEvent] =
-    MongoDBStore.findById(id, resourceEvents, MongoDBStore.dbObjectToResourceEvent)
+  def insertResourceEvent(event: ResourceEventModel) = {
+    val localEvent = MongoDBResourceEvent.fromOther(event, new ObjectId().toStringMongod)
+    MongoDBStore.insertObject(localEvent, resourceEvents, MongoDBStore.jsonSupportToDBObject)
+    localEvent
+  }
+
+  def findResourceEventById(id: String): Option[ResourceEvent] = {
+    MongoDBStore.findBy(ResourceEventNames.id, id, resourceEvents, MongoDBResourceEvent.fromDBObject)
+  }
 
   def findResourceEventsByUserId(userId: String)
                                 (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
-    val query = new BasicDBObject(ResourceJsonNames.userId, userId)
+    val query = new BasicDBObject(ResourceEventNames.userID, userId)
 
-    MongoDBStore.runQuery(query, resourceEvents)(MongoDBStore.dbObjectToResourceEvent)(sortWith)
+    MongoDBStore.runQuery(query, resourceEvents)(MongoDBResourceEvent.fromDBObject)(sortWith)
   }
 
   def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
     val query = new BasicDBObject()
-    query.put(ResourceJsonNames.userId, userId)
-    query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$gt", timestamp))
+    query.put(ResourceEventNames.userID, userId)
+    query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$gt", timestamp))
     
-    val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
+    val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
 
     val cursor = resourceEvents.find(query).sort(sort)
 
     try {
       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
       while(cursor.hasNext) {
-        buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
+        buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
       }
       buffer.toList.sortWith(_sortByTimestampAsc)
     } finally {
@@ -146,27 +148,27 @@ class MongoDBStore(
   def findResourceEventHistory(userId: String, resName: String,
                                instid: Option[String], upTo: Long) : List[ResourceEvent] = {
     val query = new BasicDBObject()
-    query.put(ResourceJsonNames.userId, userId)
-    query.put(ResourceJsonNames.occurredMillis, new BasicDBObject("$lt", upTo))
-    query.put(ResourceJsonNames.resource, resName)
+    query.put(ResourceEventNames.userID, userId)
+    query.put(ResourceEventNames.occurredMillis, new BasicDBObject("$lt", upTo))
+    query.put(ResourceEventNames.resource, resName)
 
     instid match {
       case Some(id) =>
         Policy.policy.findResource(resName) match {
-          case Some(y) => query.put(ResourceJsonNames.details,
+          case Some(y) => query.put(ResourceEventNames.details,
             new BasicDBObject(y.descriminatorField, instid.get))
           case None =>
         }
       case None =>
     }
 
-    val sort = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
+    val sort = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
     val cursor = resourceEvents.find(query).sort(sort)
 
     try {
       val buffer = new scala.collection.mutable.ListBuffer[ResourceEvent]
       while(cursor.hasNext) {
-        buffer += MongoDBStore.dbObjectToResourceEvent(cursor.next())
+        buffer += MongoDBResourceEvent.fromDBObject(cursor.next())
       }
       buffer.toList.sortWith(_sortByTimestampAsc)
     } finally {
@@ -176,21 +178,19 @@ class MongoDBStore(
 
   def findResourceEventsForReceivedPeriod(userId: String, startTimeMillis: Long, stopTimeMillis: Long): List[ResourceEvent] = {
     val query = new BasicDBObject()
-    query.put(ResourceJsonNames.userId, userId)
-    query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
-    query.put(ResourceJsonNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
+    query.put(ResourceEventNames.userID, userId)
+    query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$gte", startTimeMillis))
+    query.put(ResourceEventNames.receivedMillis, new BasicDBObject("$lte", stopTimeMillis))
 
     // Sort them by increasing order for occurred time
-    val orderBy = new BasicDBObject(ResourceJsonNames.occurredMillis, 1)
+    val orderBy = new BasicDBObject(ResourceEventNames.occurredMillis, 1)
 
-    MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBStore.dbObjectToResourceEvent)(None)
+    MongoDBStore.runQuery[ResourceEvent](query, resourceEvents, orderBy)(MongoDBResourceEvent.fromDBObject)(None)
   }
   
-  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Maybe[Long] = {
-    Maybe {
-      // FIXME: Implement
-      0L
-    }
+  def countOutOfSyncEventsForBillingPeriod(userId: String, startMillis: Long, stopMillis: Long): Long = {
+    // FIXME: Implement
+    0L
   }
 
   def findAllRelevantResourceEventsForBillingPeriod(userId: String,
@@ -202,34 +202,36 @@ class MongoDBStore(
   //-ResourceEventStore
 
   //+ UserStateStore
-  def storeUserState(userState: UserState): Maybe[RecordID] = {
-    MongoDBStore.storeUserState(userState, userStates)
+  def insertUserState(userState: UserState) = {
+    MongoDBStore.insertUserState(userState, userStates, MongoDBStore.jsonSupportToDBObject)
   }
 
-  def findUserStateByUserId(userId: String): Maybe[UserState] = {
-    Maybe {
-      val query = new BasicDBObject(UserStateJsonNames.userId, userId)
-      val cursor = userStates find query
+  def findUserStateByUserID(userID: String): Option[UserState] = {
+    val query = new BasicDBObject(UserStateJsonNames.userID, userID)
+    val cursor = userStates find query
 
-      try {
-        if(cursor.hasNext)
-          MongoDBStore.dbObjectToUserState(cursor.next())
-        else
-          null
-      } finally {
-        cursor.close()
-      }
+    withCloseable(cursor) { cursor ⇒
+      if(cursor.hasNext)
+        Some(MongoDBStore.dbObjectToUserState(cursor.next()))
+      else
+        None
     }
   }
 
+
+  def findLatestUserStateByUserID(userID: String) = {
+    // FIXME: implement
+    null
+  }
+
   def findLatestUserStateForEndOfBillingMonth(userId: String,
                                               yearOfBillingMonth: Int,
-                                              billingMonth: Int): Maybe[UserState] = {
-    NoVal // FIXME: implement
+                                              billingMonth: Int): Option[UserState] = {
+    None // FIXME: implement
   }
 
   def deleteUserState(userId: String) = {
-    val query = new BasicDBObject(UserStateJsonNames.userId, userId)
+    val query = new BasicDBObject(UserStateJsonNames.userID, userId)
     userStates.findAndRemove(query)
   }
   //- UserStateStore
@@ -240,14 +242,15 @@ class MongoDBStore(
       MongoDBStore.storeAny[WalletEntry](
         entry,
         walletEntries,
-        ResourceJsonNames.id,
+        WalletJsonNames.id,
         (e) => e.id,
         MongoDBStore.jsonSupportToDBObject)
     }
   }
 
-  def findWalletEntryById(id: String): Maybe[WalletEntry] =
-    MongoDBStore.findById[WalletEntry](id, walletEntries, MongoDBStore.dbObjectToWalletEntry)
+  def findWalletEntryById(id: String): Maybe[WalletEntry] = {
+    MongoDBStore.findBy(WalletJsonNames.id, id, walletEntries, MongoDBStore.dbObjectToWalletEntry): Maybe[WalletEntry]
+  }
 
   def findUserWalletEntries(userId: String) = {
     // TODO: optimize
@@ -323,10 +326,6 @@ class MongoDBStore(
   //-WalletEntryStore
 
   //+IMEventStore
-  def isLocalIMEvent(event: IMEventModel) = {
-    MongoDBStore.isLocalIMEvent(event)
-  }
-
   def createIMEventFromJson(json: String) = {
     MongoDBStore.createIMEventFromJson(json)
   }
@@ -335,27 +334,18 @@ class MongoDBStore(
     MongoDBStore.createIMEventFromOther(event)
   }
 
-  def storeUnparsed(json: String): Maybe[RecordID] = {
-    MongoDBStore.storeJustJson(json, unparsedIMEvents)
+  def pingIMEventStore(): Unit = {
+    MongoDBStore.ping(mongo)
   }
 
-  def storeIMEvent(_event: IMEventModel): RecordID = {
-    val event = createIMEventFromOther(_event)
-    MongoDBStore.storeAny[IMEvent](
-      event,
-      imEvents,
-      IMEventNames.userID,
-      _.userID,
-      MongoDBStore.jsonSupportToDBObject
-    )
+  def insertIMEvent(event: IMEventModel): IMEvent = {
+    val localEvent = MongoDBIMEvent.fromOther(event, new ObjectId().toStringMongod)
+    MongoDBStore.insertObject(localEvent, imEvents, MongoDBStore.jsonSupportToDBObject)
+    localEvent
   }
 
-  def findIMEventById(id: String): Maybe[IMEvent] =
-    MongoDBStore.findById[IMEvent](id, imEvents, MongoDBStore.dbObjectToIMEvent)
-
-  def findIMEventsByUserId(userId: String): List[IMEvent] = {
-    val query = new BasicDBObject(IMEventNames.userID, userId)
-    MongoDBStore.runQuery(query, imEvents)(MongoDBStore.dbObjectToIMEvent)(Some(_sortByTimestampAsc))
+  def findIMEventById(id: String): Option[IMEvent] = {
+    MongoDBStore.findBy(IMEventNames.id, id, imEvents, MongoDBIMEvent.fromDBObject)
   }
   //-IMEventStore
 
@@ -376,8 +366,9 @@ class MongoDBStore(
     policyEntries.update(query, policyObject, true, false)
   }
   
-  def findPolicyEntry(id: String) =
-    MongoDBStore.findById[PolicyEntry](id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
+  def findPolicyEntry(id: String) = {
+    MongoDBStore.findBy(PolicyJsonNames.id, id, policyEntries, MongoDBStore.dbObjectToPolicyEntry)
+  }
 
   //-PolicyStore
 }
@@ -388,28 +379,28 @@ object MongoDBStore {
   }
 
   /**
-   * Collection holding the [[gr.grnet.aquarium.events.ResourceEvent]]s.
+   * Collection holding the [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
    *
    * Resource events are coming from all systems handling billable resources.
    */
   final val RESOURCE_EVENTS_COLLECTION = "resevents"
 
   /**
-   * Collection holding the snapshots of [[gr.grnet.aquarium.user.UserState]].
+   * Collection holding the snapshots of [[gr.grnet.aquarium.computation.UserState]].
    *
-   * [[gr.grnet.aquarium.user.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user .UserActor]]s.
+   * [[gr.grnet.aquarium.computation.UserState]] is held internally within [[gr.grnet.aquarium.actor.service.user.UserActor]]s.
    */
   final val USER_STATES_COLLECTION = "userstates"
 
   /**
-   * Collection holding [[gr.grnet.aquarium.events.im.IMEventModel]]s.
+   * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
    *
    * User events are coming from the IM module (external).
    */
   final val IM_EVENTS_COLLECTION = "imevents"
 
   /**
-   * Collection holding [[gr.grnet.aquarium.events.im.IMEventModel]]s that could not be parsed to normal objects.
+   * Collection holding [[gr.grnet.aquarium.event.model.im.IMEventModel]]s that could not be parsed to normal objects.
    *
    * We of course assume at least a valid JSON representation.
    *
@@ -418,7 +409,7 @@ object MongoDBStore {
   final val UNPARSED_IM_EVENTS_COLLECTION = "unparsed_imevents"
 
   /**
-   * Collection holding [[gr.grnet.aquarium.events.WalletEntry]].
+   * Collection holding [[gr.grnet.aquarium.event.model.WalletEntry]].
    *
    * Wallet entries are generated internally in Aquarium.
    */
@@ -430,17 +421,10 @@ object MongoDBStore {
 //  final val POLICIES_COLLECTION = "policies"
 
   /**
-   * Collection holding [[gr.grnet.aquarium.events.PolicyEntry]].
+   * Collection holding [[gr.grnet.aquarium.event.model.PolicyEntry]].
    */
   final val POLICY_ENTRIES_COLLECTION = "policyEntries"
 
-  /* TODO: Some of the following methods rely on JSON (de-)serialization).
-  * A method based on proper object serialization would be much faster.
-  */
-  def dbObjectToResourceEvent(dbObject: DBObject): ResourceEvent = {
-    ResourceEvent.fromJson(JSON.serialize(dbObject))
-  }
-
   def dbObjectToUserState(dbObj: DBObject): UserState = {
     UserState.fromJson(JSON.serialize(dbObj))
   }
@@ -449,30 +433,31 @@ object MongoDBStore {
     WalletEntry.fromJson(JSON.serialize(dbObj))
   }
 
-  def dbObjectToIMEvent(dbObj: DBObject): MongoDBIMEvent = {
-    MongoDBIMEvent.fromJson(JSON.serialize(dbObj))
-  }
-
   def dbObjectToPolicyEntry(dbObj: DBObject): PolicyEntry = {
     PolicyEntry.fromJson(JSON.serialize(dbObj))
   }
 
-  def findById[A >: Null <: AnyRef](id: String, collection: DBCollection, deserializer: (DBObject) => A) : Maybe[A] =
-    Maybe {
-    val query = new BasicDBObject(ResourceJsonNames.id, id)
+  def ping(mongo: Mongo): Unit = {
+    // This requires a network roundtrip
+    mongo.isLocked
+  }
+
+  def findBy[A >: Null <: AnyRef](name: String,
+                                  value: String,
+                                  collection: DBCollection,
+                                  deserializer: (DBObject) => A) : Option[A] = {
+    val query = new BasicDBObject(name, value)
     val cursor = collection find query
 
-    try {
+    withCloseable(cursor) { cursor ⇒
       if(cursor.hasNext)
-        deserializer apply cursor.next
+        Some(deserializer apply cursor.next)
       else
-        null: A // will be transformed to NoVal by the Maybe polymorphic constructor
-    } finally {
-      cursor.close()
+        None
     }
   }
 
-  def runQuery[A <: AquariumEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
+  def runQuery[A <: ExternalEventModel](query: DBObject, collection: DBCollection, orderBy: DBObject = null)
                                   (deserializer: (DBObject) => A)
                                   (sortWith: Option[(A, A) => Boolean]): List[A] = {
     val cursor0 = collection find query
@@ -501,25 +486,14 @@ object MongoDBStore {
     }
   }
 
-  def storeUserState(userState: UserState, collection: DBCollection): Maybe[RecordID] = {
-    Maybe(storeAny[UserState](userState, collection, ResourceJsonNames.userId, _.userId, MongoDBStore.jsonSupportToDBObject))
+  def storeUserState(userState: UserState, collection: DBCollection) = {
+    storeAny[UserState](userState, collection, ResourceEventNames.userID, _.userID, MongoDBStore.jsonSupportToDBObject)
   }
   
   def storePolicyEntry(policyEntry: PolicyEntry, collection: DBCollection): Maybe[RecordID] = {
     Maybe(storeAny[PolicyEntry](policyEntry, collection, PolicyJsonNames.id, _.id, MongoDBStore.jsonSupportToDBObject))
   }
 
-  def storeJustJson(json: String, collection: DBCollection): Maybe[RecordID] = {
-    Maybe {
-      val dbObj = jsonStringToDBObject(json)
-      val writeResult = collection insert dbObj
-      writeResult.getLastError().throwOnError()
-      val objectId = dbObj.get("_id").asInstanceOf[ObjectId]
-
-      RecordID(objectId.toString)
-    }
-  }
-
   def storeAny[A](any: A,
                   collection: DBCollection,
                   idName: String,
@@ -535,12 +509,45 @@ object MongoDBStore {
     RecordID(dbObject.get("_id").toString)
   }
 
-  def jsonSupportToDBObject(jsonSupport: JsonSupport): DBObject = {
-    StdConverters.StdConverters.convertEx[DBObject](jsonSupport)
+  // FIXME: consolidate
+  def insertUserState[A <: UserState](obj: A, collection: DBCollection, serializer: A ⇒ DBObject) = {
+    val dbObject = serializer apply obj
+    val objectId = obj._id  match {
+      case null ⇒
+        val _id = new ObjectId()
+        dbObject.put("_id", _id)
+        _id
+
+      case _id ⇒
+        _id
+    }
+
+    dbObject.put(JsonNames._id, objectId)
+
+    collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
+
+    obj
+  }
+
+  def insertObject[A <: MongoDBEventModel](obj: A, collection: DBCollection, serializer: (A) => DBObject) : Unit = {
+    val dbObject = serializer apply obj
+    val objectId = obj._id  match {
+      case null ⇒
+        val _id = new ObjectId()
+        dbObject.put("_id", _id)
+        _id
+
+      case _id ⇒
+        _id
+    }
+
+    dbObject.put(JsonNames._id, objectId)
+
+    collection.insert(dbObject, WriteConcern.JOURNAL_SAFE)
   }
 
-  def jsonStringToDBObject(jsonString: String): DBObject = {
-    StdConverters.StdConverters.convertEx[DBObject](jsonString)
+  def jsonSupportToDBObject(jsonSupport: JsonSupport) = {
+    Conversions.jsonSupportToDBObject(jsonSupport)
   }
 
   final def isLocalIMEvent(event: IMEventModel) = event match {
@@ -549,11 +556,11 @@ object MongoDBStore {
   }
 
   final def createIMEventFromJson(json: String) = {
-    MongoDBIMEvent.fromJson(json)
+    MongoDBIMEvent.fromJsonString(json)
   }
 
   final def createIMEventFromOther(event: IMEventModel) = {
-    MongoDBIMEvent.fromOther(event)
+    MongoDBIMEvent.fromOther(event, null)
   }
 
   final def createIMEventFromJsonBytes(jsonBytes: Array[Byte]) = {