Finished scenarios and minor fixes.
authorProdromos Gerakios <pgerakios@grnet.gr>
Thu, 6 Sep 2012 14:36:12 +0000 (17:36 +0300)
committerProdromos Gerakios <pgerakios@grnet.gr>
Thu, 6 Sep 2012 14:36:12 +0000 (17:36 +0300)
src/main/scala/gr/grnet/aquarium/policy/CronSpec.scala
src/test/scala/gr/grnet/aquarium/BillTest.scala

index 2af9fd1..7c9b80c 100644 (file)
@@ -77,8 +77,10 @@ case class CronSpec(cronSpec: String) {
         val (min,max,e) = (min0.getTime,max0.getTime,d1.getTime)
         if(e < min || e>max)
           None
-        else
-          Some({assert(d1.getTime>=d.getTime);d1})
+        else {
+          assert(d1.getTime>=d.getTime)
+          Some(d1)
+        }
     }
 
   override def toString : String = cronSpec
index 8d607be..36ea7fe 100644 (file)
@@ -2,15 +2,21 @@ package gr.grnet.aquarium
 
 import com.ckkloverdos.resource.FileStreamResource
 import converter.StdConverters
+import event.model.ExternalEventModel
 import event.model.im.StdIMEvent
 import event.model.resource.StdResourceEvent
 import java.io.{InputStreamReader, BufferedReader, File}
 import com.ckkloverdos.props.Props
+import logic.accounting.dsl.Timeslot
 import store.memory.MemStoreProvider
 import java.util.concurrent.atomic.AtomicLong
 import java.text.SimpleDateFormat
 import java.net.{URLConnection, URL}
 import util.Loggable
+import java.util.{GregorianCalendar, Date,Calendar}
+import gr.grnet.aquarium.policy.CronSpec
+import scala.Tuple2
+import scala.Tuple2
 
 /*
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
@@ -51,6 +57,441 @@ import util.Loggable
 /*
 * @author Prodromos Gerakios <pgerakios@grnet.gr>
 */
