import gr.grnet.aquarium.message.avro.{MessageFactory, OrderingHelpers, AvroHelpers}
import gr.grnet.aquarium.store._
import gr.grnet.aquarium.util._
+import gr.grnet.aquarium.util.Once
import gr.grnet.aquarium.util.json.JsonSupport
import gr.grnet.aquarium.{Aquarium, AquariumException}
import org.apache.avro.specific.SpecificRecord
private[store] lazy val imEvents = getCollection(MongoDBStore.IMEventCollection)
private[store] lazy val policies = getCollection(MongoDBStore.PolicyCollection)
+ private[store] lazy val indicesMap = {
+ val resev= new BasicDBObjectBuilder().
+ add(MongoDBStore.JsonNames.id,1).
+ add(MongoDBStore.JsonNames.userID,1).
+ add(MongoDBStore.JsonNames.occurredMillis,1).
+ add(MongoDBStore.JsonNames.receivedMillis,1).get
+ val imev = new BasicDBObjectBuilder().
+ add(MongoDBStore.JsonNames.userID,1).
+ add(MongoDBStore.JsonNames.eventType,"").
+ add(MongoDBStore.JsonNames.occurredMillis,1).get
+ val policy = new BasicDBObjectBuilder().
+ add("validFromMillis",1).
+ add("validToMillis",1).get
+ val user = new BasicDBObjectBuilder().
+ add( "occurredMillis",1).
+ add("isFullBillingMonth",false).
+ add("billingYear",1).
+ add("billingMonth",1).
+ add("billingMonthDay",1).get
+ Map(MongoDBStore.ResourceEventCollection -> resev,
+ MongoDBStore.IMEventCollection-> imev,
+ MongoDBStore.PolicyCollection-> policy,
+ MongoDBStore.UserStateCollection-> user
+ )
+ }
+ private[this] val once = new Once()
+
private[this] def doAuthenticate(db: DB) {
if(!db.isAuthenticated && !db.authenticate(username, password.toCharArray)) {
throw new AquariumException("Could not authenticate user %s".format(username))
private[this] def getCollection(name: String): DBCollection = {
val db = mongo.getDB(database)
doAuthenticate(db)
+ once.run { /* this code is thread-safe and will run exactly once*/
+ indicesMap.foreach { case (collection,obj) =>
+ mongo.getDB(database).getCollection(collection).createIndex(obj)
+ }
+ }
db.getCollection(name)
}
object MongoDBStore {
final val JsonNames = gr.grnet.aquarium.util.json.JsonNames
- final val ResourceEventCollection = "resevents"
+ final val collections = List("resevents","userstates","imevents","policies")
+
+ final val ResourceEventCollection = collections(0)
- final val UserStateCollection = "userstates"
+ final val UserStateCollection = collections(1)
- final val IMEventCollection = "imevents"
+ final val IMEventCollection = collections(2)
- final val PolicyCollection = "policies"
+ final val PolicyCollection = collections(3)
def firstResultIfExists[A](cursor: DBCursor, f: DBObject ⇒ A): Option[A] = {
withCloseable(cursor) { cursor ⇒
aquarium.start
Thread.sleep(billWait)
Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000))
- ready.set(true)
- this.synchronized(this.notifyAll)
+ this.synchronized{
+ ready.set(true)
+ this.synchronized(this.notifyAll)
+ }
}
try{
- if(!ready.get) this.synchronized(this.wait)
- f
+ this.synchronized{
+ while(!ready.get) this.wait
+ }
} finally {
- if(count.addAndGet(-1) == 0){
+ if(count.addAndGet(-1) == 0){
Console.err.println("Stopping aquarium")
aquarium.stop
Thread.sleep(stop)
val _creationMessage : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
var _resources : List[Message] = Nil
var _billEntryMsg :Option[BillEntryMsg] = None
-
+ var _resMsgs = 0
+ var _vmMsgs = 0
+ var _addMsgs = 0
+ var _messagesSent : List[Message] = Nil
override def toString() = uid
checkSum(v0,v1,zero)(f)(add)
}
- def add(a:V,b:V,pos:Boolean=false) : V= {
+ def add(a:V,b:V) : V= {
val (a1,a2,a3) = a
val (b1,b2,b3) = b
- if(pos && b1 < 0.0D) a else
+ //if(pos && b1 < 0.0D) a else
(a1+b1,a2+b2,a3+b3)
}
- def validateBillEntry(b:BillEntryMsg) : Boolean = {
+ val zero = (0.0D,0L,0.0D)
+
+ def filterMessagesSent(serviceName:String) : List[Message] =
+ _messagesSent.filter { (_,serviceName) match {
+ case (_:DiskMessage,"diskspace") => true
+ case (_:VMMessage,"vmtime") => true
+ case (_:AddCreditsMessage,"addcredits") => true
+ case _ => false
+ }}
+
+ def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
+ if(m.length == 0) check(c.length == 0)
+ else check(c.length == (serviceName match {
+ case "diskspace" => m.length - 1
+ case "vmtime" => m.length - 1
+ case "addcredits" => m.length
+ }))
+ }
+
+ def validateChargeEntry(c:ChargeEntryMsg) : V = {
+ (c.getTotalCredits.toDouble,c.getTotalElapsedTime.toLong,c.getTotalUnits.toDouble)
+ }
+
+ def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
+ val v1 = scalaList(e.getDetails)
+ val v2 = filterMessagesSent(serviceName)
+ checkMessages(serviceName,v1,v2)
+ //val v3 = (e.getTotalCredits.toDouble,e.getTotalElapsedTime.toLong,e.getTotalUnits.toDouble)
+ val v4 = sumOf(e.getDetails,zero)(validateChargeEntry)(add)
+ v4
+ }
+
+ def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
+ val v0 = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
+ val v1 = sumOf(r.getDetails,zero)(validateEventEntry(serviceName,_))(add)
+ check(v0 == v1)
+ v0
+ }
+
+ def validateServiceEntry(s:ServiceEntryMsg) : Double = {
+ val v0 = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
+ val v1 = sumOf(s.getDetails,zero)(validateResourceEntry(s.getServiceName,_))(add)
+ check(v0 == v1)
+ v0._1
+ }
+ def validateBillEntry(b:BillEntryMsg) : Boolean = {
try{
+ check(b.getStatus == "ok")
check(uid == b.getUserID)
check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
_creationMessage._range.to.getTime == b.getEndTime().toLong)
-
- /*for{ (s:S[ResourceEntryMsg]) <- scalaList(b.getDetails)}
- checkS[ResourceEntryMsg,S[ResourceEntryMsg]](s){
- case r:ResourceEntryMsg =>
- valuesOf(r)._1
- } add
- */
-
-
- true
- /*val partialSums : List[Double] = for { s <- scalaList(b.getBill) } yield sumService(s)
-check(partialSums.foldLeft(0D){case (v,d) => v+ (if(d>0.0) d else 0.0)} == b.getDeductedCredits)*/
+ check(b.getDeductedCredits.toDouble ==
+ sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
+ true
} catch {
case e:Exception =>
e.printStackTrace
def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean) = {
- var _messagesSent : List[Message] = Nil
+ _messagesSent = Nil
_creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
//Thread.sleep(2000)
var iter = _resources.toList
case (Some(l),None) => false
}}).filter({m =>
_messagesSent = _messagesSent ::: List(m)
- m.send("value"->UID.random(minFile,maxFile).toString,
- "amount"->UID.random(minAmount,maxAmount).toString,
- "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
- "debugEnabled" -> sendDebugEnabled.toString
- //"status" -> UID.random(List("off","on"))
- )})
+ val b = m.send("value"->UID.random(minFile,maxFile).toString,
+ "amount"->UID.random(minAmount,maxAmount).toString,
+ "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
+ "debugEnabled" -> sendDebugEnabled.toString
+ //"status" -> UID.random(List("off","on"))
+ )
+ if(b) m match {
+ case _:DiskMessage => _resMsgs += 1
+ case _:VMMessage => _vmMsgs += 1
+ case _:AddCreditsMessage => _addMsgs +=1
+ }
+ b
+ })
Thread.sleep(wait)
_billEntryMsg = getBillResponse(maxJSONRetry)
}
val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
//Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
- AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
+ //AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
for{ r <- s.resources} // create messages
r.resType match {
case "vm" =>
user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
- }
+ //}
user
}
+ private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
+ Console.err.println("Starting aquarium")
+ AquariumInstance.aquarium.start
+ Thread.sleep(billWait)
+ Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000))
+ try{
+ forkJoinCode
+ } finally {
+ Console.err.println("Stopping aquarium")
+ AquariumInstance.aquarium.stop
+ Thread.sleep(stop)
+ Console.err.println("Stopping aquarium --- DONE")
+ default
+ }
+ }
+
def runScenario(s:Scenario): Unit = {
if(s.ignoreScenario == false) {
Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
- val tasks = for { u <- 1 to s.numberOfUsers.toInt}
- yield scala.actors.Futures.future(runUser(s))
- val users = for { u <- tasks} yield u()
- users.foreach {u =>
- if(s.printMessages) u.printMessages()
- if(s.printResponses) u.printResponse()
- if(s.validationEnabled && u.validateResults() == false)
- Console.err.println("Validation FAILED for user " + u)
+ runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
+ val tasks = for { u <- 1 to s.numberOfUsers.toInt}
+ yield scala.actors.Futures.future(runUser(s))
+ tasks.map(_()).toList
+ }.foreach{ u =>
+ if(s.printMessages) u.printMessages()
+ if(s.printResponses) u.printResponse()
+ if(s.validationEnabled && u.validateResults() == false)
+ Console.err.println("Validation FAILED for user " + u)
}
Console.err.println("\n=========================\nStopping scenario\n=======================")
}