-package gr.grnet.aquarium
-
-import com.ckkloverdos.resource.FileStreamResource
-import converter.StdConverters
-import event.model.im.StdIMEvent
-import event.model.resource.StdResourceEvent
-import java.io.{InputStreamReader, BufferedReader, File}
-import com.ckkloverdos.props.Props
-import store.memory.MemStoreProvider
-import java.util.concurrent.atomic.AtomicLong
-import java.text.SimpleDateFormat
-import java.net.{URLConnection, URL}
-import util.Loggable
-
/*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* or implied, of GRNET S.A.
*/
+package gr.grnet.aquarium
+
+import com.ckkloverdos.props.Props
+import converter.{JsonTextFormat, StdConverters}
+import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
+import java.io.{InputStreamReader, BufferedReader, File}
+import java.net.URL
+import java.util.concurrent.atomic.AtomicLong
+import gr.grnet.aquarium.util.{Lock, Loggable}
+import java.util.{Date, Calendar, GregorianCalendar}
+import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import message.avro.gen._
+import org.apache.avro.specific.SpecificRecord
+import policy.CronSpec
+import util.json.JsonSupport
+import scala.Some
+import scala.Tuple2
+import java.util.concurrent.locks.ReentrantLock
+
/*
* @author Prodromos Gerakios <pgerakios@grnet.gr>
*/
+
+
+/**
+ * Generators for identifiers and random values used by the test scenarios.
+ */
+object UID {
+
+  // Last value handed out per key; every access goes through `lock`.
+  private[this] var privCounters = Map[String,Long]()
+  private[this] val lock = new Lock()
+
+  /**
+   * Returns the next value of the counter named `s`; the first value is 1.
+   *
+   * The counter is read and written inside one critical section, so two
+   * concurrent callers can never obtain the same value for the same key.
+   */
+  def next(s:String) : Long =
+    lock.withLock {
+      val id = privCounters.getOrElse(s, 0L) + 1L
+      privCounters = privCounters + ((s,id))
+      id
+    }
+
+  private[this] val counter = new AtomicLong(0L)
+
+  /** Returns the next value of the anonymous counter; the first value is 0. */
+  def next() = counter.getAndIncrement
+
+  /**
+   * Returns a pseudo-random integer in the closed interval [min, max].
+   *
+   * The width of the interval is computed with Long arithmetic so the default
+   * bounds do not overflow. When `max <= min` the result is `min`.
+   */
+  def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) : Int =
+    if(max <= min) min
+    else {
+      val span = max.toLong - min.toLong + 1L
+      // scala.math.random is in [0,1), so the offset is in [0, span-1].
+      (min.toLong + (scala.math.random * span).toLong).toInt
+    }
+
+  /**
+   * Returns a pseudo-randomly chosen element of `l`.
+   * Throws an Exception with message "random" when the list is empty.
+   */
+  def random[A](l:List[A]) : A = {
+    val sz = l.size
+    if(sz==0) throw new Exception("random")
+    l(random(0,sz-1))
+  }
+}
+
+/**
+ * Minimal helper for running an external command and consuming its output.
+ */
+object Process {
+  /**
+   * Runs `cmd` (split on single spaces into program and arguments), feeds every
+   * line of its merged stdout/stderr to `func`, and returns once the process has
+   * exited and all output has been consumed.
+   */
+  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
+    val commands = cmd.split(" ")
+    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start()
+    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
+    try {
+      // Read the output on a separate thread so the child never blocks on a full pipe.
+      val outputReaderThread = new Thread(new Runnable(){
+        def run : Unit = {
+          var ln : String = ins.readLine
+          while(ln != null) {
+            func(ln)
+            ln = ins.readLine
+          }
+        }
+      })
+      outputReaderThread.start()
+
+      // Suspend this thread until the sub-process is done ...
+      proc.waitFor
+
+      // ... and until its output has been fully read.
+      outputReaderThread.join()
+    } finally {
+      // Close the stream even if waiting was interrupted.
+      ins.close()
+    }
+  }
+
+  /** Runs `cmd` and echoes each line of its output to standard error. */
+  def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
+}
+
+/** Helper for wiping the MongoDB collections touched by the scenarios. */
+object Mongo {
+  /**
+   * Invokes the `mongo` shell against the `aquarium` database with an `--eval`
+   * script that calls `remove()` on the resevents, imevents, policies and
+   * userstates collections.
+   */
+  def clear : Unit = {
+    val command =
+      "mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()"
+    Process.exec(command)
+  }
+}
+
+/**
+ * Holds the single Aquarium instance shared by the scenarios and offers a
+ * reference-counted start/stop wrapper around a block of test code.
+ */
+object AquariumInstance {
+  //val propsfile = new FileStreamResource(new File("aquarium.properties"))
+  var props: Props = ResourceLocator.AquariumProperties
+  // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
+
+  // Building the instance first clears the Mongo collections (see Mongo.clear).
+  val aquarium = {
+    Mongo.clear
+    new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
+      //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
+      build()
+  }
+
+  // Number of callers currently inside `run`.
+  private[this] val count=new java.util.concurrent.atomic.AtomicLong()
+  // Becomes true once the first caller has finished starting Aquarium.
+  private[this] val ready=new java.util.concurrent.atomic.AtomicBoolean(false)
+
+  /**
+   * Evaluates `f` while Aquarium is running.
+   *
+   * The first caller to enter starts Aquarium and sleeps `billWait` milliseconds;
+   * other callers block until that start-up has completed. The last caller to
+   * leave stops Aquarium and sleeps `stop` milliseconds, even if `f` throws.
+   *
+   * @param billWait milliseconds to sleep after starting Aquarium
+   * @param stop     milliseconds to sleep after stopping Aquarium
+   * @param f        the code to run against the started instance
+   */
+  def run(billWait:Int, stop:Int)(f : => Unit) = {
+    if(count.addAndGet(1) == 1){
+      Console.err.println("Starting aquarium")
+      aquarium.start
+      Thread.sleep(billWait)
+      Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000))
+      this.synchronized{
+        ready.set(true)
+        this.notifyAll()
+      }
+    }
+    try{
+      this.synchronized{
+        while(!ready.get) this.wait()
+      }
+      // The block was previously accepted but never evaluated.
+      f
+    } finally {
+      if(count.addAndGet(-1) == 0){
+        Console.err.println("Stopping aquarium")
+        aquarium.stop
+        Thread.sleep(stop)
+        Console.err.println("Stopping aquarium --- DONE")
+      }
+    }
+  }
+}
+
+/**
+ * Lock-protected, append-only log of the JSON text of every message sent.
+ */
+object JsonLog {
+  private[this] final val lock = new Lock()
+  // A Vector gives cheap appends; the List version copied the whole log on every add.
+  private[this] var _log : Vector[String] = Vector.empty
+
+  /** Appends `json` to the end of the log. */
+  def add(json:String) = lock.withLock { _log = _log :+ json }
+
+  /** Returns the logged entries, oldest first. */
+  def get() : List[String] = lock.withLock(_log.toList)
+}
+
+/*object MessageQueue {
+ private[this] final val lock = new Lock()
+ private[this] var _sortedMsgs = SortedMap[Timeslot,(String,String,String)]
+} */
+
+/**
+ * Delivers generated events to Aquarium, either by publishing them through the
+ * RabbitMQ producer or by inserting them into the event stores and messaging
+ * the user actor directly. Every delivered event is also recorded in JsonLog.
+ */
+object MessageService {
+  private[this] val lock = new Lock
+
+  /** Picks the (exchange name, routing key) pair used to publish `event`. */
+  private[this] def routeOf(event:SpecificRecord) = event match {
+    case rc:ResourceEventMsg =>
+      rc.getResource match {
+        case "vmtime"     => ("cyclades","cyclades.resource.vmtime")
+        case "diskspace"  => ("pithos","pithos.resource.diskspace")
+        case "addcredits" => ("astakos","astakos.resource")
+        case other        => throw new Exception("send cast failed: %s".format(other))
+      }
+    case _:IMEventMsg =>
+      ("astakos","astakos.user")
+    case _ =>
+      throw new Exception("send cast failed")
+  }
+
+  /** Inserts `event` into the matching store and forwards it to its user's actor. */
+  private[this] def storeAndNotify(event:SpecificRecord) : Unit = {
+    val uid = event match {
+      case rcevent: ResourceEventMsg =>
+        AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
+        rcevent.getUserID
+      case imevent: IMEventMsg =>
+        AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
+        imevent.getUserID
+    }
+    // Actor lookup/creation is serialized through the lock.
+    val userActorRef = lock.withLock(AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid))
+    userActorRef ! event
+  }
+
+  /**
+   * Sends `event` to Aquarium and records its JSON form in JsonLog.
+   *
+   * @param event           a ResourceEventMsg or IMEventMsg
+   * @param rabbitMQEnabled publish via RabbitMQ instead of store + actor delivery
+   * @param debugEnabled    also print the message and its occurrence date to stderr
+   */
+  def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
+    val json = AvroHelpers.jsonStringOfSpecificRecord(event)
+    if(rabbitMQEnabled){
+      val (exchangeName,routingKey) = routeOf(event)
+      AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+        sendMessage(exchangeName,routingKey,json)
+    } else {
+      storeAndNotify(event)
+    }
+    val millis = event match {
+      case rc:ResourceEventMsg => rc.getOccurredMillis
+      case im:IMEventMsg => im.getOccurredMillis
+    }
+    JsonLog.add(json)
+    if(debugEnabled)
+      Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
+  }
+}
+
+/**
+ * Base class of the event generators.
+ *
+ * A message carries a property map. Once the map holds both "month" and "spec",
+ * the message owns a billing-month time range and a cron specification; every
+ * call to `send` then advances an internal calendar to the next date matching
+ * the specification inside that range and emits one event for it.
+ */
+abstract class Message {
+  val dbg = true
+  // Cursor: the time of the last event sent (or the start of the month).
+  val cal = new GregorianCalendar
+  var _range : Timeslot = null
+  var _cronSpec : CronSpec = null
+  var _messagesSent = 0
+  var _map = Map[String,String]()
+
+  /** Varargs convenience for `updateMap(Map)`. */
+  def updateMap(args:Tuple2[String,String]*) : Message =
+    updateMap(Map(args : _*))
+
+  /**
+   * Merges `map` into the message properties (new values win) and, when the
+   * "month"/"spec" pair is present and differs from the current schedule,
+   * rebuilds the time range and cron specification and rewinds the calendar
+   * to the first instant of the month.
+   *
+   * An empty "spec" is replaced by "* * * * *".
+   * NOTE(review): because the stored spec is then "* * * * *", an empty "spec"
+   * looks different on every call and rewinds the calendar each time — confirm
+   * whether that is intended.
+   *
+   * @return this message
+   */
+  def updateMap(map:Map[String,String]) : Message = {
+    _map = _map ++ map
+    (_map.get("month"),_map.get("spec")) match {
+      case (Some(month0),Some(spec)) =>
+        val month : Int = month0.toInt
+        if(_cronSpec==null || _cronSpec.cronSpec != spec || cal.get(Calendar.MONTH) != month-1) {
+          val currentYear = year
+          val d0 = getDate(1,month,currentYear,0,0,0)
+          // The month after December is January of the following year.
+          val d1 =
+            if(month==12) getDate(1,1,currentYear+1,0,0,0)
+            else getDate(1,month+1,currentYear,0,0,0)
+          // Truncate to whole seconds; the range ends one second before the next month.
+          _range = Timeslot((d0/1000)*1000,(d1/1000)*1000 - 1000)
+          cal.setTimeInMillis(d0)
+          _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
+        }
+      case _ => ()
+    }
+    this
+  }
+
+  /** Number of events successfully sent by this message. */
+  def sentMessages = _messagesSent
+
+  /** Next scheduled time in milliseconds, without moving the calendar. */
+  def nextTime : Option[Long] = nextTime(false)
+
+  /**
+   * Next date after the calendar position that matches the cron specification
+   * inside the range, in milliseconds; None when no schedule is set or no
+   * further date exists. With `update` the calendar is moved to that date.
+   */
+  def nextTime(update:Boolean) : Option[Long] =
+    if(_cronSpec == null) None
+    else _cronSpec.nextValidDate(_range,cal.getTime) match {
+      case Some(d) =>
+        val millis = d.getTime
+        if(update) cal.setTimeInMillis(millis)
+        Some(millis)
+      case None =>
+        None
+    }
+
+  /** The current calendar year according to the system clock. */
+  def year : Int = {
+    val saved = getMillis
+    cal.setTimeInMillis(System.currentTimeMillis())
+    val ret = cal.get(Calendar.YEAR)
+    cal.setTimeInMillis(saved)   // restore the cursor
+    ret
+  }
+
+  /**
+   * Milliseconds of the given date; `month` is 1-based. The shared calendar is
+   * used as scratch space and restored afterwards.
+   */
+  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
+    val saved = getMillis
+    cal.set(year,month-1,day,hour,min,sec)
+    val ret = cal.getTimeInMillis
+    cal.setTimeInMillis(saved)
+    ret
+  }
+
+  /** Current calendar position in milliseconds. */
+  def getMillis : Long = cal.getTimeInMillis
+
+  /** Same as the six-argument `getDate` with zero seconds. */
+  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
+    getDate(day,month,year,hour,min,0)
+
+  /** Next identifier from the counter keyed by the concrete class name. */
+  def nextID = UID.next(this.getClass().getName)
+
+  /** Builds the event to emit at `millis` from the message properties. */
+  def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
+
+  /** Varargs convenience for `send(Map)`. */
+  def send(args:Tuple2[String,String]*) : Boolean =
+    send(Map(args : _*))
+
+  /**
+   * Advances to the next scheduled time, merges `map` into the properties and
+   * sends the resulting event through MessageService. The "rabbitMQEnabled" and
+   * "debugEnabled" properties (default "false") select the delivery mode.
+   *
+   * @return true when an event was sent, false when the schedule is exhausted
+   */
+  def send(map:Map[String,String]) : Boolean = {
+    nextTime(true) match {
+      case Some(millis) =>
+        updateMap(map)
+        val event = makeEvent(millis,_map)
+        val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
+        val rdb = _map.getOrElse("debugEnabled","false").toBoolean
+        MessageService.send(event,ren,rdb)
+        _messagesSent += 1
+        true
+      case None =>
+        false
+    }
+  }
+
+}
+
+/**
+ * Generates "diskspace" resource events issued by the "pithos" client.
+ *
+ * Properties read from the map:
+ *   "action" -> "update" , "delete" , "purge"
+ *   "uid"    -> user id
+ *   "path"   -> object path
+ *   "value"  -> event value
+ */
+class DiskMessage extends Message {
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val action = map("action")
+    val owner  = map("uid")
+    val path   = map("path")
+    val amount = map("value")
+    val eventID = "rc.%d.object.%s".format(nextID,action)
+
+    val eventDetails = MessageFactory.newDetails(
+      MessageFactory.newStringDetail("action", "object %s".format(action)),
+      MessageFactory.newStringDetail("total", "0.0"),
+      MessageFactory.newStringDetail("user", owner),
+      MessageFactory.newStringDetail("path", path)
+    )
+
+    // occurred and received times are both the scheduled time
+    MessageFactory.newResourceEventMsg(
+      eventID,
+      millis, millis,
+      owner, "pithos",
+      "diskspace", "1",
+      amount,
+      "1.0",
+      eventDetails,
+      owner
+    )
+  }
+}
+
+/**
+ * Generates "vmtime" resource events issued by the "cyclades" client.
+ *
+ * Properties read from the map:
+ *   "uid"        -> unique id for user
+ *   "instanceID" -> e.g. "cyclades.vm.kJSOLek"
+ *   "vmName"     -> e.g. "My Lab VM"
+ * The status is not read from the map; it alternates on every event.
+ */
+class VMMessage extends Message {
+  var _status = "on"
+
+  /** Flips the status between "on" and "off" and returns the new status. */
+  def nextStatus = {
+    _status = if(_status=="off") "on" else "off"
+    _status
+  }
+
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val owner = map("uid")
+    val stateValue = nextStatus match {
+      case "on"      => "1"
+      case "off"     => "0"
+      case "destroy" => "2"
+      case other     => throw new Exception("VMMessage bad status: %s".format(other))
+    }
+    val eventID = "rc.%d.vmtime".format(nextID)
+    val vmInstance = map("instanceID")
+    val eventDetails = MessageFactory.newDetails(
+      MessageFactory.newStringDetail("VM Name", map("vmName"))
+    )
+
+    // occurred and received times are both the scheduled time
+    MessageFactory.newResourceEventMsg(
+      eventID,
+      millis, millis,
+      owner, "cyclades",
+      "vmtime", vmInstance,
+      stateValue,
+      "1.0",
+      eventDetails,
+      owner
+    )
+  }
+}
+
+/**
+ * Generates the "create" identity-management event issued by the "astakos" client.
+ *
+ * Properties read from the map:
+ *   "uid" -> user id
+ */
+class CreationMessage extends Message {
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val owner = map("uid")
+    val eventID = "im.%d.create.user".format(nextID)
+
+    // occurred and received times are both the scheduled time
+    MessageFactory.newIMEventMsg(
+      eventID,
+      millis, millis,
+      owner, "astakos",
+      false,                // isActive
+      "default",            // role
+      "1.0", "create",      // event version, event type
+      MessageFactory.newDetails(),
+      owner
+    )
+  }
+}
+
+/**
+ * Generates "addcredits" resource events issued by the "astakos" client.
+ *
+ * Properties read from the map:
+ *   "amount" -> e.g. "2000"
+ *   "uid"    -> user id
+ */
+class AddCreditsMessage extends Message {
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val userID = map("uid")
+    val amount = map("amount")
+    val id = "im.%d.add.credits".format(nextID)
+    val clientID = "astakos"
+    val eventVersion = "1.0"
+
+    // "addcredits" is used both as the resource name and as the instance id;
+    // occurred and received times are both the scheduled time.
+    MessageFactory.newResourceEventMsg(
+      id,
+      millis, millis,
+      userID, clientID,
+      "addcredits", "addcredits",
+      amount,
+      eventVersion,
+      MessageFactory.newDetails(),
+      userID
+    )
+  }
+}
+
+/**
+ * Factory for the concrete message generators, selected by a type name.
+ */
+object Message {
+  /** Type name -> constructor of the matching generator. */
+  val msgMap = Map[String,()=>Message](
+    "vm"      -> (() => new VMMessage),
+    "disk"    -> (() => new DiskMessage),
+    "create"  -> (() => new CreationMessage),
+    "credits" -> (() => new AddCreditsMessage)
+  )
+
+  /** Varargs convenience for `apply(typ, Map)`. */
+  def apply(typ:String,args:Tuple2[String,String]*) : Message =
+    apply(typ,Map(args : _*))
+
+  /**
+   * Creates the generator registered under `typ` and initialises it with `map`.
+   * Throws an Exception ("Invalid type : ...") for an unknown type name.
+   */
+  def apply(typ:String,map:Map[String,String]) : Message = {
+    val factory = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))
+    val created = factory()
+    created.updateMap(map)
+    created
+  }
+}
+
+
+/**
+ * A simulated user: owns a set of message generators, sends their events for
+ * one billing month, fetches the resulting bill over HTTP and can check the
+ * bill's totals for internal consistency.
+ *
+ * @param serverAndPort "host:port" of the REST endpoint serving bills
+ * @param month         1-based billing month
+ */
+class User(serverAndPort:String,month:Int) {
+  val uid = "user%d@grnet.gr".format(UID.next)
+  val _creationMessage : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
+  var _resources : List[Message] = Nil
+  var _billEntryMsg :Option[BillEntryMsg] = None
+  var _resMsgs = 0
+  var _vmMsgs = 0
+  var _addMsgs = 0
+  var _messagesSent : List[Message] = Nil
+
+  override def toString() = uid
+
+  /** Copies a Java list into an immutable Scala list. */
+  def scalaList[A](s:java.util.List[A]) : List[A] = {
+    import scala.collection.JavaConverters.asScalaBufferConverter
+    s.asScala.toList
+  }
+
+  /** Maps every element of `l` with `f` and folds the results with `add`, starting at `start`. */
+  def sumOf[A,D](l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) : D =
+    scalaList(l).map(f).foldLeft(start) {case (sum,v) => add(sum,v) }
+
+  /** Checks that `s` equals the sum computed by `sumOf`. */
+  def checkSum[A,D](s:D,l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) =
+    check(s == sumOf(l,start)(f)(add))
+
+  /** Throws an Exception ("Invalid property") when `b` is false. */
+  def check(b: => Boolean) = {
+    if(!b)
+      throw new Exception("Invalid property")
+  }
+
+  // Structural view of a bill entry that carries totals and a list of details.
+  type S[A] = {def getTotalCredits : String
+               def getTotalElapsedTime:String
+               def getTotalUnits:String
+               def getDetails:java.util.List[A]}
+  // (total credits, total elapsed time, total units)
+  type V = (Double,Long,Double)
+
+  /** Extracts the totals triple and the detail list of `t`. */
+  def valuesOf[A,T<:S[A]](t:T) : (V,java.util.List[A]) =
+    ((t.getTotalCredits.toDouble,
+      t.getTotalElapsedTime.toLong,
+      t.getTotalUnits.toDouble),
+      t.getDetails)
+
+  /** Checks that the totals of `s` equal the sum of `f` over its details. */
+  def checkS[A,T<:S[A]](s:T)(f:A=>V)(add:(V,V)=>V) = {
+    val (totals,details) = valuesOf[A,T](s)
+    checkSum(totals,details,zero)(f)(add)
+  }
+
+  /** Component-wise sum of two totals triples. */
+  def add(a:V,b:V) : V= {
+    val (a1,a2,a3) = a
+    val (b1,b2,b3) = b
+    (a1+b1,a2+b2,a3+b3)
+  }
+
+  val zero = (0.0D,0L,0.0D)
+
+  /** The recorded send attempts whose generator matches `serviceName`. */
+  def filterMessagesSent(serviceName:String) : List[Message] =
+    _messagesSent.filter { m => (m,serviceName) match {
+      case (_:DiskMessage,"diskspace") => true
+      case (_:VMMessage,"vmtime") => true
+      case (_:AddCreditsMessage,"addcredits") => true
+      case _ => false
+    }}
+
+  /** Checks the number of charge entries against the number of recorded send attempts. */
+  def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
+    if(m.length == 0) check(c.length == 0)
+    else check(c.length == (serviceName match {
+      case "diskspace" => m.length - 1
+      case "vmtime" => m.length - 1
+      case "addcredits" => m.length
+    }))
+  }
+
+  /** Totals triple of a charge entry. */
+  def validateChargeEntry(c:ChargeEntryMsg) : V = {
+    (c.getTotalCredits.toDouble,c.getTotalElapsedTime.toLong,c.getTotalUnits.toDouble)
+  }
+
+  /** Checks the entry count of an event entry and returns the sum of its charge entries. */
+  def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
+    val charges = scalaList(e.getDetails)
+    val sent = filterMessagesSent(serviceName)
+    checkMessages(serviceName,charges,sent)
+    sumOf(e.getDetails,zero)(validateChargeEntry)(add)
+  }
+
+  /** Checks that a resource entry's totals equal the sum of its event entries. */
+  def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
+    val v0 = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
+    val v1 = sumOf(r.getDetails,zero)(validateEventEntry(serviceName,_))(add)
+    check(v0 == v1)
+    v0
+  }
+
+  /** Checks that a service entry's totals equal the sum of its resource entries; returns its credits. */
+  def validateServiceEntry(s:ServiceEntryMsg) : Double = {
+    val v0 = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
+    val v1 = sumOf(s.getDetails,zero)(validateResourceEntry(s.getServiceName,_))(add)
+    check(v0 == v1)
+    v0._1
+  }
+
+  /**
+   * Checks status, user id, time range and deducted credits of a bill.
+   * Any failed check is printed as a stack trace and reported as false.
+   */
+  def validateBillEntry(b:BillEntryMsg) : Boolean = {
+    try{
+      check(b.getStatus == "ok")
+      check(uid == b.getUserID)
+      check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
+            _creationMessage._range.to.getTime == b.getEndTime().toLong)
+      check(b.getDeductedCredits.toDouble ==
+            sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
+      true
+    } catch {
+      case e:Exception =>
+        e.printStackTrace
+        false
+    }
+  }
+
+  /** Validates the fetched bill; false when no bill was received. */
+  def validateResults() : Boolean = {
+    _billEntryMsg match {
+      case None => false
+      case Some(b) => validateBillEntry(b)
+    }
+  }
+
+  /** Prints every entry of JsonLog to stderr. */
+  def printMessages() = {
+    Console.err.println("Messages sent:")
+    for { m <- JsonLog.get}
+      Console.err.println("%s".format(m))
+    Console.err.println("\n=========================\n")
+  }
+
+  /** Prints the fetched bill as JSON, or "NONE!!!!" when there is none. */
+  def printResponse() = {
+    Console.err.println("Response:\n" + (_billEntryMsg match {
+      case None => "NONE!!!!"
+      case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
+    }))
+  }
+
+  /** Varargs convenience for `add(no, typ, Map)`. */
+  def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
+    add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  /** Adds `no` generators of type `typ`, all with the same properties. */
+  def add(no:Int,typ:String,map:Map[String,String]) : User =
+    add(no,typ,{_ => map})
+
+  /**
+   * Adds `no` generators of type `typ`; generator i (1-based) gets the
+   * properties `map(i)` plus this user's "uid" and "month".
+   */
+  def add(no:Int,typ:String,map:Int=>Map[String,String]) : User = {
+    for {i <- 1 to no} {
+      val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
+      _resources = Message(typ,map0) :: _resources
+    }
+    this
+  }
+
+  /** Adds `no` VM generators; `cronSpec` is a format string taking the month. */
+  def addVMs(no:Int,cronSpec:String) : User =
+    add(no,"vm",{i =>
+      Map("instanceID"->"cyclades.vm.%d".format(i),
+          "vmName"  -> "Virtual Machine #%d".format(i),
+          "status"  -> "on", // initially "on" msg
+          "spec" -> cronSpec.format(month))})
+
+  /** Adds `no` file generators; `spec` is a format string taking the month. */
+  def addFiles(no:Int,action:String,spec:String) : User =
+    add(no,"disk",{i =>
+      Map("action" -> action,
+          "path"->"/Papers/file_%d.PDF".format(i),
+          "spec" -> spec.format(month)
+         )
+    })
+
+  /** Adds `no` add-credits generators; `spec` is a format string taking the month. */
+  def addCredits(no:Int,spec:String) : User = {
+    add(no,"credits","spec"->spec.format(month))
+  }
+
+  // Strict "fires earlier" ordering: generators without a next time come first.
+  // A strict relation is required; a non-strict one (<=) breaks the comparator
+  // contract of the underlying sort.
+  private[this] def firesBefore(m1:Message,m2:Message) : Boolean =
+    (m1.nextTime,m2.nextTime) match {
+      case (Some(l1),Some(l2)) => l1 < l2
+      case (None,Some(_)) => true
+      case _ => false
+    }
+
+  /**
+   * Sends the creation event and then, round after round, one event from every
+   * generator that still has scheduled dates, until all are exhausted; finally
+   * waits `wait` milliseconds and fetches the bill.
+   *
+   * @param ordered          sort the generators by next scheduled time in every round
+   * @param wait             milliseconds to sleep before requesting the bill
+   * @param minFile          lower bound of the random "value" property
+   * @param maxFile          upper bound of the random "value" property
+   * @param minAmount        lower bound of the random "amount" property
+   * @param maxAmount        upper bound of the random "amount" property
+   * @param maxJSONRetry     maximum number of bill requests
+   * @param sendViaRabbitMQ  deliver events via RabbitMQ
+   * @param sendDebugEnabled print every sent event
+   */
+  def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
+          sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean) =  {
+    _messagesSent = Nil
+    _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
+    var pending = _resources.toList
+    while(!pending.isEmpty) {
+      val round = if(ordered) pending.sortWith(firesBefore) else pending
+      pending = round.filter({m =>
+        // every attempt is recorded, including the final unsuccessful one
+        _messagesSent = _messagesSent ::: List(m)
+        val sent = m.send("value"->UID.random(minFile,maxFile).toString,
+                          "amount"->UID.random(minAmount,maxAmount).toString,
+                          "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
+                          "debugEnabled" -> sendDebugEnabled.toString
+                         )
+        if(sent) m match {
+          case _:DiskMessage => _resMsgs += 1
+          case _:VMMessage => _vmMsgs += 1
+          case _:AddCreditsMessage => _addMsgs +=1
+          case _ => () // other generator types are not counted
+        }
+        sent
+      })
+    }
+    Thread.sleep(wait)
+    _billEntryMsg = getBillResponse(maxJSONRetry)
+  }
+
+  /**
+   * Requests the bill for the billing month up to `max` times, one second
+   * apart, until a response other than "processing" is parsed. Returns the
+   * last response that could be parsed, if any.
+   */
+  private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
+    // One HTTP GET; an empty string signals any failure.
+    def get () : String = {
+      val fromMillis = _creationMessage._range.from.getTime
+      val toMillis   = _creationMessage._range.to.getTime
+      val url = "http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
+      try{
+        val in = new BufferedReader(
+          new InputStreamReader(
+            new URL(url).openConnection().
+              getInputStream()))
+        try {
+          val body = new StringBuilder
+          var inputLine = in.readLine()
+          while (inputLine != null) {
+            if(body.length > 0) body.append("\n")
+            body.append(inputLine)
+            inputLine = in.readLine()
+          }
+          body.toString
+        } finally {
+          in.close()   // also on read errors
+        }
+      } catch {
+        case e:Exception =>
+          ""
+      }
+    }
+    var resp = ""
+    var count = 0
+    var ret : Option[BillEntryMsg] = None
+    while(resp.isEmpty && count < max){
+      if(count > 0) Console.err.println("Retrying for bill request.")
+      resp = get()
+      if(resp.isEmpty) Thread.sleep(1000)
+      else {
+        try{
+          val b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
+          ret = Some(b)
+          if(b.getStatus().equals("processing")){
+            // not ready yet: clear the response so the loop asks again
+            Thread.sleep(1000)
+            resp = ""
+          }
+        } catch {
+          case e:Exception =>
+            e.printStackTrace
+            resp = ""
+        }
+      }
+      count += 1
+    }
+    ret
+  }
+}
+
+/**
+ * One resource line of a scenario.
+ *
+ * @param resType   generator type; one of the keys of Message.msgMap
+ * @param instances how many generators of this type each user gets
+ * @param cronSpec  cron specification of the events
+ */
+case class Resource(resType : String,
+                    instances : Long,
+                    cronSpec : String) extends JsonSupport
+
+/**
+ * Description of one test scenario, as read from JSON.
+ *
+ * ScenarioRunner skips the scenario when `ignoreScenario` is set, builds the
+ * bill endpoint from `host` and `port`, creates `numberOfUsers` users for
+ * `billingMonth` and gives each of them the listed `resources`. The remaining
+ * fields are forwarded to User.run or toggle printing and validation of results.
+ */
+case class Scenario(ignoreScenario : Boolean,
+                    printMessages : Boolean,
+                    printResponses : Boolean,
+                    host : String,
+                    port : Long,
+                    sendOrdered : Boolean,
+                    sendViaRabbitMQ : Boolean,
+                    sendDebugEnabled : Boolean,
+                    validationEnabled : Boolean,
+                    billingMonth : Long,
+                    aquariumStartWaitMillis : Long,
+                    aquariumStopWaitMillis : Long,
+                    billResponseWaitMillis : Long,
+                    numberOfUsers : Long,
+                    numberOfResponseRetries : Long,
+                    minFileCredits : Long,
+                    maxFileCredits : Long,
+                    minUserCredits : Long,
+                    maxUserCredits : Long,
+                    resources : List[Resource]) extends JsonSupport
+
+/** Top-level JSON document: the list of scenarios to run, in order. */
+case class Scenarios(scenarios : List[Scenario]) extends JsonSupport
+
+/**
+ * Parses scenario descriptions and runs them against the shared Aquarium instance.
+ */
+object ScenarioRunner {
+  val aquarium = AquariumInstance.aquarium
+
+  /** Parses the JSON text of a single scenario. */
+  def parseScenario(txt:String) : Scenario =
+    StdConverters.AllConverters.convertEx[Scenario](JsonTextFormat(txt))
+
+  /** Parses the JSON text of a list of scenarios. */
+  def parseScenarios(txt:String) : Scenarios =
+    StdConverters.AllConverters.convertEx[Scenarios](JsonTextFormat(txt))
+
+  /** Parses and runs a single scenario given as JSON text. */
+  def runScenario(txt:String) : Unit = runScenario(parseScenario(txt))
+
+  /**
+   * Creates one user with the resources of `s`, lets it send all its events and
+   * fetch its bill, and returns it. An unknown resource type is an error.
+   */
+  private[this] def runUser(s:Scenario) : User = {
+    val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
+    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
+    for{ r <- s.resources}  // create messages
+      r.resType match {
+        case "vm" =>
+          user.addVMs(r.instances.toInt,r.cronSpec)
+        case "disk" =>
+          user.addFiles(r.instances.toInt,"update",r.cronSpec)
+        case "credits" =>
+          user.addCredits(r.instances.toInt,r.cronSpec)
+        case other =>
+          throw new Exception("Invalid type : "+other)
+      }
+    // run scenario
+    user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
+             s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
+             s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
+    user
+  }
+
+  /**
+   * Starts Aquarium, sleeps `billWait` milliseconds, evaluates `forkJoinCode`
+   * and returns its result; Aquarium is stopped (followed by a sleep of `stop`
+   * milliseconds) even when the code throws, in which case the exception
+   * propagates.
+   *
+   * NOTE(review): `default` is never returned — a value placed in a `finally`
+   * block is discarded — so it is kept only for signature compatibility.
+   */
+  private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
+    Console.err.println("Starting aquarium")
+    AquariumInstance.aquarium.start
+    Thread.sleep(billWait)
+    Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000))
+    try{
+      forkJoinCode
+    } finally {
+      Console.err.println("Stopping aquarium")
+      AquariumInstance.aquarium.stop
+      Thread.sleep(stop)
+      Console.err.println("Stopping aquarium --- DONE")
+    }
+  }
+
+  /**
+   * Runs `s` unless it is marked as ignored: all users run concurrently as
+   * futures; after Aquarium has been stopped, messages and responses are printed
+   * and bills validated according to the scenario flags.
+   */
+  def runScenario(s:Scenario): Unit = {
+    if(!s.ignoreScenario) {
+      Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
+      val users = runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
+        val tasks = for { u <- 1 to s.numberOfUsers.toInt}
+                    yield scala.actors.Futures.future(runUser(s))
+        // applying a future blocks until its user has finished
+        tasks.map(_()).toList
+      }
+      users.foreach{ u =>
+        if(s.printMessages) u.printMessages()
+        if(s.printResponses) u.printResponse()
+        if(s.validationEnabled && !u.validateResults())
+          Console.err.println("Validation FAILED for user " + u)
+      }
+      Console.err.println("\n=========================\nStopping scenario\n=======================")
+    }
+  }
+
+  /** Parses and runs a list of scenarios given as JSON text. */
+  def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt))
+
+  /** Prints and then runs every scenario of `ss`, one after the other. */
+  def runScenarios(ss:Scenarios) = {
+    Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
+    ss.scenarios.foreach(runScenario(_))
+  }
+
+}
+
+/**
+ * Command-line entry point: runs the scenarios described in a JSON file.
+ */
+object UserTest extends Loggable {
+/*
+    JSON example:
+  {
+  "scenarios":[{
+    "ignoreScenario":false,
+    "printMessages":false,
+    "printResponses":true,
+    "host":"localhost",
+    "port":8888,
+    "sendOrdered":true,
+    "sendViaRabbitMQ":false,
+    "sendDebugEnabled":false,
+    "validationEnabled":false,
+    "billingMonth":9,
+    "aquariumStartWaitMillis":2000,
+    "aquariumStopWaitMillis":2000,
+    "billResponseWaitMillis":2000,
+    "numberOfUsers":2,
+    "numberOfResponseRetries":10,
+    "minFileCredits":2000,
+    "maxFileCredits":5000,
+    "minUserCredits":10000,
+    "maxUserCredits":50000,
+    "resources":[{
+      "resType":"credits",
+      "instances":1,
+      "cronSpec":"00 00 10,12 %d ?"
+    },{
+      "resType":"disk",
+      "instances":1,
+      "cronSpec":"00 18 15,20,29,30 %d ?"
+    },{
+      "resType":"vm",
+      "instances":1,
+      "cronSpec":"00 18 14,17,19,20 %d ?"
+    }]
+  }]
+}
+ */
+  /** Built-in scenario used when the scenario file cannot be read or run. */
+  val basic = new Scenario(false,false,true,"localhost",8888,true,false,false,false,9,2000,2000,2000,
+                           1,10,2000,5000,10000,50000,List[Resource](
+                           new Resource("credits",1, "00 00 10,12 %d ?".format(9)),
+                           new Resource("disk",1,"00 18 15,20,29,30 %d ?".format(9)),
+                           new Resource("vm",1,"00 18 14,17,19,20 %d ?".format(9))
+                          ))
+
+  /**
+   * Reads the scenarios from the file named by the first argument and runs them.
+   * On any exception (including a missing argument) the stack trace is printed
+   * and the built-in `basic` scenario is run instead.
+   */
+  def main(args: Array[String]) : Unit = {
+    try{
+      val source = scala.io.Source.fromFile(args.head)
+      // release the file handle before the (long-running) scenarios start
+      val lines = try source.mkString finally source.close()
+      ScenarioRunner.runScenarios(lines)
+    } catch {
+      case e:Exception =>
+        e.printStackTrace()
+        ScenarioRunner.runScenarios(new Scenarios(List(basic)))
+    }
+  }
+
+}
+
+
+/*
object BillTest extends Loggable {
type JSON = String
val aquarium = {
exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
Console.err.println(_))
- new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
+ new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
//update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
build()
val role = "default"
val eventVersion = "1.0"
val eventType = "create"
- (new StdIMEvent(id,occurredMillis,receivedMillis,userID,
- clientID,isActive,role,eventVersion,eventType,
- Map()).toJsonString,mid)
+
+ val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
+ val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
+ (json, mid)
}
private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
val userID = "user%d@grnet.gr".format(uid)
val clientID = "astakos"
val isActive = false
- val role = "default"
val eventVersion = "1.0"
- val eventType = "addcredits"
- new StdIMEvent(id,occurredMillis,receivedMillis,userID,
- clientID,isActive,role,eventVersion,eventType,
- Map("credits" -> amount.toString)).toJsonString
+ val resource = "addcredits"
+ val instanceID = "addcredits"
+
+ val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
+ val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
+ json
}
private [this] def makePithos(date:DATE,uid:UID,path:String,
val resource ="diskspace"
val instanceID = "1"
val eventVersion = "1.0"
- val details = Map("action" -> "object %s".format(action),
- "total" -> "0.0",
- "user" -> userID,
- "path" -> path)
- new StdResourceEvent(id,occurredMillis,receivedMillis,userID,clientID,
- resource,instanceID,value,eventVersion,details).toJsonString
+ val details = MessageFactory.newDetails(
+ MessageFactory.newStringDetail("action", "object %s".format(action)),
+ MessageFactory.newStringDetail("total", "0.0"),
+ MessageFactory.newStringDetail("user", userID),
+ MessageFactory.newStringDetail("path", path)
+ )
+
+ val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
+ val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
+ json
}
private[this] def sendCreate(date:DATE) : UID = {
}
}
+
private[this] def testCase1() : JSON = {
/* GET BILL FROM TO*/
val billFromDate = "00/00/00/01/08/2012"
runTestCase(testCase1)
}
}
+*/