import gr.grnet.aquarium.util.{Lock, Loggable}
import java.util.{Date, Calendar, GregorianCalendar}
import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
-import gr.grnet.aquarium.policy.CronSpec
-import gr.grnet.aquarium.message.avro.gen.{BillEntryMsg, IMEventMsg, ResourceEventMsg}
+import message.avro.gen._
import org.apache.avro.specific.SpecificRecord
+import policy.CronSpec
import util.json.JsonSupport
+import scala.Some
+import scala.Tuple2
+import java.util.concurrent.locks.ReentrantLock
/*
object UID {
+
+ private[this] var privCounters = Map[String,Long]()
+ private[this] val lock = new Lock()
+
+ /**
+  * Returns the next value of the counter registered under key `s`;
+  * the first call for a given key yields 1.
+  *
+  * The lookup and the map update both run inside `lock.withLock`. Previously
+  * only the lookup was locked, so two concurrent callers could read the same
+  * old value and both be handed the same ID.
+  *
+  * @param s key selecting which counter to advance
+  * @return the freshly incremented counter value for `s`
+  */
+ def next(s:String) : Long =
+   lock.withLock {
+     val l : Long = privCounters.getOrElse(s, 0L) + 1L
+     privCounters = privCounters + ((s,l))
+     l
+   }
+
private[this] val counter = new AtomicLong(0L)
def next() = counter.getAndIncrement
def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
}
private[this] val count=new java.util.concurrent.atomic.AtomicLong()
+ private[this] val ready=new java.util.concurrent.atomic.AtomicBoolean(false)
+
+ /**
+  * Evaluates `f` while aquarium is up, starting and stopping it around the
+  * outermost concurrent use.
+  *
+  * The caller that raises `count` to 1 starts aquarium, sleeps `billWait`
+  * milliseconds, then sets `ready` and wakes all waiters. Every caller blocks
+  * until `ready` is set before evaluating `f`. The caller that brings `count`
+  * back to 0 stops aquarium and sleeps `stop` milliseconds.
+  *
+  * @param billWait milliseconds to sleep after starting aquarium
+  * @param stop     milliseconds to sleep after stopping aquarium
+  * @param f        by-name body, evaluated once aquarium is ready
+  */
def run(billWait:Int, stop:Int)(f : => Unit) = {
- if(count.addAndGet(1) == 1)
+ if(count.addAndGet(1) == 1){
+ Console.err.println("Starting aquarium")
aquarium.start
- Thread.sleep(billWait)
+ Thread.sleep(billWait)
+ Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000))
+ ready.set(true)
+ this.synchronized(this.notifyAll)
+ }
try{
+ // Test `ready` while holding the monitor and re-test after every wake-up.
+ // Checking outside the monitor could miss the notifyAll above and block
+ // forever; the loop also guards against spurious wake-ups.
+ this.synchronized { while(!ready.get) this.wait() }
f
} finally {
- Console.err.println("Stopping aquarium")
- if(count.addAndGet(-1) == 0)
- aquarium.stop
- Thread.sleep(stop)
- Console.err.println("Stopping aquarium --- DONE")
+ if(count.addAndGet(-1) == 0){
+ Console.err.println("Stopping aquarium")
+ aquarium.stop
+ Thread.sleep(stop)
+ Console.err.println("Stopping aquarium --- DONE")
+ }
}
}
}
} */
object MessageService {
+ private[this] val lock = new Lock
def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
val json = AvroHelpers.jsonStringOfSpecificRecord(event)
AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
imevent.getUserID
}
- val userActorRef = AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)
+ val userActorRef = lock.withLock(AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid))
userActorRef ! event
}
val millis = event match {
if((_cronSpec==null || _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
val d0 = getDate(1,month,year,0,0,0)
- _range = Timeslot(d0,d1 - 1000)
+ _range = Timeslot((d0/1000)*1000,(d1/1000)*1000 - 1000)
+ cal.setTimeInMillis(d0)
_cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
}
case _ => ()
}
def year : Int = {
+ val tmp = getMillis // remember cal's current time; it is restored before returning
cal.setTimeInMillis(System.currentTimeMillis())
- cal.get(Calendar.YEAR)
+ val ret = cal.get(Calendar.YEAR) // year of the system clock's current time, as read through cal
+ cal.setTimeInMillis(tmp) // put cal back so reading the year leaves its time unchanged
+ ret
}
+ /**
+  * Epoch milliseconds of the given date and time, computed through `cal`.
+  * The time `cal` held on entry is restored before returning.
+  *
+  * @param month 1-based month (1 = January); converted to Calendar's 0-based month
+  * @return the instant in milliseconds, aligned to a whole second
+  */
def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
+ val tmp = getMillis
cal.set(year,month-1,day,hour,min,sec)
+ // Calendar.set(y,m,d,h,m,s) retains the previous MILLISECOND field, so zero
+ // it explicitly; otherwise the result carries stale milliseconds.
+ cal.set(Calendar.MILLISECOND,0)
- cal.getTimeInMillis
+ val ret = cal.getTimeInMillis
+ cal.setTimeInMillis(tmp)
+ ret
}
// Time currently held by `cal`, as returned by its getTimeInMillis.
def getMillis : Long = cal.getTimeInMillis
// Convenience overload: same as the six-argument getDate with seconds fixed to 0.
def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
getDate(day,month,year,hour,min,0)
- def setMillis(millis:Long) = {
+ /*def setMillis(millis:Long) = {
cal.setTimeInMillis(millis)
- }
+ }*/
- def addMillis(day:Int,hour:Int) = {
+ /*def addMillis(day:Int,hour:Int) = {
cal.roll(Calendar.DATE,day)
cal.roll(Calendar.DATE,hour)
- }
+ }*/
- def nextID = UID.next
+ def nextID = UID.next(this.getClass().getName) /*this match {
+ case _:DiskMessage => UID.next("DiskMessage")
+ case _:CreationMessage => UID.next("CreationMessage")
+ case _:VMMessage => UID.next("VMMessage")
+ case _:AddCreditsMessage => UID.next("AddCreditsMessage")
+ }*/ // the counter key is the runtime class name of this object; the commented-out match is an explicit per-type alternative
def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
MessageFactory.newDetails(),
uid
)
-
msg
}
}
MessageFactory.newDetails(),
uid
)
-
msg
}
}
var _resources : List[Message] = Nil
var _billEntryMsg :Option[BillEntryMsg] = None
+
override def toString() = uid
+ /** Copies a Java list into an immutable Scala `List`, keeping element order. */
+ def scalaList[A](s:java.util.List[A]) : List[A] = {
+   import scala.collection.JavaConverters.asScalaBufferConverter
+   val wrapped = s.asScala
+   wrapped.toList
+ }
+
+ /**
+  * Maps every element of `l` through `f`, then folds the mapped values with
+  * `add` from left to right, starting from `start`.
+  */
+ def sumOf[A,D](l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) : D = {
+   val mapped : List[D] = scalaList(l).map(f)
+   mapped.foldLeft(start)(add)
+ }
+
+
+ /**
+  * Passes when `s` equals the `sumOf` the elements of `l` (mapped by `f`,
+  * combined by `add`, starting at `start`); otherwise `check` throws.
+  */
+ def checkSum[A,D](s:D,l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) = {
+   val computed : D = sumOf(l,start)(f)(add)
+   check(s == computed)
+ }
+
+ /** Evaluates `b` once and throws `Exception("Invalid property")` when it is false. */
+ def check(b: => Boolean) = {
+   val holds = b
+   if(holds) () else throw new Exception("Invalid property")
+ }
+
+
+ type S[A] = {def getTotalCredits : String // structural type: anything exposing these four getters
+ def getTotalElapsedTime:String
+ def getTotalUnits:String
+ def getDetails:java.util.List[A]}
+ type V = (Double,Long,Double) // (total credits, total elapsed time, total units), in the order valuesOf builds it
+
+ /**
+  * Reads the three string totals of `t`, parsed as (Double, Long, Double),
+  * and pairs them with `t`'s detail list.
+  */
+ def valuesOf[A,T<:S[A]](t:T) : (V,java.util.List[A]) = {
+   val credits = t.getTotalCredits.toDouble
+   val elapsed = t.getTotalElapsedTime.toLong
+   val units = t.getTotalUnits.toDouble
+   ((credits,elapsed,units), t.getDetails)
+ }
+
+ /**
+  * Checks that the totals declared by `s` equal the totals obtained by
+  * mapping its details through `f` and combining them with `add`,
+  * starting from (0.0, 0, 0.0).
+  */
+ def checkS[A,T<:S[A]](s:T)(f:A=>V)(add:(V,V)=>V) = {
+   val (declaredTotals,details) = valuesOf[A,T](s)
+   val neutral = (0D,0L,0D)
+   checkSum(declaredTotals,details,neutral)(f)(add)
+ }
+
+ /**
+  * Component-wise sum of two value triples. When `pos` is set and the first
+  * component of `b` is negative, `b` is ignored and `a` is returned as is.
+  */
+ def add(a:V,b:V,pos:Boolean=false) : V =
+   (a,b) match {
+     case ((credits,elapsed,units),(dCredits,dElapsed,dUnits)) =>
+       val skip = pos && dCredits < 0.0D
+       if(skip) a else (credits+dCredits,elapsed+dElapsed,units+dUnits)
+   }
+
+ def validateBillEntry(b:BillEntryMsg) : Boolean = { // true when every check below passes, false when any of them throws
+
+ try{
+ check(uid == b.getUserID) // the bill must carry this user's id; NOTE(review): if getUserID is not a String this == can never hold — confirm
+ check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
+ _creationMessage._range.to.getTime == b.getEndTime().toLong) // bill start/end must equal the creation message's range
+
+ /*for{ (s:S[ResourceEntryMsg]) <- scalaList(b.getDetails)}
+ checkS[ResourceEntryMsg,S[ResourceEntryMsg]](s){
+ case r:ResourceEntryMsg =>
+ valuesOf(r)._1
+ } add
+ */
+
+
+ true // reached only when no check above threw
+ /*val partialSums : List[Double] = for { s <- scalaList(b.getBill) } yield sumService(s)
+check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.getDeductedCredits)*/
+ } catch {
+ case e:Exception => // a failed check (or any other Exception) is printed and reported as false
+ e.printStackTrace
+ false
+ }
+ }
+
def validateResults() : Boolean = {
- throw new Exception("Not implemented !!!!")
+ _billEntryMsg match { // validates the stored bill entry, if any
+ case None => false // no bill entry stored: validation fails
+ case Some(b) => validateBillEntry(b)
+ }
+ //throw new Exception("Not implemented !!!!")
}
- def printResults() = {
+ def printMessages() = { // prints every entry of JsonLog.get to stderr, then a separator line
Console.err.println("Messages sent:")
for { m <- JsonLog.get}
Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
Console.err.println("\n=========================\n")
+ }
+ def printResponse() = {
Console.err.println("Response:\n" + (_billEntryMsg match {
case None => "NONE!!!!"
case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
Map("instanceID"->"cyclades.vm.%d".format(i),
"vmName" -> "Virtual Machine #%d".format(i),
"status" -> "on", // initially "on" msg
- "spec" -> cronSpec)})
+ "spec" -> cronSpec.format(month))})
def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
add(no,"disk",{i =>
Map("action" -> action,
"path"->"/Papers/file_%d.PDF".format(i),
//"value"->UID.random(minVal,maxVal).toString,
- "spec" -> spec
+ "spec" -> spec.format(month) // fills a %d placeholder in the spec with `month`; a spec without placeholders passes through unchanged
)
})
def addCredits(no:Int,spec:String) : User = {
- add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec/*,"amount"->amount.toString*/)
+ add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec.format(month) // fills a %d placeholder in the spec with `month`
+ /*,"amount"->amount.toString*/)
}
def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
case class Scenario(
val ignoreScenario : Boolean,
+ val printMessages : Boolean,
+ val printResponses: Boolean,
val host : String,
val port : Long,
val sendOrdered : Boolean,
yield scala.actors.Futures.future(runUser(s))
val users = for { u <- tasks} yield u()
users.foreach {u =>
- u.printResults()
+ if(s.printMessages) u.printMessages()
+ if(s.printResponses) u.printResponse()
if(s.validationEnabled && u.validateResults() == false)
Console.err.println("Validation FAILED for user " + u)
}
}
object UserTest extends Loggable {
-
- //vm,disk,credits
- //add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
- /*
- val host : String,
- val port : Long,
- val sendOrdered : Boolean,
- val sendViaRabbitMQ : Boolean,
- val sendDebugEnabled : Boolean,
- val validationEnabled : Boolean,
- val billingMonth: Long,
- val aquariumStartWaitMillis : Long,
- val aquariumStopWaitMillis : Long,
- val billResponseWaitMillis : Long,
- val numberOfUsers : Long,
- val numberOfResponseRetries : Long,
- val minFileCredits : Long,
- val maxFileCredits : Long,
- val minUserCredits : Long,
- val maxUserCredits : Long,
- val resources : List[Resource]
-
- */
- val basic = new Scenario(false,"localhost",8888,true,false,false,false,9,2000,2000,2000,
+/*
+ JSON example:
+ {
+ "scenarios":[{
+ "ignoreScenario":false,
+ "printMessages":false,
+ "printResponses":true,
+ "host":"localhost",
+ "port":8888,
+ "sendOrdered":true,
+ "sendViaRabbitMQ":false,
+ "sendDebugEnabled":false,
+ "validationEnabled":false,
+ "billingMonth":9,
+ "aquariumStartWaitMillis":2000,
+ "aquariumStopWaitMillis":2000,
+ "billResponseWaitMillis":2000,
+ "numberOfUsers":2,
+ "numberOfResponseRetries":10,
+ "minFileCredits":2000,
+ "maxFileCredits":5000,
+ "minUserCredits":10000,
+ "maxUserCredits":50000,
+ "resources":[{
+ "resType":"credits",
+ "instances":1,
+ "cronSpec":"00 00 10,12 %d ?"
+ },{
+ "resType":"disk",
+ "instances":1,
+ "cronSpec":"00 18 15,20,29,30 %d ?"
+ },{
+ "resType":"vm",
+ "instances":1,
+ "cronSpec":"00 18 14,17,19,20 %d ?"
+ }]
+ }]
+}
+ */
+ val basic = new Scenario(false,false,true,"localhost",8888,true,false,false,false,9,2000,2000,2000,
1,10,2000,5000,10000,50000,List[Resource](
- new Resource("credits",1, "00 00 10,12 9 ?"),
- new Resource("disk",1,"00 18 15,20,29,30 9 ?"),
- new Resource("vm",1,"00 18 14,17,19,20 9 ?")
+ new Resource("credits",1, "00 00 10,12 %d ?".format(9)),
+ new Resource("disk",1,"00 18 15,20,29,30 %d ?".format(9)),
+ new Resource("vm",1,"00 18 14,17,19,20 %d ?".format(9))
))
def main(args: Array[String]) = {