Fixed a concurrency bug in BillTest. Added some validation tests. Modified the format...
[aquarium] / src / test / scala / gr / grnet / aquarium / BillTest.scala
index 0d6f8a0..31afe38 100644 (file)
@@ -44,10 +44,13 @@ import java.util.concurrent.atomic.AtomicLong
 import gr.grnet.aquarium.util.{Lock, Loggable}
 import java.util.{Date, Calendar, GregorianCalendar}
 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
-import gr.grnet.aquarium.policy.CronSpec
-import gr.grnet.aquarium.message.avro.gen.{BillEntryMsg, IMEventMsg, ResourceEventMsg}
+import message.avro.gen._
 import org.apache.avro.specific.SpecificRecord
+import policy.CronSpec
 import util.json.JsonSupport
+import scala.Some
+import scala.Tuple2
+import java.util.concurrent.locks.ReentrantLock
 
 
 /*
@@ -56,6 +59,21 @@ import util.json.JsonSupport
 
 
 object UID {
+
+  private[this] var privCounters = Map[String,Long]()
+  private[this] val lock = new Lock()
+
+  def next(s:String) : Long = {
+     val l = lock.withLock{
+      privCounters.get(s) match {
+        case None => 1
+        case Some(l) => l+1
+      }
+    }
+    privCounters = privCounters + ((s,l))
+    l
+  }
+
   private[this] val counter = new AtomicLong(0L)
   def next() = counter.getAndIncrement
   def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
@@ -113,18 +131,27 @@ object AquariumInstance {
   }
 
   private[this] val count=new java.util.concurrent.atomic.AtomicLong()
+  private[this] val ready=new java.util.concurrent.atomic.AtomicBoolean(false)
+
   def run(billWait:Int, stop:Int)(f : => Unit) = {
-    if(count.addAndGet(1) == 1)
+    if(count.addAndGet(1) == 1){
+      Console.err.println("Starting aquarium")
       aquarium.start
-    Thread.sleep(billWait)
+      Thread.sleep(billWait)
+      Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
+      ready.set(true)
+      this.synchronized(this.notifyAll)
+    }
     try{
+      if(!ready.get) this.synchronized(this.wait)
       f
     } finally {
-      Console.err.println("Stopping aquarium")
-      if(count.addAndGet(-1) == 0)
-        aquarium.stop
-      Thread.sleep(stop)
-      Console.err.println("Stopping aquarium --- DONE")
+      if(count.addAndGet(-1) == 0){
+       Console.err.println("Stopping aquarium")
+       aquarium.stop
+       Thread.sleep(stop)
+       Console.err.println("Stopping aquarium --- DONE")
+      }
     }
   }
 }
@@ -142,6 +169,7 @@ object JsonLog {
 } */
 
 object MessageService {
+  private[this] val lock = new Lock
 
   def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
     val json = AvroHelpers.jsonStringOfSpecificRecord(event)
@@ -173,7 +201,7 @@ object MessageService {
              AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
              imevent.getUserID
       }
-      val userActorRef = AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)
+      val userActorRef = lock.withLock(AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid))
       userActorRef ! event
     }
     val millis = event match {
@@ -210,7 +238,8 @@ abstract class Message {
         if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
            val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
            val d0 = getDate(1,month,year,0,0,0)
-           _range = Timeslot(d0,d1 - 1000)
+           _range = Timeslot((d0/1000)*1000,(d1/1000)*1000 - 1000)
+           cal.setTimeInMillis(d0)
           _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
         }
       case _ => ()
@@ -240,13 +269,19 @@ abstract class Message {
   }
 
   def year : Int = {
+    val tmp = getMillis
     cal.setTimeInMillis(System.currentTimeMillis())
-    cal.get(Calendar.YEAR)
+    val ret = cal.get(Calendar.YEAR)
+    cal.setTimeInMillis(tmp)
+    ret
   }
 
   def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
+    val tmp = getMillis
     cal.set(year,month-1,day,hour,min,sec)
-    cal.getTimeInMillis
+    val ret = cal.getTimeInMillis
+    cal.setTimeInMillis(tmp)
+    ret
   }
 
   def getMillis : Long = cal.getTimeInMillis