+
+
+object UID {
+  private[this] val counter = new AtomicLong(0L)
+  def next() = counter.getAndIncrement
+  def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
+      min + (scala.math.random.toInt % (max+1)) % (max+1)
+
+  def random[A](l:List[A]) : A = {
+    val sz = l.size
+    if(sz==0) throw new Exception("random")
+     l(random(0,sz-1))
+  }
+}
+
+object Process {
+  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
+    val commands = cmd.split(" ")
+    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
+    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
+    val sb = new StringBuilder
+
+    //spin off a thread to read process output.
+    val outputReaderThread = new Thread(new Runnable(){
+      def run : Unit = {
+        var ln : String = null
+        while({ln = ins.readLine; ln != null})
+          func(ln)
+      }
+    })
+    outputReaderThread.start()
+
+    //suspense this main thread until sub process is done.
+    proc.waitFor
+
+    //wait until output is fully read/completed.
+    outputReaderThread.join()
+
+    ins.close()
+  }
+  def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
+}
+
+object Mongo {
+  def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
+}
+
+object AquariumInstance {
+  val propsfile = new FileStreamResource(new File("aquarium.properties"))
+  var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
+  val aquarium = {
+    Mongo.clear
+    new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
+      //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
+      build()
+  }
+  def run(f : => String) : String = {
+    var _ret = ""
+    aquarium.start
+    Thread.sleep(4)
+    try{
+      _ret = f
+    } finally {
+      Console.err.println("Stopping aquarium")
+      Thread.sleep(15)
+      Console.err.println("Stopping aquarium --- DONE")
+      aquarium.stop
+    }
+    _ret
+  }
+}
+
+
+abstract class Message {
+  val dbg = true
+  val cal =   new GregorianCalendar
+  var _range : Timeslot = null
+  var _cronSpec : CronSpec = null
+  var _messagesSent = 0
+  var _done = false
+  var _map = Map[String,String]()
+
+  def updateMap(args:Tuple2[String,String]*) : Message  =
+    updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def updateMap(map:Map[String,String]) : Message = {
+    def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
+      (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
+        a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
+    }
+    _map =  mergeMap(List(_map,map))((v1,v2) => v2)
+    (_map.get("month"),_map.get("spec")) match {
+      case (Some((month0:String)),Some(spec)) =>
+        val month : Int = month0.toInt
+        if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
+           val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
+           val d0 = getDate(1,month,year,0,0,0)
+           _range = Timeslot(d0,d1 - 1000)
+          _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
+        }
+      case _ => ()
+    }
+    this
+  }
+
+  def done = _done
+  def sentMessages = _messagesSent
+
+  def nextTime : Option[Long] = {
+    _cronSpec match{
+      case null =>
+        None
+      case _ =>
+        _cronSpec.nextValidDate(_range,cal.getTime) match {
+          case Some(d) =>
+            val millis = d.getTime
+            cal.setTimeInMillis(millis)
+            Some(millis)
+          case None    =>
+            None
+        }
+    }
+  }
+
+  def year : Int = {
+    cal.setTimeInMillis(System.currentTimeMillis())
+    cal.get(Calendar.YEAR)
+  }
+
+  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
+    cal.set(year,month-1,day,hour,min,sec)
+    cal.getTimeInMillis
+  }
+
+  def getMillis : Long = cal.getTimeInMillis
+
+  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
+    getDate(day,month,year,hour,min,0)
+
+  def setMillis(millis:Long) = {
+    cal.setTimeInMillis(millis)
+  }
+
+  def addMillis(day:Int,hour:Int) = {
+    cal.roll(Calendar.DATE,day)
+    cal.roll(Calendar.DATE,hour)
+  }
+
+  def nextID = UID.next
+
+  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel
+
+  def send(args:Tuple2[String,String]*) : Boolean =
+    send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def send(map:Map[String,String]) : Boolean = {
+    nextTime match {
+      case Some(millis) =>
+        updateMap(map)
+        val event = makeEvent(millis,_map)
+        val (exchangeName,routingKey) = event match {
+          case rc:StdResourceEvent => rc.resource match {
+            case "vmtime" =>
+              ("cyclades","cyclades.resource.vmtime")
+            case "diskspace" =>
+              ("pithos","pithos.resource.diskspace")
+            case _ =>
+              throw new Exception("send cast failed")
+          }
+          case im:StdIMEvent =>
+            ("astakos","astakos.user")
+          case _ =>
+            throw new Exception("send cast failed")
+        }
+        val json = event.toJsonString
+        AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+        sendMessage(exchangeName,routingKey,json)
+        if(dbg)Console.err.println("Sent message:\n%s\n".format(json))
+        _messagesSent += 1
+        true
+      case None =>
+        _done = true
+        false
+    }
+  }
+
+}
+
+class DiskMessage extends Message {
+  /*
+   *  map:
+   *      "action" -> "update" , "delete" , "purge"
+   *      "uid"    ->
+   *      "path"   ->
+   *      "value"  ->
+   */
+  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
+      val action = map("action")
+      val uid    = map("uid")
+      val path   = map("path")
+      val value  = map("value").toLong
+      val id = "rc.%d.object.%s".format(nextID,action)
+      val occurredMillis = millis
+      val receivedMillis = millis
+      val userID = uid //"user%s@grnet.gr".format(uid)
+      val clientID = "pithos"
+      val resource ="diskspace"
+      val instanceID = "1"
+      val eventVersion = "1.0"
+      val details = Map("action" -> "object %s".format(action),
+                        "total"  -> "0.0",
+                        "user"   -> userID,
+                        "path"   -> path)
+      new StdResourceEvent(id,occurredMillis,receivedMillis,userID,
+                           clientID,resource,instanceID,value,
+                           eventVersion,details)
+  }
+}
+
+class VMMessage extends Message {
+  /*
+   *   map:
+   *      uid        -> unique id for user
+   *      instanceID -> "cyclades.vm.kJSOLek"
+   *      vmName     -> "My Lab VM"
+   *      status     ->  "on", "off" , "destroy"
+   */
+  var _status = "off"
+  def nextStatus = {
+    if(_status=="off") _status = "on" else _status = "off"
+    _status
+  }
+  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
+    val uid    = map("uid")
+    val value  = nextStatus /* map("status") match {
+       case "on" => 1.0
+       case "off" => 0.0
+       case "destroy" => 2.0
+       case x => throw new Exception("VMMessage bad status: %s".format(x))
+      }*/
+    val id = "rc.%d.vmtime".format(nextID)
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = uid // "user%s@grnet.gr".format(uid)
+    val clientID = "cyclades"
+    val resource ="vmtime"
+    val instanceID = map("instanceID")
+    val eventVersion = "1.0"
+    val details = Map("VM Name" -> map("vmName"))
+    new StdResourceEvent(id,occurredMillis,receivedMillis,userID,clientID,
+                         resource,instanceID,value.toDouble,eventVersion,details)
+  }
+ }
+
+class CreationMessage extends Message {
+  /*
+   *  map contains:
+   *   uid -> user id
+   */
+  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
+    val uid    = map("uid")     //
+    val id = "im.%d.create.user".format(nextID)
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID =  uid //"user%d@grnet.gr".format(mid)
+    val clientID = "astakos"
+    val isActive = false
+    val role = "default"
+    val eventVersion = "1.0"
+    val eventType = "create"
+    new StdIMEvent(id,occurredMillis,receivedMillis,userID,
+                  clientID,isActive,role,eventVersion,eventType,
+                  Map())
+  }
+}
+
+class AddCreditsMessage extends Message {
+  /*
+   *  map contains:
+   *    amount -> "2000"
+   *    uid    -> loverdos1
+   */
+  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
+    val uid    = map("uid")     //
+    val amount = map("amount")
+    val id = "im.%d.add.credits".format(nextID)
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = uid //"user%d@grnet.gr".format(uid)
+    val clientID = "astakos"
+    val isActive = false
+    val role = "default"
+    val eventVersion = "1.0"
+    val eventType = "addcredits"
+    new StdIMEvent(id,occurredMillis,receivedMillis,userID,
+                   clientID,isActive,role,eventVersion,eventType,
+                   Map("credits" -> amount.toString))
+  }
+}
+
+object Message {
+  def apply(typ:String,args:Tuple2[String,String]*) : Message =
+    apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def apply(typ:String,map:Map[String,String]) : Message = {
+    val msg =  typ match {
+      case "vm" => new VMMessage
+      case "disk" =>   new DiskMessage
+      case "create" => new CreationMessage
+      case "credits" => new AddCreditsMessage
+      case _ => throw new Exception("unknown type")
+    }
+    msg.updateMap(map)
+    msg
+  }
+}
+
+
+class User(serverAndPort:String,month:Int) {
+  val uid = "user%d@grnet.gr".format(UID.next)
+  val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
+  var _resources : List[Message] = Nil
+
+
+
+  def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
+    add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
+
+  def add(no:Int,typ:String,map:Map[String,String]) : User  =
+    add(no,typ,{_ => map})
+
+  def add(no:Int,typ:String,map:Int=>Map[String,String]) : User  = {
+    for {i <- 1 to no} {
+      val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
+      _resources = Message(typ,map0) :: _resources
+    }
+    this
+  }
+
+  def addVMs(no:Int,status:String,cronSpec:String) : User =
+    add(no,"vm",{i =>
+         Map("instanceID"->"cyclades.vm.%d".format(i),
+         "vmName"  -> "Virtual Machine #%d".format(i),
+         "status"  -> status,
+         "spec"    -> cronSpec)})
+
+  def addFiles(no:Int,action:String,value:Int,minVal:Int,maxVal:Int,spec:String) : User =
+    add(no,"disk",{i =>
+       Map("action" -> action,
+           "path"->"/Papers/file_%d.PDF".format(i),
+           "value"->UID.random(minVal,maxVal).toString,
+           "spec" -> spec
+          )
+    })
+
+  def addCredits(amount:Int,spec:String) : User = {
+    add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
+  }
+
+  def run(minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry:Int=10) : String =  {
+    _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
+    var iter = _resources.toList
+    var done = false
+    while(!iter.isEmpty){
+      iter = _resources.filterNot(_.done)
+      for{i<-iter}
+        i.send("value"->UID.random(minFile,maxFile).toString,
+               "amount"->UID.random(minAmount,maxAmount).toString //,
+               //"status" -> UID.random(List("off","on"))
+               )
+    }
+    getJSON(maxJSONRetry)
+  }
+
+  def getJSON(max:Int=10) : String = {
+    def get () : String = {
+      val fromMillis = _creationMessage._range.from.getTime
+      val toMillis   = _creationMessage._range.to.getTime
+      val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
+      try{
+        val in = new BufferedReader(
+          new InputStreamReader(
+            new URL(url).openConnection().
+              getInputStream()))
+        var inputLine = ""
+        var ret = ""
+        while ({inputLine = in.readLine();inputLine} != null)
+          ret += (if(ret.isEmpty) "" else "\n")+ inputLine
+        in.close()
+        ret
+      } catch {
+        case e:Exception =>
+          ""
+      }
+    }
+    var resp = ""
+    var count = 0
+    while(resp.isEmpty && count < max){
+      if(count > 0) Console.err.println("Retrying for bill request.")
+      resp = get()
+      if(resp.isEmpty) Thread.sleep(1000)
+      //sleep(1000L)
+      count += 1
+    }
+    resp
+  }
+}
+
+object UserTest extends Loggable {
+
+ val aquarium  = AquariumInstance.aquarium
+
+ def main(args: Array[String]) = {
+    val user = new User("localhost:8888",9)
+    val (minFileCredits,maxFileCredits) = (2000,5000)
+    val (minUserCredits,maxUserCredits) = (10000,10000)
+    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
+
+   val json =AquariumInstance.run {
+          user.
+                    addCredits(10000,"00 00 ? 9 Sat").
+                    addFiles(1,"update",2000,1000,3000,"00 18 ? 9 Tue").
+                    //addVMs(1,"on","00 18 ? 9 Mon").
+                    //addVMs(5,"on","00 18 ? 9 Tue")
+                    run(minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
+   }
+   Thread.sleep(2000)
+   Console.err.println("Response:\n" + json)
+ }
+
+}
+
+
+
 object BillTest extends Loggable {
 
   type JSON = String
@@ -217,6 +658,7 @@ object BillTest extends Loggable {
     }
   }
 
+
   private[this] def testCase1() : JSON  = {
     /* GET BILL FROM TO*/
     val billFromDate = "00/00/00/01/08/2012"