MongoDBStore adds indices to mongo collections exactly Once (new class). Removed...
author Prodromos Gerakios <pgerakios@grnet.gr>
Mon, 1 Oct 2012 12:32:17 +0000 (15:32 +0300)
committer Prodromos Gerakios <pgerakios@grnet.gr>
Mon, 1 Oct 2012 12:32:17 +0000 (15:32 +0300)
src/main/scala/gr/grnet/aquarium/store/mongodb/MongoDBStore.scala
src/main/scala/gr/grnet/aquarium/util/Once.scala [new file with mode: 0644]
src/test/scala/gr/grnet/aquarium/BillTest.scala

index 31dd626..3aa450d 100644 (file)
@@ -45,6 +45,7 @@ import gr.grnet.aquarium.message.avro.gen.{UserStateMsg, IMEventMsg, ResourceEve
 import gr.grnet.aquarium.message.avro.{MessageFactory, OrderingHelpers, AvroHelpers}
 import gr.grnet.aquarium.store._
 import gr.grnet.aquarium.util._
+import gr.grnet.aquarium.util.Once
 import gr.grnet.aquarium.util.json.JsonSupport
 import gr.grnet.aquarium.{Aquarium, AquariumException}
 import org.apache.avro.specific.SpecificRecord
@@ -74,6 +75,33 @@ class MongoDBStore(
   private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection)
   private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection)
 
