MongoDBStore adds indices to mongo collections exactly Once (new class). Removed...
[aquarium] / src / test / scala / gr / grnet / aquarium / BillTest.scala
index 8d607be..5ef8571 100644 (file)
@@ -1,17 +1,3 @@
-package gr.grnet.aquarium
-
-import com.ckkloverdos.resource.FileStreamResource
-import converter.StdConverters
-import event.model.im.StdIMEvent
-import event.model.resource.StdResourceEvent
-import java.io.{InputStreamReader, BufferedReader, File}
-import com.ckkloverdos.props.Props
-import store.memory.MemStoreProvider
-import java.util.concurrent.atomic.AtomicLong
-import java.text.SimpleDateFormat
-import java.net.{URLConnection, URL}
-import util.Loggable
-
 /*
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
 *
@@ -47,10 +33,927 @@ import util.Loggable
 * or implied, of GRNET S.A.
 */
 
+package gr.grnet.aquarium
+
+import com.ckkloverdos.props.Props
+import converter.{JsonTextFormat, StdConverters}
+import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
+import java.io.{InputStreamReader, BufferedReader, File}
+import java.net.URL
+import java.util.concurrent.atomic.AtomicLong
+import gr.grnet.aquarium.util.{Lock, Loggable}
+import java.util.{Date, Calendar, GregorianCalendar}
+import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import message.avro.gen._
+import org.apache.avro.specific.SpecificRecord
+import policy.CronSpec
+import util.json.JsonSupport
+import scala.Some
+import scala.Tuple2
+import java.util.concurrent.locks.ReentrantLock
+
 
 /*
 * @author Prodromos Gerakios <pgerakios@grnet.gr>
 */
