Revision 53389ed0 src/test/scala/gr/grnet/aquarium/BillTest.scala

b/src/test/scala/gr/grnet/aquarium/BillTest.scala
139 139
      aquarium.start
140 140
      Thread.sleep(billWait)
141 141
      Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
142
      ready.set(true)
143
      this.synchronized(this.notifyAll)
142
      this.synchronized{
143
        ready.set(true)
144
        this.synchronized(this.notifyAll)
145
      }
144 146
    }
145 147
    try{
146
      if(!ready.get) this.synchronized(this.wait)
147
      f
148
      this.synchronized{
149
        while(!ready.get) this.wait
150
      }
148 151
    } finally {
149
      if(count.addAndGet(-1) == 0){
152
        if(count.addAndGet(-1) == 0){
150 153
       Console.err.println("Stopping aquarium")
151 154
       aquarium.stop
152 155
       Thread.sleep(stop)
......
506 509
  val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
507 510
  var _resources : List[Message] = Nil
508 511
  var _billEntryMsg :Option[BillEntryMsg] = None
509

  
512
  var _resMsgs = 0
513
  var _vmMsgs = 0
514
  var _addMsgs = 0
515
  var _messagesSent : List[Message] = Nil
510 516

  
511 517
  override def toString() = uid
512 518

  
......
546 552
    checkSum(v0,v1,zero)(f)(add)
547 553
  }
548 554

  
549
  def add(a:V,b:V,pos:Boolean=false) : V= {
555
  def add(a:V,b:V) : V= {
550 556
    val (a1,a2,a3) = a
551 557
    val (b1,b2,b3) = b
552
    if(pos && b1 < 0.0D) a  else
558
    //if(pos && b1 < 0.0D) a  else
553 559
      (a1+b1,a2+b2,a3+b3)
554 560
  }
555 561

  
556
  def validateBillEntry(b:BillEntryMsg) : Boolean = {
562
  val zero  = (0.0D,0L,0.0D)
563

  
564
  def filterMessagesSent(serviceName:String) : List[Message] =
565
    _messagesSent.filter { (_,serviceName) match  {
566
       case (_:DiskMessage,"diskspace") => true
567
       case (_:VMMessage,"vmtime") => true
568
       case (_:AddCreditsMessage,"addcredits") => true
569
       case _ => false
570
    }}
571

  
572
  def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
573
     if(m.length == 0) check(c.length == 0)
574
     else check(c.length == (serviceName match  {
575
       case "diskspace" =>  m.length - 1
576
       case "vmtime" => m.length - 1
577
       case "addcredits" => m.length
578
     }))
579
  }
580

  
581
  def validateChargeEntry(c:ChargeEntryMsg) : V = {
582
    (c.getTotalCredits.toDouble,c.getTotalElapsedTime.toLong,c.getTotalUnits.toDouble)
583
  }
584

  
585
  def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
586
    val v1 = scalaList(e.getDetails)
587
    val v2 = filterMessagesSent(serviceName)
588
    checkMessages(serviceName,v1,v2)
589
    //val v3  = (e.getTotalCredits.toDouble,e.getTotalElapsedTime.toLong,e.getTotalUnits.toDouble)
590
    val v4 = sumOf(e.getDetails,zero)(validateChargeEntry)(add)
591
    v4
592
  }
593

  
594
  def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
595
    val v0  = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
596
    val v1 = sumOf(r.getDetails,zero)(validateEventEntry(serviceName,_))(add)
597
    check(v0 == v1)
598
    v0
599
  }
600

  
601
  def validateServiceEntry(s:ServiceEntryMsg) : Double = {
602
    val v0  = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
603
    val v1 = sumOf(s.getDetails,zero)(validateResourceEntry(s.getServiceName,_))(add)
604
    check(v0 == v1)
605
    v0._1
606
  }
557 607

  
608
  def validateBillEntry(b:BillEntryMsg) : Boolean = {
558 609
    try{
610
      check(b.getStatus == "ok")
559 611
      check(uid == b.getUserID)
560 612
      check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
561 613
            _creationMessage._range.to.getTime == b.getEndTime().toLong)
562

  
563
      /*for{ (s:S[ResourceEntryMsg]) <- scalaList(b.getDetails)}
564
        checkS[ResourceEntryMsg,S[ResourceEntryMsg]](s){
565
          case r:ResourceEntryMsg =>
566
            valuesOf(r)._1
567
        } add
568
      */
569

  
570

  
571
     true
572
      /*val partialSums : List[Double] = for { s <- scalaList(b.getBill) } yield sumService(s)
573
check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.getDeductedCredits)*/
614
      check(b.getDeductedCredits.toDouble ==
615
            sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
616
      true
574 617
    } catch {
575 618
      case e:Exception =>
576 619
        e.printStackTrace
......
637 680

  
638 681
  def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
639 682
          sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
640
    var _messagesSent : List[Message] = Nil
683
    _messagesSent = Nil
641 684
    _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
642 685
    //Thread.sleep(2000)
643 686
    var iter = _resources.toList
......
650 693
        case (Some(l),None) => false
651 694
      }}).filter({m =>
652 695
        _messagesSent = _messagesSent ::: List(m)
653
        m.send("value"->UID.random(minFile,maxFile).toString,
654
               "amount"->UID.random(minAmount,maxAmount).toString,
655
               "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
656
               "debugEnabled" -> sendDebugEnabled.toString
657
                //"status" -> UID.random(List("off","on"))
658
        )})
