MongoDBStore adds indices to mongo collections exactly Once (new class). Removed...
[aquarium] / src / test / scala / gr / grnet / aquarium / BillTest.scala
index 31afe38..5ef8571 100644 (file)
@@ -139,14 +139,17 @@ object AquariumInstance {
       aquarium.start
       Thread.sleep(billWait)
       Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
-      ready.set(true)
-      this.synchronized(this.notifyAll)
+      this.synchronized{
+        ready.set(true)
+        this.synchronized(this.notifyAll)
+      }
     }
     try{
-      if(!ready.get) this.synchronized(this.wait)
-      f
+      this.synchronized{
+        while(!ready.get) this.wait
+      }
     } finally {
-      if(count.addAndGet(-1) == 0){
+        if(count.addAndGet(-1) == 0){
        Console.err.println("Stopping aquarium")
        aquarium.stop
        Thread.sleep(stop)
@@ -506,7 +509,10 @@ class User(serverAndPort:String,month:Int) {
   val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
   var _resources : List[Message] = Nil
   var _billEntryMsg :Option[BillEntryMsg] = None
-
+  var _resMsgs = 0
+  var _vmMsgs = 0
+  var _addMsgs = 0
+  var _messagesSent : List[Message] = Nil
 
   override def toString() = uid
 
@@ -546,31 +552,68 @@ class User(serverAndPort:String,month:Int) {
     checkSum(v0,v1,zero)(f)(add)
   }
 
-  def add(a:V,b:V,pos:Boolean=false) : V= {
+  def add(a:V,b:V) : V= { // component-wise sum of the two triples
     val (a1,a2,a3) = a
     val (b1,b2,b3) = b
-    if(pos && b1 < 0.0D) a  else
+    //if(pos && b1 < 0.0D) a  else  -- disabled guard: formerly returned `a` unchanged when `pos` was set and b1 was negative
       (a1+b1,a2+b2,a3+b3)
   }
 
-  def validateBillEntry(b:BillEntryMsg) : Boolean = {
+  val zero  = (0.0D,0L,0.0D) // neutral element for `add`: zero credits, zero elapsed time, zero units
+
+  /**
+   * Selects, from the messages recorded in `_messagesSent`, those whose class
+   * corresponds to the given service name:
+   * "diskspace" -> DiskMessage, "vmtime" -> VMMessage,
+   * "addcredits" -> AddCreditsMessage. Any other service name selects nothing.
+   */
+  def filterMessagesSent(serviceName:String) : List[Message] = {
+    // Decides whether a single recorded message belongs to `serviceName`.
+    def belongsToService(msg:Message) : Boolean = serviceName match {
+      case "diskspace" => msg match { case _:DiskMessage => true ; case _ => false }
+      case "vmtime" => msg match { case _:VMMessage => true ; case _ => false }
+      case "addcredits" => msg match { case _:AddCreditsMessage => true ; case _ => false }
+      case _ => false
+    }
+    _messagesSent.filter(belongsToService(_))
+  }
+
+  /**
+   * Checks that the number of charge entries `c` matches what the sent
+   * messages `m` imply for `serviceName`: no charges when nothing was sent;
+   * otherwise one charge fewer than the messages for "diskspace" and "vmtime",
+   * and exactly one charge per message for "addcredits".
+   * Any other service name with a non-empty `m` raises a MatchError.
+   */
+  def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
+     val expectedCharges =
+       if(m.isEmpty) 0
+       else serviceName match {
+         case "diskspace" | "vmtime" => m.length - 1
+         case "addcredits" => m.length
+       }
+     check(c.length == expectedCharges)
+  }
+
+  /**
+   * Reads the totals of a charge entry as a
+   * (credits, elapsed time, units) triple. Performs no checks of its own.
+   */
+  def validateChargeEntry(c:ChargeEntryMsg) : V = {
+    val credits = c.getTotalCredits.toDouble
+    val elapsed = c.getTotalElapsedTime.toLong
+    val units = c.getTotalUnits.toDouble
+    (credits,elapsed,units)
+  }
+
+  /**
+   * Validates one event entry of the bill for `serviceName`: checks the number
+   * of its charge entries against the messages sent for that service, then
+   * returns the sum of the charge entries' totals.
+   * The event entry's own totals are not read here.
+   */
+  def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
+    val charges = scalaList(e.getDetails)
+    val sentForService = filterMessagesSent(serviceName)
+    checkMessages(serviceName,charges,sentForService)
+    sumOf(e.getDetails,zero)(validateChargeEntry)(add)
+  }
+
+  /**
+   * Validates one resource entry: its declared totals must equal the sum of
+   * the totals computed from its event entries. Returns the declared totals.
+   * NOTE(review): the comparison is exact equality on Double components.
+   */
+  def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
+    val declared = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
+    val computed = sumOf(r.getDetails,zero)(e => validateEventEntry(serviceName,e))(add)
+    check(declared == computed)
+    declared
+  }
+
+  /**
+   * Validates one service entry: its declared totals must equal the sum of
+   * the totals of its resource entries. Returns the declared total credits.
+   * NOTE(review): the comparison is exact equality on Double components.
+   */
+  def validateServiceEntry(s:ServiceEntryMsg) : Double = {
+    val declared = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
+    val computed = sumOf(s.getDetails,zero)(r => validateResourceEntry(s.getServiceName,r))(add)
+    check(declared == computed)
+    val (declaredCredits,_,_) = declared
+    declaredCredits
+  }
 
+  def validateBillEntry(b:BillEntryMsg) : Boolean = {
     try{
+      check(b.getStatus == "ok")
       check(uid == b.getUserID)
       check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
             _creationMessage._range.to.getTime == b.getEndTime().toLong)
-
-      /*for{ (s:S[ResourceEntryMsg]) <- scalaList(b.getDetails)}
-        checkS[ResourceEntryMsg,S[ResourceEntryMsg]](s){
-          case r:ResourceEntryMsg =>
-            valuesOf(r)._1
-        } add
-      */
-
-
-     true
-      /*val partialSums : List[Double] = for { s <- scalaList(b.getBill) } yield sumService(s)
-check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.getDeductedCredits)*/
+      check(b.getDeductedCredits.toDouble ==
+            sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
+      true
     } catch {
       case e:Exception =>
         e.printStackTrace
@@ -637,7 +680,7 @@ check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.get
 
   def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
           sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
-    var _messagesSent : List[Message] = Nil
+    _messagesSent = Nil
     _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
     //Thread.sleep(2000)
     var iter = _resources.toList
@@ -650,12 +693,19 @@ check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.get
         case (Some(l),None) => false
       }}).filter({m =>
         _messagesSent = _messagesSent ::: List(m)
-        m.send("value"->UID.random(minFile,maxFile).toString,
-               "amount"->UID.random(minAmount,maxAmount).toString,
-               "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
-               "debugEnabled" -> sendDebugEnabled.toString
-                //"status" -> UID.random(List("off","on"))
-        )})
+        val b = m.send("value"->UID.random(minFile,maxFile).toString,
+                       "amount"->UID.random(minAmount,maxAmount).toString,
+                       "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
+                       "debugEnabled" -> sendDebugEnabled.toString
+                       //"status" -> UID.random(List("off","on"))
+                      )
+        if(b) m match {
+          case _:DiskMessage => _resMsgs += 1
+          case _:VMMessage => _vmMsgs += 1
+          case _:AddCreditsMessage => _addMsgs +=1
+        }
+        b
+      })
     Thread.sleep(wait)
     _billEntryMsg = getBillResponse(maxJSONRetry)
   }