+
+
+object UID {
+
+  private[this] var privCounters = Map[String,Long]()
+  private[this] val lock = new Lock()
+
+  def next(s:String) : Long = {
+     val l = lock.withLock{
+      privCounters.get(s) match {
+        case None => 1
+        case Some(l) => l+1
+      }
+    }
+    privCounters = privCounters + ((s,l))
+    l
+  }
+
+  private[this] val counter = new AtomicLong(0L)
+  def next() = counter.getAndIncrement
+  def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
+      min + (scala.math.random.toInt % (max+1)) % (max+1)
+
+  def random[A](l:List[A]) : A = {
+    val sz = l.size
+    if(sz==0) throw new Exception("random")
+     l(random(0,sz-1))
+  }
+}
+
+object Process {
+  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
+    val commands = cmd.split(" ")
+    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
+    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
+    val sb = new StringBuilder
+
+    //spin off a thread to read process output.
+    val outputReaderThread = new Thread(new Runnable(){
+      def run : Unit = {
+        var ln : String = null
+        while({ln = ins.readLine; ln != null})
+          func(ln)
+      }
+    })
+    outputReaderThread.start()
+
+    //suspense this main thread until sub process is done.
+    proc.waitFor
+
+    //wait until output is fully read/completed.
+    outputReaderThread.join()
+
+    ins.close()
+  }
+  def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
+}
+
+object Mongo {
+  def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
+}
+
+object AquariumInstance {
+  //val propsfile = new FileStreamResource(new File("aquarium.properties"))
+  var props: Props = ResourceLocator.AquariumProperties
+  // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
+  val aquarium = {
+    Mongo.clear
+    new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
+      //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
+      build()
+  }
+
+  private[this] val count=new java.util.concurrent.atomic.AtomicLong()
+  private[this] val ready=new java.util.concurrent.atomic.AtomicBoolean(false)
+
+  def run(billWait:Int, stop:Int)(f : => Unit) = {
+    if(count.addAndGet(1) == 1){
+      Console.err.println("Starting aquarium")
+      aquarium.start
+      Thread.sleep(billWait)
+      Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
+      this.synchronized{
+        ready.set(true)
+        this.synchronized(this.notifyAll)
+      }
+    }
+    try{
+      this.synchronized{
+        while(!ready.get) this.wait
+      }
+    } finally {
+        if(count.addAndGet(-1) == 0){
+       Console.err.println("Stopping aquarium")
+       aquarium.stop
+       Thread.sleep(stop)
+       Console.err.println("Stopping aquarium --- DONE")
+      }
+    }
+  }
+}
+
+object JsonLog {
+  private[this] final val lock = new Lock()
+  private[this] var _log : List[String] = Nil
+  def add(json:String) =  lock.withLock(_log = _log ::: List(json))
+  def get() : List[String] = lock.withLock(_log.toList)
+}
+
+/*object MessageQueue {
+  private[this] final val lock = new Lock()
+  private[this] var _sortedMsgs  = SortedMap[Timeslot,(String,String,String)]
+} */
+
+object MessageService {
+  private[this] val lock = new Lock
+
+  def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
+    val json = AvroHelpers.jsonStringOfSpecificRecord(event)
+    if(rabbitMQEnabled){
+      val (exchangeName,routingKey) = event match {
+        case rc:ResourceEventMsg => rc.getResource match {
+          case "vmtime" =>
+            ("cyclades","cyclades.resource.vmtime")
+          case "diskspace" =>
+            ("pithos","pithos.resource.diskspace")
+          case "addcredits" =>
+            ("astakos","astakos.resource")
+          case x =>
+            throw new Exception("send cast failed: %s".format(x))
+        }
+        case im:IMEventMsg =>
+          ("astakos","astakos.user")
+        case _ =>
+          throw new Exception("send cast failed")
+      }
+      AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+        sendMessage(exchangeName,routingKey,json)
+    } else {
+      val uid = event match {
+        case rcevent: ResourceEventMsg =>
+            AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
+            rcevent.getUserID
+        case imevent: IMEventMsg =>
+             AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
+             imevent.getUserID
+      }
+      val userActorRef = lock.withLock(AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid))
+      userActorRef ! event
+    }
+    val millis = event match {
+      case rc:ResourceEventMsg => rc.getOccurredMillis
+      case im:IMEventMsg => im.getOccurredMillis
+    }
+    JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json)
+    if(debugEnabled)
+      Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
+  }
+}
+
+abstract class Message {
+  val dbg = true
+  val cal =   new GregorianCalendar
+  var _range : Timeslot = null
+  var _cronSpec : CronSpec = null
+  var _messagesSent = 0
+  //var _done = false
+  var _map = Map[String,String]()
+
+  def updateMap(args:Tuple2[String,String]*) : Message  =
+    updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def updateMap(map:Map[String,String]) : Message = {
+    def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
+      (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
+        a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
+    }
+    _map =  mergeMap(List(_map,map))((v1,v2) => v2)
+    (_map.get("month"),_map.get("spec")) match {
+      case (Some((month0:String)),Some(spec)) =>
+        val month : Int = month0.toInt
+        if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
+           val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
+           val d0 = getDate(1,month,year,0,0,0)
+           _range = Timeslot((d0/1000)*1000,(d1/1000)*1000 - 1000)
+           cal.setTimeInMillis(d0)
+          _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
+        }
+      case _ => ()
+    }
+    this
+  }
+
+  //def done = _done
+  def sentMessages = _messagesSent
+
+  def nextTime : Option[Long] = nextTime(false)
+
+  def nextTime(update:Boolean) : Option[Long] = {
+    _cronSpec match{
+      case null =>
+        None
+      case _ =>
+        _cronSpec.nextValidDate(_range,cal.getTime) match {
+          case Some(d) =>
+            val millis = d.getTime
+            if(update) cal.setTimeInMillis(millis)
+            Some(millis)
+          case None    =>
+            None
+        }
+    }
+  }
+
+  def year : Int = {
+    val tmp = getMillis
+    cal.setTimeInMillis(System.currentTimeMillis())
+    val ret = cal.get(Calendar.YEAR)
+    cal.setTimeInMillis(tmp)
+    ret
+  }
+
+  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
+    val tmp = getMillis
+    cal.set(year,month-1,day,hour,min,sec)
+    val ret = cal.getTimeInMillis
+    cal.setTimeInMillis(tmp)
+    ret
+  }
+
+  def getMillis : Long = cal.getTimeInMillis
+
+  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
+    getDate(day,month,year,hour,min,0)
+
+  /*def setMillis(millis:Long) = {
+    cal.setTimeInMillis(millis)
+  }*/
+
+  /*def addMillis(day:Int,hour:Int) = {
+    cal.roll(Calendar.DATE,day)
+    cal.roll(Calendar.DATE,hour)
+  }*/
+
+  def nextID = UID.next(this.getClass().getName) /*this match {
+    case _:DiskMessage => UID.next("DiskMessage")
+    case _:CreationMessage => UID.next("CreationMessage")
+    case _:VMMessage => UID.next("VMMessage")
+    case _:AddCreditsMessage =>  UID.next("AddCreditsMessage")
+  }*/
+
+  def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
+
+  def send(args:Tuple2[String,String]*) : Boolean =
+    send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def send(map:Map[String,String]) : Boolean = {
+    nextTime(true) match {
+      case Some(millis) =>
+        updateMap(map)
+        val event = makeEvent(millis,_map)
+        val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
+        val rdb = _map.getOrElse("debugEnabled","false").toBoolean
+        MessageService.send(event,ren,rdb)
+        _messagesSent += 1
+        true
+      case None =>
+        //_done = true
+        false
+    }
+  }
+
+}
+
+class DiskMessage extends Message {
+  /*
+   *  map:
+   *      "action" -> "update" , "delete" , "purge"
+   *      "uid"    ->
+   *      "path"   ->
+   *      "value"  ->
+   */
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+      val action = map("action")
+      val uid    = map("uid")
+      val path   = map("path")
+      val value  = map("value")
+      val id = "rc.%d.object.%s".format(nextID,action)
+      val occurredMillis = millis
+      val receivedMillis = millis
+      val userID = uid //"user%s@grnet.gr".format(uid)
+      val clientID = "pithos"
+      val resource ="diskspace"
+      val instanceID = "1"
+      val eventVersion = "1.0"
+      val details = MessageFactory.newDetails(
+        MessageFactory.newStringDetail("action", "object %s".format(action)),
+        MessageFactory.newStringDetail("total", "0.0"),
+        MessageFactory.newStringDetail("user", userID),
+        MessageFactory.newStringDetail("path", path)
+      )
+
+      val msg = MessageFactory.newResourceEventMsg(
+        id,
+        occurredMillis, receivedMillis,
+        userID, clientID,
+        resource, instanceID,
+        value,
+        eventVersion,
+        details,
+        uid
+      )
+
+      msg
+  }
+}
+
+class VMMessage extends Message {
+  /*
+   *   map:
+   *      uid        -> unique id for user
+   *      instanceID -> "cyclades.vm.kJSOLek"
+   *      vmName     -> "My Lab VM"
+   *      status     ->  "on", "off" , "destroy"
+   */
+  var _status = "on"
+  def nextStatus = {
+    if(_status=="off") _status = "on" else _status = "off"
+    _status
+  }
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val uid    = map("uid")
+    val value  =  /* map("status")*/nextStatus match {
+       case "on" => "1"
+       case "off" => "0"
+       case "destroy" => "2"
+       case x => throw new Exception("VMMessage bad status: %s".format(x))
+      }
+    val id = "rc.%d.vmtime".format(nextID)
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = uid // "user%s@grnet.gr".format(uid)
+    val clientID = "cyclades"
+    val resource ="vmtime"
+    val instanceID = map("instanceID")
+    val eventVersion = "1.0"
+    val details = MessageFactory.newDetails(
+      MessageFactory.newStringDetail("VM Name", map("vmName"))
+    )
+
+    val msg = MessageFactory.newResourceEventMsg(
+      id,
+      occurredMillis, receivedMillis,
+      userID, clientID,
+      resource, instanceID,
+      value,
+      eventVersion,
+      details,
+      uid
+    )
+
+    msg
+  }
+ }
+
+class CreationMessage extends Message {
+  /*
+   *  map contains:
+   *   uid -> user id
+   */
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val uid    = map("uid")     //
+    val id = "im.%d.create.user".format(nextID)
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID =  uid //"user%d@grnet.gr".format(mid)
+    val clientID = "astakos"
+    val isActive = false
+    val role = "default"
+    val eventVersion = "1.0"
+    val eventType = "create"
+
+    val msg = MessageFactory.newIMEventMsg(
+      id,
+      occurredMillis, receivedMillis,
+      userID, clientID,
+      isActive,
+      role,
+      eventVersion, eventType,
+      MessageFactory.newDetails(),
+      uid
+    )
+    msg
+  }
+}
+
+class AddCreditsMessage extends Message {
+  /*
+   *  map contains:
+   *    amount -> "2000"
+   *    uid    -> loverdos1
+   */
+  def makeEvent(millis:Long,map:Map[String,String]) = {
+    val uid    = map("uid")     //
+    val amount = map("amount")
+    val id = "im.%d.add.credits".format(nextID)
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = uid //"user%d@grnet.gr".format(uid)
+    val clientID = "astakos"
+    val isActive = false
+    val role = "default"
+    val eventVersion = "1.0"
+    val eventType = "addcredits"
+    val msg = MessageFactory.newResourceEventMsg(
+      id,
+      occurredMillis, receivedMillis,
+      userID, clientID,
+      "addcredits", "addcredits",
+      amount,
+      eventVersion,
+      MessageFactory.newDetails(),
+      uid
+    )
+    msg
+  }
+}
+
+object Message {
+  def apply(typ:String,args:Tuple2[String,String]*) : Message =
+    apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  val msgMap = Map[String,()=>Message](
+    "vm"      -> (() => new VMMessage),
+    "disk"    -> (() => new DiskMessage),
+    "create"  -> (() => new CreationMessage),
+    "credits" -> (() => new AddCreditsMessage)
+  )
+
+  def apply(typ:String,map:Map[String,String]) : Message = {
+    val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))()
+    msg.updateMap(map)
+    msg
+  }
+}
+
+
+class User(serverAndPort:String,month:Int) {
+  val uid = "user%d@grnet.gr".format(UID.next)
+  val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
+  var _resources : List[Message] = Nil
+  var _billEntryMsg :Option[BillEntryMsg] = None
+  var _resMsgs = 0
+  var _vmMsgs = 0
+  var _addMsgs = 0
+  var _messagesSent : List[Message] = Nil
+
+  override def toString() = uid
+
+  def scalaList[A](s:java.util.List[A]) : List[A] = {
+    import scala.collection.JavaConverters.asScalaBufferConverter
+    s.asScala.toList
+  }
+
+  def sumOf[A,D](l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) : D =
+    scalaList(l).map(f).foldLeft(start) {case (sum,v) =>  add(sum,v) }
+
+
+  def checkSum[A,D](s:D,l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) =
+    check(s == sumOf(l,start)(f)(add))
+
+  def check(b: => Boolean) = {
+    if(!b)
+      throw new Exception("Invalid property")
+  }
+
+
+  type S[A] = {def getTotalCredits : String
+               def getTotalElapsedTime:String
+               def getTotalUnits:String
+               def getDetails:java.util.List[A]}
+  type V = (Double,Long,Double)
+
+  def valuesOf[A,T<:S[A]](t:T) : (V,java.util.List[A]) =
+    ((t.getTotalCredits.toDouble,
+      t.getTotalElapsedTime.toLong,
+      t.getTotalUnits.toDouble),
+      t.getDetails)
+
+  def checkS[A,T<:S[A]](s:T)(f:A=>V)(add:(V,V)=>V) = {
+    val zero = (0D,0L,0D)
+    val (v0,v1) = valuesOf[A,T](s)
+    checkSum(v0,v1,zero)(f)(add)
+  }
+
+  def add(a:V,b:V) : V= {
+    val (a1,a2,a3) = a
+    val (b1,b2,b3) = b
+    //if(pos && b1 < 0.0D) a  else
+      (a1+b1,a2+b2,a3+b3)
+  }
+
+  val zero  = (0.0D,0L,0.0D)
+
+  def filterMessagesSent(serviceName:String) : List[Message] =
+    _messagesSent.filter { (_,serviceName) match  {
+       case (_:DiskMessage,"diskspace") => true
+       case (_:VMMessage,"vmtime") => true
+       case (_:AddCreditsMessage,"addcredits") => true
+       case _ => false
+    }}
+
+  def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
+     if(m.length == 0) check(c.length == 0)
+     else check(c.length == (serviceName match  {
+       case "diskspace" =>  m.length - 1
+       case "vmtime" => m.length - 1
+       case "addcredits" => m.length
+     }))
+  }
+
+  def validateChargeEntry(c:ChargeEntryMsg) : V = {
+    (c.getTotalCredits.toDouble,c.getTotalElapsedTime.toLong,c.getTotalUnits.toDouble)
+  }
+
+  def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
+    val v1 = scalaList(e.getDetails)
+    val v2 = filterMessagesSent(serviceName)
+    checkMessages(serviceName,v1,v2)
+    //val v3  = (e.getTotalCredits.toDouble,e.getTotalElapsedTime.toLong,e.getTotalUnits.toDouble)
+    val v4 = sumOf(e.getDetails,zero)(validateChargeEntry)(add)
+    v4
+  }
+
+  def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
+    val v0  = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
+    val v1 = sumOf(r.getDetails,zero)(validateEventEntry(serviceName,_))(add)
+    check(v0 == v1)
+    v0
+  }
+
+  def validateServiceEntry(s:ServiceEntryMsg) : Double = {
+    val v0  = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
+    val v1 = sumOf(s.getDetails,zero)(validateResourceEntry(s.getServiceName,_))(add)
+    check(v0 == v1)
+    v0._1
+  }
+
+  def validateBillEntry(b:BillEntryMsg) : Boolean = {
+    try{
+      check(b.getStatus == "ok")
+      check(uid == b.getUserID)
+      check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
+            _creationMessage._range.to.getTime == b.getEndTime().toLong)
+      check(b.getDeductedCredits.toDouble ==
+            sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
+      true
+    } catch {
+      case e:Exception =>
+        e.printStackTrace
+        false
+    }
+  }
+
+  def validateResults() : Boolean = {
+    _billEntryMsg match {
+      case None => false
+      case Some(b) => validateBillEntry(b)
+    }
+    //throw new Exception("Not implemented !!!!")
+  }
+
+  def printMessages() = {
+    Console.err.println("Messages sent:")
+    for { m <- JsonLog.get}
+      Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
+    Console.err.println("\n=========================\n")
+  }
+  def printResponse() = {
+    Console.err.println("Response:\n" + (_billEntryMsg match {
+      case None => "NONE!!!!"
+      case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
+    }))
+  }
+
+  def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
+    add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def add(no:Int,typ:String,map:Map[String,String]) : User  =
+    add(no,typ,{_ => map})
+
+  def add(no:Int,typ:String,map:Int=>Map[String,String]) : User  = {
+    for {i <- 1 to no} {
+      val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
+      _resources = Message(typ,map0) :: _resources
+    }
+    this
+  }
+
+  def addVMs(no:Int,cronSpec:String) : User =
+    add(no,"vm",{i =>
+         Map("instanceID"->"cyclades.vm.%d".format(i),
+         "vmName"  -> "Virtual Machine #%d".format(i),
+         "status"  -> "on", // initially "on" msg
+         "spec"    -> cronSpec.format(month))})
+
+  def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
+    add(no,"disk",{i =>
+       //Console.err.println("Adding file : " + "/Papers/file_%d.PDF".format(i))
+       Map("action" -> action,
+           "path"->"/Papers/file_%d.PDF".format(i),
+           //"value"->UID.random(minVal,maxVal).toString,
+           "spec" -> spec.format(month)
+          )
+    })
+
+  def addCredits(no:Int,spec:String) : User = {
+    add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec.format(month)
+       /*,"amount"->amount.toString*/)
+  }
+
+  def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
+          sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
+    _messagesSent = Nil
+    _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
+    //Thread.sleep(2000)
+    var iter = _resources.toList
+    while(!iter.isEmpty)
+      iter = (if(!ordered) iter
+       else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match {
+        case (Some(l1),Some(l2)) => l1 <= l2
+        case (None,None) => true
+        case (None,Some(l)) => true
+        case (Some(l),None) => false
+      }}).filter({m =>
+        _messagesSent = _messagesSent ::: List(m)
+        val b = m.send("value"->UID.random(minFile,maxFile).toString,
+                       "amount"->UID.random(minAmount,maxAmount).toString,
+                       "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
+                       "debugEnabled" -> sendDebugEnabled.toString
+                       //"status" -> UID.random(List("off","on"))
+                      )
+        if(b) m match {
+          case _:DiskMessage => _resMsgs += 1
+          case _:VMMessage => _vmMsgs += 1
+          case _:AddCreditsMessage => _addMsgs +=1
+        }
+        b
+      })
+    Thread.sleep(wait)
+    _billEntryMsg = getBillResponse(maxJSONRetry)
+  }
+
+  private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
+    def get () : String = {
+      val fromMillis = _creationMessage._range.from.getTime
+      val toMillis   = _creationMessage._range.to.getTime
+      val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
+      try{
+        val in = new BufferedReader(
+          new InputStreamReader(
+            new URL(url).openConnection().
+              getInputStream()))
+        var inputLine = ""
+        var ret = ""
+        while ({inputLine = in.readLine();inputLine} != null)
+          ret += (if(ret.isEmpty) "" else "\n")+ inputLine
+        in.close()
+        ret
+      } catch {
+        case e:Exception =>
+          ""
+      }
+    }
+    var resp = ""
+    var count = 0
+    var ret : Option[BillEntryMsg] = None
+    while(resp.isEmpty && count < max){
+      if(count > 0) Console.err.println("Retrying for bill request.")
+      resp = get()
+      if(resp.isEmpty) Thread.sleep(1000)
+      else {
+        try{
+          var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
+          ret = Some(b)
+          if(b.getStatus().equals("processing")){
+            Thread.sleep(1000)
+            resp = ""
+          }
+        }  catch {
+          case e:Exception =>
+              e.printStackTrace
+              resp = ""
+        }
+      }
+      //sleep(1000L)
+      count += 1
+    }
+    ret
+  }
+}
+
+case class Resource(
+   val resType  : String, // Message.msgMap.keys
+   val instances: Long,
+   val cronSpec : String
+ )
+extends JsonSupport {}
+
+case class Scenario(
+  val ignoreScenario : Boolean,
+  val printMessages : Boolean,
+  val printResponses: Boolean,
+  val host : String,
+  val port : Long,
+  val sendOrdered : Boolean,
+  val sendViaRabbitMQ : Boolean,
+  val sendDebugEnabled : Boolean,
+  val validationEnabled : Boolean,
+  val billingMonth: Long,
+  val aquariumStartWaitMillis : Long,
+  val aquariumStopWaitMillis : Long,
+  val billResponseWaitMillis : Long,
+  val numberOfUsers  : Long,
+  val numberOfResponseRetries : Long,
+  val minFileCredits : Long,
+  val maxFileCredits : Long,
+  val minUserCredits : Long,
+  val maxUserCredits : Long,
+  val resources : List[Resource]
+)
+extends JsonSupport {}
+
+case class Scenarios(
+   val scenarios : List[Scenario] )
+extends JsonSupport {}
+
+object ScenarioRunner {
+  val aquarium  = AquariumInstance.aquarium
+
+  def parseScenario(txt:String) : Scenario =
+    StdConverters.AllConverters.convertEx[Scenario](JsonTextFormat(txt))
+
+  def parseScenarios(txt:String) : Scenarios =
+    StdConverters.AllConverters.convertEx[Scenarios](JsonTextFormat(txt))
+
+  def runScenario(txt:String) : Unit = runScenario(parseScenario(txt))
+
+  private[this] def runUser(s:Scenario) : User = {
+    val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
+    val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
+    val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
+    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
+    //AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
+      for{ r <- s.resources}  // create messages
+        r.resType match {
+          case "vm" =>
+            user.addVMs(r.instances.toInt,r.cronSpec)
+          case "disk" =>
+            user.addFiles(r.instances.toInt,"update",r.cronSpec)
+          case "credits" =>
+            user.addCredits(r.instances.toInt,r.cronSpec)
+        }
+      // run scenario
+      user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
+               s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
+               s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
+    //}
+    user
+  }
+
+  private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
+     Console.err.println("Starting aquarium")
+     AquariumInstance.aquarium.start
+     Thread.sleep(billWait)
+     Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
+     try{
+       forkJoinCode
+     } finally {
+       Console.err.println("Stopping aquarium")
+       AquariumInstance.aquarium.stop
+       Thread.sleep(stop)
+       Console.err.println("Stopping aquarium --- DONE")
+       default
+     }
+  }
+
+  def runScenario(s:Scenario): Unit = {
+    if(s.ignoreScenario == false) {
+      Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
+      runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
+        val tasks = for { u <- 1 to s.numberOfUsers.toInt}
+                    yield scala.actors.Futures.future(runUser(s))
+        tasks.map(_()).toList
+      }.foreach{ u =>
+        if(s.printMessages) u.printMessages()
+        if(s.printResponses) u.printResponse()
+        if(s.validationEnabled && u.validateResults() == false)
+          Console.err.println("Validation FAILED for user " + u)
+      }
+      Console.err.println("\n=========================\nStopping scenario\n=======================")
+    }
+  }
+
+  def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt))
+
+  def runScenarios(ss:Scenarios) = {
+    Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
+    ss.scenarios.foreach(runScenario(_))
+  }
+
+}
+
+object UserTest extends Loggable {
+/*
+    JSON example:
+  {
+  "scenarios":[{
+    "ignoreScenario":false,
+    "printMessages":false,
+    "printResponses":true,
+    "host":"localhost",
+    "port":8888,
+    "sendOrdered":true,
+    "sendViaRabbitMQ":false,
+    "sendDebugEnabled":false,
+    "validationEnabled":false,
+    "billingMonth":9,
+    "aquariumStartWaitMillis":2000,
+    "aquariumStopWaitMillis":2000,
+    "billResponseWaitMillis":2000,
+    "numberOfUsers":2,
+    "numberOfResponseRetries":10,
+    "minFileCredits":2000,
+    "maxFileCredits":5000,
+    "minUserCredits":10000,
+    "maxUserCredits":50000,
+    "resources":[{
+      "resType":"credits",
+      "instances":1,
+      "cronSpec":"00 00 10,12 %d ?"
+    },{
+      "resType":"disk",
+      "instances":1,
+      "cronSpec":"00 18 15,20,29,30 %d ?"
+    },{
+      "resType":"vm",
+      "instances":1,
+      "cronSpec":"00 18 14,17,19,20 %d ?"
+    }]
+  }]
+}
+ */
+ val basic = new Scenario(false,false,true,"localhost",8888,true,false,false,false,9,2000,2000,2000,
+                          1,10,2000,5000,10000,50000,List[Resource](
+                          new Resource("credits",1, "00 00 10,12 %d ?".format(9)),
+                          new Resource("disk",1,"00 18 15,20,29,30 %d ?".format(9)),
+                          new Resource("vm",1,"00 18 14,17,19,20 %d ?".format(9))
+                        ))
+
+ def main(args: Array[String]) = {
+
+   try{
+     val lines = scala.io.Source.fromFile(args.head).mkString
+     ScenarioRunner.runScenarios(lines)
+   } catch {
+     case e:Exception =>
+       e.printStackTrace()
+       ScenarioRunner.runScenarios(new Scenarios(List(basic)))
+   }
+
+
+/*    val user = new User("localhost:8888",9)
+    val (minFileCredits,maxFileCredits) = (2000,5000)
+    val (minUserCredits,maxUserCredits) = (10000,10000)
+    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
+
+   val json =AquariumInstance.run(2000,2000) {
+          user.
+                  addCredits(1,"00 00 10,12 9 ?").
+                  addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?").
+                  addVMs(1,"00 18 14,17,19,20 9 ?").
+                  //addVMs(5,"on","00 18 ? 9 Tue")
+                 run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
+   }
+   Thread.sleep(2000)
+   Console.err.println("Messages sent:")
+   for { m <- JsonLog.get}
+     Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
+   Console.err.println("\n=========================\n")
+   Console.err.println("Response:\n" + json)*/
+ }
+
+}
+
+
+/*
 object BillTest extends Loggable {
 
   type JSON = String
@@ -73,7 +976,7 @@ object BillTest extends Loggable {
   val aquarium = {
       exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
            Console.err.println(_))
-      new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
+      new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
       //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
       update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
       build()
@@ -118,9 +1021,10 @@ object BillTest extends Loggable {
     val role = "default"
     val eventVersion = "1.0"
     val eventType = "create"
-    (new StdIMEvent(id,occurredMillis,receivedMillis,userID,
-                   clientID,isActive,role,eventVersion,eventType,
-                   Map()).toJsonString,mid)
+
+    val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
+    val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
+    (json, mid)
   }
 
   private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
@@ -131,12 +1035,13 @@ object BillTest extends Loggable {
     val userID = "user%d@grnet.gr".format(uid)
     val clientID = "astakos"
     val isActive = false
-    val role = "default"
     val eventVersion = "1.0"
-    val eventType = "addcredits"
-    new StdIMEvent(id,occurredMillis,receivedMillis,userID,
-                   clientID,isActive,role,eventVersion,eventType,
-                   Map("credits" -> amount.toString)).toJsonString
+    val resource = "addcredits"
+    val instanceID = "addcredits"
+
+    val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
+    val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
+    json
   }
 
   private [this] def makePithos(date:DATE,uid:UID,path:String,
@@ -150,12 +1055,16 @@ object BillTest extends Loggable {
     val resource ="diskspace"
     val instanceID = "1"
     val eventVersion = "1.0"
-    val details = Map("action" -> "object %s".format(action),
-                      "total"  -> "0.0",
-                      "user"   -> userID,
-                      "path"   -> path)
-    new StdResourceEvent(id,occurredMillis,receivedMillis,userID,clientID,
-                         resource,instanceID,value,eventVersion,details).toJsonString
+    val details = MessageFactory.newDetails(
+      MessageFactory.newStringDetail("action", "object %s".format(action)),
+      MessageFactory.newStringDetail("total", "0.0"),
+      MessageFactory.newStringDetail("user", userID),
+      MessageFactory.newStringDetail("path", path)
+    )
+
+    val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
+    val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
+    json
   }
 
   private[this] def sendCreate(date:DATE) : UID = {
@@ -217,6 +1126,7 @@ object BillTest extends Loggable {
     }
   }
 
+
   private[this] def testCase1() : JSON  = {
     /* GET BILL FROM TO*/
     val billFromDate = "00/00/00/01/08/2012"
@@ -291,3 +1201,4 @@ object BillTest extends Loggable {
     runTestCase(testCase1)
   }
 }
+*/