696
        val b = m.send("value"->UID.random(minFile,maxFile).toString,
697
                       "amount"->UID.random(minAmount,maxAmount).toString,
698
                       "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
699
                       "debugEnabled" -> sendDebugEnabled.toString
700
                       //"status" -> UID.random(List("off","on"))
701
                      )
702
        if(b) m match {
703
          case _:DiskMessage => _resMsgs += 1
704
          case _:VMMessage => _vmMsgs += 1
705
          case _:AddCreditsMessage => _addMsgs +=1
706
        }
707
        b
708
      })
659 709
    Thread.sleep(wait)
660 710
    _billEntryMsg = getBillResponse(maxJSONRetry)
661 711
  }
......
760 810
    val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
761 811
    val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
762 812
    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
763
    AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
813
    //AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
764 814
      for{ r <- s.resources}  // create messages
765 815
        r.resType match {
766 816
          case "vm" =>
......
774 824
      user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
775 825
               s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
776 826
               s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
777
    }
827
    //}
778 828
    user
779 829
  }
780 830

  
831
  private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
832
     Console.err.println("Starting aquarium")
833
     AquariumInstance.aquarium.start
834
     Thread.sleep(billWait)
835
     Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
836
     try{
837
       forkJoinCode
838
     } finally {
839
       Console.err.println("Stopping aquarium")
840
       AquariumInstance.aquarium.stop
841
       Thread.sleep(stop)
842
       Console.err.println("Stopping aquarium --- DONE")
843
       default
844
     }
845
  }
846

  
781 847
  def runScenario(s:Scenario): Unit = {
782 848
    if(s.ignoreScenario == false) {
783 849
      Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
784
      val tasks = for { u <- 1 to s.numberOfUsers.toInt}
785
                  yield scala.actors.Futures.future(runUser(s))
786
      val users = for { u <- tasks}  yield u()
787
      users.foreach {u =>
788
         if(s.printMessages) u.printMessages()
789
         if(s.printResponses) u.printResponse()
790
         if(s.validationEnabled && u.validateResults() == false)
791
           Console.err.println("Validation FAILED for user " + u)
850
      runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
851
        val tasks = for { u <- 1 to s.numberOfUsers.toInt}
852
                    yield scala.actors.Futures.future(runUser(s))
853
        tasks.map(_()).toList
854
      }.foreach{ u =>
855
        if(s.printMessages) u.printMessages()
856
        if(s.printResponses) u.printResponse()
857
        if(s.validationEnabled && u.validateResults() == false)
858
          Console.err.println("Validation FAILED for user " + u)
792 859
      }
793 860
      Console.err.println("\n=========================\nStopping scenario\n=======================")
794 861
    }

Also available in: Unified diff