+  /**
+   * Index key patterns to create, keyed by collection name.
+   *
+   * Every indexed field uses the direction `1` (ascending). The value of a
+   * field in a MongoDB index key pattern has to be a numeric direction or the
+   * name of a special index type, so placeholder values such as `""` or
+   * `false` are not used here.
+   */
+  private[store] lazy val indicesMap = {
+    val resev = new BasicDBObjectBuilder().
+      add(MongoDBStore.JsonNames.id, 1).
+      add(MongoDBStore.JsonNames.userID, 1).
+      add(MongoDBStore.JsonNames.occurredMillis, 1).
+      add(MongoDBStore.JsonNames.receivedMillis, 1).get
+    val imev = new BasicDBObjectBuilder().
+      add(MongoDBStore.JsonNames.userID, 1).
+      add(MongoDBStore.JsonNames.eventType, 1).
+      add(MongoDBStore.JsonNames.occurredMillis, 1).get
+    val policy = new BasicDBObjectBuilder().
+      add("validFromMillis", 1).
+      add("validToMillis", 1).get
+    val user = new BasicDBObjectBuilder().
+      add("occurredMillis", 1).
+      add("isFullBillingMonth", 1).
+      add("billingYear", 1).
+      add("billingMonth", 1).
+      add("billingMonthDay", 1).get
+    Map(
+      MongoDBStore.ResourceEventCollection -> resev,
+      MongoDBStore.IMEventCollection -> imev,
+      MongoDBStore.PolicyCollection -> policy,
+      MongoDBStore.UserStateCollection -> user
+    )
+  }
+  // Guard used by getCollection: the index-creation step is wrapped in once.run.
+  private[this] val once = new Once()
+
   private[this] def doAuthenticate(db: DB) {
     if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
       throw new AquariumException("Could not authenticate user %s".format(username))
@@ -83,6 +111,11 @@ class MongoDBStore(
+  /**
+   * Returns the collection called `name` from the configured database.
+   *
+   * The database handle is authenticated first. The indices described by
+   * `indicesMap` are created inside `once.run`, so repeated calls to this
+   * method do not repeat the index creation.
+   */
   private[this] def getCollection(name: String): DBCollection = {
     val db = mongo.getDB(database)
     doAuthenticate(db)
+    once.run {
+      // Use the handle authenticated above rather than looking the database
+      // up again for every collection.
+      indicesMap.foreach { case (collection, keys) =>
+        db.getCollection(collection).createIndex(keys)
+      }
+    }
     db.getCollection(name)
   }
 
@@ -353,13 +386,15 @@ class MongoDBStore(
 object MongoDBStore {
   final val JsonNames = gr.grnet.aquarium.util.json.JsonNames
 
-  final val ResourceEventCollection = "resevents"
+  final val collections = List("resevents","userstates","imevents","policies")
+
+  final val ResourceEventCollection = collections(0)
 
-  final val UserStateCollection = "userstates"
+  final val UserStateCollection = collections(1)
 
-  final val IMEventCollection = "imevents"
+  final val IMEventCollection = collections(2)
 
-  final val PolicyCollection = "policies"
+  final val PolicyCollection = collections(3)
 
   def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
     withCloseable(cursor) { cursor ⇒
diff --git a/src/main/scala/gr/grnet/aquarium/util/Once.scala b/src/main/scala/gr/grnet/aquarium/util/Once.scala
new file mode 100644 (file)
index 0000000..6202039
--- /dev/null
@@ -0,0 +1,28 @@
+package gr.grnet.aquarium.util
+
+/**
+ * Runs a block of code at most once per instance.
+ *
+ * The first caller of `run` evaluates the block. Callers that arrive while
+ * that evaluation is still in progress block until it has finished, and
+ * callers that arrive afterwards return immediately. Completion is recorded
+ * in a `finally` clause, so the block is not retried when it throws; the
+ * exception propagates to the caller that evaluated it.
+ *
+ * @author Prodromos Gerakios <pgerakios@grnet.gr>
+ */
+
+class Once {
+  // Counts callers that entered before completion; only the one that sees 1 runs the block.
+  private[this] val entrants = new java.util.concurrent.atomic.AtomicLong()
+  // Becomes true once the block has finished, normally or with an exception.
+  private[this] val finished = new java.util.concurrent.atomic.AtomicBoolean(false)
+
+  def run(once : => Unit): Unit =
+    if (!finished.get) {
+      val firstCaller = entrants.addAndGet(1) == 1
+      if (firstCaller) evaluateAndRelease(once) else awaitCompletion()
+    }
+
+  /** Evaluates the block, then marks completion and wakes every waiting caller. */
+  private[this] def evaluateAndRelease(body: => Unit): Unit =
+    try {
+      body
+    } finally {
+      this.synchronized {
+        finished.set(true)
+        this.notifyAll()
+      }
+    }
+
+  /** Blocks the calling thread until the first caller has marked completion. */
+  private[this] def awaitCompletion(): Unit =
+    this.synchronized {
+      while (!finished.get) this.wait()
+    }
+}
index 31afe38..5ef8571 100644 (file)
@@ -139,14 +139,17 @@ object AquariumInstance {
       aquarium.start
       Thread.sleep(billWait)
       Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
-      ready.set(true)
-      this.synchronized(this.notifyAll)
+      this.synchronized{
+        ready.set(true)
+        this.synchronized(this.notifyAll)
+      }
     }
     try{
-      if(!ready.get) this.synchronized(this.wait)
-      f
+      this.synchronized{
+        while(!ready.get) this.wait
+      }
     } finally {
-      if(count.addAndGet(-1) == 0){
+        if(count.addAndGet(-1) == 0){
        Console.err.println("Stopping aquarium")
        aquarium.stop
        Thread.sleep(stop)
@@ -506,7 +509,10 @@ class User(serverAndPort:String,month:Int) {
   val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
   var _resources : List[Message] = Nil
   var _billEntryMsg :Option[BillEntryMsg] = None
-
+  var _resMsgs = 0
+  var _vmMsgs = 0
+  var _addMsgs = 0
+  var _messagesSent : List[Message] = Nil
 
   override def toString() = uid
 
@@ -546,31 +552,68 @@ class User(serverAndPort:String,month:Int) {
     checkSum(v0,v1,zero)(f)(add)
   }
 
-  def add(a:V,b:V,pos:Boolean=false) : V= {
+  def add(a:V,b:V) : V= {
     val (a1,a2,a3) = a
     val (b1,b2,b3) = b
-    if(pos && b1 < 0.0D) a  else
+    //if(pos && b1 < 0.0D) a  else
       (a1+b1,a2+b2,a3+b3)
   }
 
-  def validateBillEntry(b:BillEntryMsg) : Boolean = {
+  // Starting value handed to sumOf together with add by the validate* methods.
+  val zero  = (0.0D,0L,0.0D)
+
+  /**
+   * Selects, from `_messagesSent`, the messages that belong to `serviceName`:
+   * `DiskMessage` for "diskspace", `VMMessage` for "vmtime" and
+   * `AddCreditsMessage` for "addcredits". Any other service name selects nothing.
+   */
+  def filterMessagesSent(serviceName:String) : List[Message] = {
+    def belongsToService(msg: Message): Boolean = (msg, serviceName) match {
+      case (_:DiskMessage, "diskspace") => true
+      case (_:VMMessage, "vmtime") => true
+      case (_:AddCreditsMessage, "addcredits") => true
+      case _ => false
+    }
+    _messagesSent.filter(belongsToService)
+  }
+
+  /**
+   * Checks that the number of charge entries `c` matches the messages `m`
+   * sent for `serviceName`.
+   *
+   * With no messages, no charges are expected. Otherwise "diskspace" and
+   * "vmtime" expect one charge fewer than the messages sent, and "addcredits"
+   * expects one charge per message. Any other service name is not matched
+   * and raises a `MatchError` when messages are present.
+   */
+  def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
+    val expectedCharges =
+      if (m.isEmpty) 0
+      else serviceName match {
+        case "diskspace" | "vmtime" => m.length - 1
+        case "addcredits" => m.length
+      }
+    check(c.length == expectedCharges)
+  }
+
+  /** Reads the (total credits, total elapsed time, total units) triple of a charge entry. */
+  def validateChargeEntry(c:ChargeEntryMsg) : V = {
+    val credits = c.getTotalCredits.toDouble
+    val elapsed = c.getTotalElapsedTime.toLong
+    val units = c.getTotalUnits.toDouble
+    (credits, elapsed, units)
+  }
+
+  /**
+   * Checks the charge count of an event entry against the messages sent for
+   * `serviceName`, then returns the sum of the totals of its charge entries.
+   */
+  def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
+    val charges = scalaList(e.getDetails)
+    val sentForService = filterMessagesSent(serviceName)
+    checkMessages(serviceName, charges, sentForService)
+    // The entry's own totals are not compared here; only its charges are summed.
+    sumOf(e.getDetails,zero)(validateChargeEntry)(add)
+  }
+
+  /**
+   * Checks that the totals declared on a resource entry equal the sum over
+   * its event entries, and returns the declared totals.
+   */
+  def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
+    val declared = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
+    val computed = sumOf(r.getDetails,zero)(e => validateEventEntry(serviceName, e))(add)
+    check(declared == computed)
+    declared
+  }
+
+  /**
+   * Checks that the totals declared on a service entry equal the sum over
+   * its resource entries, and returns the declared total credits.
+   */
+  def validateServiceEntry(s:ServiceEntryMsg) : Double = {
+    val declared = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
+    val computed = sumOf(s.getDetails,zero)(r => validateResourceEntry(s.getServiceName, r))(add)
+    check(declared == computed)
+    val (declaredCredits, _, _) = declared
+    declaredCredits
+  }
 
+  def validateBillEntry(b:BillEntryMsg) : Boolean = {
     try{
+      check(b.getStatus == "ok")
       check(uid == b.getUserID)
       check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
             _creationMessage._range.to.getTime == b.getEndTime().toLong)
-
-      /*for{ (s:S[ResourceEntryMsg]) <- scalaList(b.getDetails)}
-        checkS[ResourceEntryMsg,S[ResourceEntryMsg]](s){
-          case r:ResourceEntryMsg =>
-            valuesOf(r)._1
-        } add
-      */
-
-
-     true
-      /*val partialSums : List[Double] = for { s <- scalaList(b.getBill) } yield sumService(s)
-check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.getDeductedCredits)*/
+      check(b.getDeductedCredits.toDouble ==
+            sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
+      true
     } catch {
       case e:Exception =>
         e.printStackTrace
@@ -637,7 +680,7 @@ check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.get
 
   def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
           sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
-    var _messagesSent : List[Message] = Nil
+    _messagesSent = Nil
     _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
     //Thread.sleep(2000)
     var iter = _resources.toList
@@ -650,12 +693,19 @@ check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.get
         case (Some(l),None) => false
       }}).filter({m =>
         _messagesSent = _messagesSent ::: List(m)
-        m.send("value"->UID.random(minFile,maxFile).toString,
-               "amount"->UID.random(minAmount,maxAmount).toString,
-               "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
-               "debugEnabled" -> sendDebugEnabled.toString
-                //"status" -> UID.random(List("off","on"))
-        )})
+        val b = m.send("value"->UID.random(minFile,maxFile).toString,
+                       "amount"->UID.random(minAmount,maxAmount).toString,
+                       "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
+                       "debugEnabled" -> sendDebugEnabled.toString
+                       //"status" -> UID.random(List("off","on"))
+                      )
+        if(b) m match {
+          case _:DiskMessage => _resMsgs += 1
+          case _:VMMessage => _vmMsgs += 1
+          case _:AddCreditsMessage => _addMsgs +=1
+        }
+        b
+      })
     Thread.sleep(wait)
     _billEntryMsg = getBillResponse(maxJSONRetry)
   }
@@ -760,7 +810,7 @@ object ScenarioRunner {
     val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
     val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
     //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
-    AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
+    //AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
       for{ r <- s.resources}  // create messages
         r.resType match {
           case "vm" =>
@@ -774,21 +824,38 @@ object ScenarioRunner {
       user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
                s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
                s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
-    }
+    //}
     user
   }
 
+  /**
+   * Evaluates `forkJoinCode` between a start and a stop of
+   * `AquariumInstance.aquarium`.
+   *
+   * Aquarium is started and the thread sleeps for `billWait` milliseconds
+   * before the code is evaluated. Afterwards, also when the code throws,
+   * Aquarium is stopped and the thread sleeps for `stop` milliseconds.
+   *
+   * @param billWait     milliseconds to sleep after starting Aquarium
+   * @param stop         milliseconds to sleep after stopping Aquarium
+   * @param default      never returned: an exception thrown by `forkJoinCode`
+   *                     propagates to the caller. Kept so that existing call
+   *                     sites keep compiling.
+   * @param forkJoinCode code to evaluate while Aquarium is running
+   * @return the value produced by `forkJoinCode`
+   */
+  private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
+    Console.err.println("Starting aquarium")
+    AquariumInstance.aquarium.start
+    Thread.sleep(billWait)
+    Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
+    try {
+      forkJoinCode
+    } finally {
+      // The value of a finally block is discarded, so a trailing `default`
+      // here would be a no-op rather than a fallback result.
+      Console.err.println("Stopping aquarium")
+      AquariumInstance.aquarium.stop
+      Thread.sleep(stop)
+      Console.err.println("Stopping aquarium --- DONE")
+    }
+  }
+
   def runScenario(s:Scenario): Unit = {
     if(s.ignoreScenario == false) {
       Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
-      val tasks = for { u <- 1 to s.numberOfUsers.toInt}
-                  yield scala.actors.Futures.future(runUser(s))
-      val users = for { u <- tasks}  yield u()
-      users.foreach {u =>
-         if(s.printMessages) u.printMessages()
-         if(s.printResponses) u.printResponse()
-         if(s.validationEnabled && u.validateResults() == false)
-           Console.err.println("Validation FAILED for user " + u)
+      runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
+        val tasks = for { u <- 1 to s.numberOfUsers.toInt}
+                    yield scala.actors.Futures.future(runUser(s))
+        tasks.map(_()).toList
+      }.foreach{ u =>
+        if(s.printMessages) u.printMessages()
+        if(s.printResponses) u.printResponse()
+        if(s.validationEnabled && u.validateResults() == false)
+          Console.err.println("Validation FAILED for user " + u)
       }
       Console.err.println("\n=========================\nStopping scenario\n=======================")
     }