@@ -760,7 +810,7 @@ object ScenarioRunner {
     val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
     val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
     //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
-    AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
+    //AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
       for{ r <- s.resources}  // create messages
         r.resType match {
           case "vm" =>
@@ -774,21 +824,38 @@ object ScenarioRunner {
       user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
                s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
                s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
-    }
+    //}
     user
   }
 
+  /**
+   * Starts the shared Aquarium instance, sleeps `billWait` milliseconds to let
+   * it come up, evaluates `forkJoinCode`, and always stops Aquarium afterwards
+   * (sleeping `stop` milliseconds after the stop call).
+   *
+   * @param billWait     milliseconds to sleep after starting Aquarium
+   * @param stop         milliseconds to sleep after stopping Aquarium
+   * @param default      value returned when `forkJoinCode` throws an Exception
+   * @param forkJoinCode code to run while Aquarium is up
+   * @return the result of `forkJoinCode`, or `default` if it failed
+   */
+  private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
+     Console.err.println("Starting aquarium")
+     AquariumInstance.aquarium.start
+     Thread.sleep(billWait)
+     Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
+     try{
+       forkJoinCode
+     } catch {
+       // The value of a `finally` block is discarded, so the fallback must be
+       // produced here for `default` to have any effect.
+       case e:Exception =>
+         e.printStackTrace
+         default
+     } finally {
+       Console.err.println("Stopping aquarium")
+       AquariumInstance.aquarium.stop
+       Thread.sleep(stop)
+       Console.err.println("Stopping aquarium --- DONE")
+     }
+  }
+
   def runScenario(s:Scenario): Unit = {
     if(s.ignoreScenario == false) {
       Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
-      val tasks = for { u <- 1 to s.numberOfUsers.toInt}
-                  yield scala.actors.Futures.future(runUser(s))
-      val users = for { u <- tasks}  yield u()
-      users.foreach {u =>
-         if(s.printMessages) u.printMessages()
-         if(s.printResponses) u.printResponse()
-         if(s.validationEnabled && u.validateResults() == false)
-           Console.err.println("Validation FAILED for user " + u)
+      runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
+        val tasks = for { u <- 1 to s.numberOfUsers.toInt}
+                    yield scala.actors.Futures.future(runUser(s))
+        tasks.map(_()).toList
+      }.foreach{ u =>
+        if(s.printMessages) u.printMessages()
+        if(s.printResponses) u.printResponse()
+        if(s.validationEnabled && u.validateResults() == false)
+          Console.err.println("Validation FAILED for user " + u)
       }
       Console.err.println("\n=========================\nStopping scenario\n=======================")
     }