@@ -254,16 +289,21 @@ abstract class Message {
   def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
     getDate(day,month,year,hour,min,0)
 
-  def setMillis(millis:Long) = {
+  /*def setMillis(millis:Long) = {
     cal.setTimeInMillis(millis)
-  }
+  }*/
 
-  def addMillis(day:Int,hour:Int) = {
+  /*def addMillis(day:Int,hour:Int) = {
     cal.roll(Calendar.DATE,day)
     cal.roll(Calendar.DATE,hour)
-  }
+  }*/
 
-  def nextID = UID.next
+  def nextID = UID.next(this.getClass().getName) /*this match {
+    case _:DiskMessage => UID.next("DiskMessage")
+    case _:CreationMessage => UID.next("CreationMessage")
+    case _:VMMessage => UID.next("VMMessage")
+    case _:AddCreditsMessage =>  UID.next("AddCreditsMessage")
+  }*/
 
   def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
 
@@ -406,7 +446,6 @@ class CreationMessage extends Message {
       MessageFactory.newDetails(),
       uid
     )
-
     msg
   }
 }
@@ -439,7 +478,6 @@ class AddCreditsMessage extends Message {
       MessageFactory.newDetails(),
       uid
     )
-
     msg
   }
 }
@@ -469,17 +507,92 @@ class User(serverAndPort:String,month:Int) {
   var _resources : List[Message] = Nil
   var _billEntryMsg :Option[BillEntryMsg] = None
 
+
   override def toString() = uid
 
+  def scalaList[A](s:java.util.List[A]) : List[A] = {
+    import scala.collection.JavaConverters.asScalaBufferConverter
+    s.asScala.toList
+  }
+
+  def sumOf[A,D](l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) : D =
+    scalaList(l).map(f).foldLeft(start) {case (sum,v) =>  add(sum,v) }
+
+
+  def checkSum[A,D](s:D,l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) =
+    check(s == sumOf(l,start)(f)(add))
+
+  def check(b: => Boolean) = {
+    if(!b)
+      throw new Exception("Invalid property")
+  }
+
+
+  type S[A] = {def getTotalCredits : String
+               def getTotalElapsedTime:String
+               def getTotalUnits:String
+               def getDetails:java.util.List[A]}
+  type V = (Double,Long,Double)
+
+  def valuesOf[A,T<:S[A]](t:T) : (V,java.util.List[A]) =
+    ((t.getTotalCredits.toDouble,
+      t.getTotalElapsedTime.toLong,
+      t.getTotalUnits.toDouble),
+      t.getDetails)
+
+  def checkS[A,T<:S[A]](s:T)(f:A=>V)(add:(V,V)=>V) = {
+    val zero = (0D,0L,0D)
+    val (v0,v1) = valuesOf[A,T](s)
+    checkSum(v0,v1,zero)(f)(add)
+  }
+
+  def add(a:V,b:V,pos:Boolean=false) : V= {
+    val (a1,a2,a3) = a
+    val (b1,b2,b3) = b
+    if(pos && b1 < 0.0D) a  else
+      (a1+b1,a2+b2,a3+b3)
+  }
+
+  def validateBillEntry(b:BillEntryMsg) : Boolean = {
+
+    try{
+      check(uid == b.getUserID)
+      check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
+            _creationMessage._range.to.getTime == b.getEndTime().toLong)
+
+      /*for{ (s:S[ResourceEntryMsg]) <- scalaList(b.getDetails)}
+        checkS[ResourceEntryMsg,S[ResourceEntryMsg]](s){
+          case r:ResourceEntryMsg =>
+            valuesOf(r)._1
+        } add
+      */
+
+
+     true
+      /*val partialSums : List[Double] = for { s <- scalaList(b.getBill) } yield sumService(s)
+check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.getDeductedCredits)*/
+    } catch {
+      case e:Exception =>
+        e.printStackTrace
+        false
+    }
+  }
+
   def validateResults() : Boolean = {
-    throw new Exception("Not implemented !!!!")
+    _billEntryMsg match {
+      case None => false
+      case Some(b) => validateBillEntry(b)
+    }
+    //throw new Exception("Not implemented !!!!")
   }
 
-  def printResults() = {
+  def printMessages() = {
     Console.err.println("Messages sent:")
     for { m <- JsonLog.get}
       Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
     Console.err.println("\n=========================\n")
+  }
+  def printResponse() = {
     Console.err.println("Response:\n" + (_billEntryMsg match {
       case None => "NONE!!!!"
       case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
@@ -505,7 +618,7 @@ class User(serverAndPort:String,month:Int) {
          Map("instanceID"->"cyclades.vm.%d".format(i),
          "vmName"  -> "Virtual Machine #%d".format(i),
          "status"  -> "on", // initially "on" msg
-         "spec"    -> cronSpec)})
+         "spec"    -> cronSpec.format(month))})
 
   def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
     add(no,"disk",{i =>
@@ -513,12 +626,13 @@ class User(serverAndPort:String,month:Int) {
        Map("action" -> action,
            "path"->"/Papers/file_%d.PDF".format(i),
            //"value"->UID.random(minVal,maxVal).toString,
-           "spec" -> spec
+           "spec" -> spec.format(month)
           )
     })
 
   def addCredits(no:Int,spec:String) : User = {
-    add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec/*,"amount"->amount.toString*/)
+    add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec.format(month)
+       /*,"amount"->amount.toString*/)
   }
 
   def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
@@ -604,6 +718,8 @@ extends JsonSupport {}
 
 case class Scenario(
   val ignoreScenario : Boolean,
+  val printMessages : Boolean,
+  val printResponses: Boolean,
   val host : String,
   val port : Long,
   val sendOrdered : Boolean,
@@ -669,7 +785,8 @@ object ScenarioRunner {
                   yield scala.actors.Futures.future(runUser(s))
       val users = for { u <- tasks}  yield u()
       users.foreach {u =>
-         u.printResults()
+         if(s.printMessages) u.printMessages()
+         if(s.printResponses) u.printResponse()
          if(s.validationEnabled && u.validateResults() == false)
            Console.err.println("Validation FAILED for user " + u)
       }
@@ -687,34 +804,50 @@ object ScenarioRunner {
 }
 
 object UserTest extends Loggable {
-
-   //vm,disk,credits
-  //add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
-  /*
-    val host : String,
-  val port : Long,
-  val sendOrdered : Boolean,
-  val sendViaRabbitMQ : Boolean,
-  val sendDebugEnabled : Boolean,
-  val validationEnabled : Boolean,
-  val billingMonth: Long,
-  val aquariumStartWaitMillis : Long,
-  val aquariumStopWaitMillis : Long,
-  val billResponseWaitMillis : Long,
-  val numberOfUsers  : Long,
-  val numberOfResponseRetries : Long,
-  val minFileCredits : Long,
-  val maxFileCredits : Long,
-  val minUserCredits : Long,
-  val maxUserCredits : Long,
-  val resources : List[Resource]
-
-   */
- val basic = new Scenario(false,"localhost",8888,true,false,false,false,9,2000,2000,2000,
+/*
+    JSON example:
+  {
+  "scenarios":[{
+    "ignoreScenario":false,
+    "printMessages":false,
+    "printResponses":true,
+    "host":"localhost",
+    "port":8888,
+    "sendOrdered":true,
+    "sendViaRabbitMQ":false,
+    "sendDebugEnabled":false,
+    "validationEnabled":false,
+    "billingMonth":9,
+    "aquariumStartWaitMillis":2000,
+    "aquariumStopWaitMillis":2000,
+    "billResponseWaitMillis":2000,
+    "numberOfUsers":2,
+    "numberOfResponseRetries":10,
+    "minFileCredits":2000,
+    "maxFileCredits":5000,
+    "minUserCredits":10000,
+    "maxUserCredits":50000,
+    "resources":[{
+      "resType":"credits",
+      "instances":1,
+      "cronSpec":"00 00 10,12 %d ?"
+    },{
+      "resType":"disk",
+      "instances":1,
+      "cronSpec":"00 18 15,20,29,30 %d ?"
+    },{
+      "resType":"vm",
+      "instances":1,
+      "cronSpec":"00 18 14,17,19,20 %d ?"
+    }]
+  }]
+}
+ */
+ val basic = new Scenario(false,false,true,"localhost",8888,true,false,false,false,9,2000,2000,2000,
                           1,10,2000,5000,10000,50000,List[Resource](
-                          new Resource("credits",1, "00 00 10,12 9 ?"),
-                          new Resource("disk",1,"00 18 15,20,29,30 9 ?"),
-                          new Resource("vm",1,"00 18 14,17,19,20 9 ?")
+                          new Resource("credits",1, "00 00 10,12 %d ?".format(9)),
+                          new Resource("disk",1,"00 18 15,20,29,30 %d ?".format(9)),
+                          new Resource("vm",1,"00 18 14,17,19,20 %d ?".format(9))
                         ))
 
  def main(args: Array[String]) = {