Fix a type error in test
[aquarium] / src / test / scala / gr / grnet / aquarium / BillTest.scala
1 /*
2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
6 * conditions are met:
7 *
8 *   1. Redistributions of source code must retain the above
9 *      copyright notice, this list of conditions and the following
10 *      disclaimer.
11 *
12 *   2. Redistributions in binary form must reproduce the above
13 *      copyright notice, this list of conditions and the following
14 *      disclaimer in the documentation and/or other materials
15 *      provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
34 */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.props.Props
39 import converter.{JsonTextFormat, StdConverters}
40 import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
41 import java.io.{InputStreamReader, BufferedReader, File}
42 import java.net.URL
43 import java.util.concurrent.atomic.AtomicLong
44 import gr.grnet.aquarium.util.{Lock, Loggable}
45 import java.util.{Date, Calendar, GregorianCalendar}
46 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
47 import gr.grnet.aquarium.policy.CronSpec
48 import gr.grnet.aquarium.message.avro.gen.{BillEntryMsg, IMEventMsg, ResourceEventMsg}
49 import org.apache.avro.specific.SpecificRecord
50 import util.json.JsonSupport
51
52
53 /*
54 * @author Prodromos Gerakios <pgerakios@grnet.gr>
55 */
56
57
58 object UID {
59   private[this] val counter = new AtomicLong(0L)
60   def next() = counter.getAndIncrement
61   def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
62       min + (scala.math.random.toInt % (max+1)) % (max+1)
63
64   def random[A](l:List[A]) : A = {
65     val sz = l.size
66     if(sz==0) throw new Exception("random")
67      l(random(0,sz-1))
68   }
69 }
70
71 object Process {
72   private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
73     val commands = cmd.split(" ")
74     val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
75     val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
76     val sb = new StringBuilder
77
78     //spin off a thread to read process output.
79     val outputReaderThread = new Thread(new Runnable(){
80       def run : Unit = {
81         var ln : String = null
82         while({ln = ins.readLine; ln != null})
83           func(ln)
84       }
85     })
86     outputReaderThread.start()
87
88     //suspense this main thread until sub process is done.
89     proc.waitFor
90
91     //wait until output is fully read/completed.
92     outputReaderThread.join()
93
94     ins.close()
95   }
96   def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
97 }
98
99 object Mongo {
100   def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
101 }
102
103 object AquariumInstance {
104   //val propsfile = new FileStreamResource(new File("aquarium.properties"))
105   var props: Props = ResourceLocator.AquariumProperties
106   // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
107   val aquarium = {
108     Mongo.clear
109     new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
110       //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
111       update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
112       build()
113   }
114
115   private[this] val count=new java.util.concurrent.atomic.AtomicLong()
116   def run(billWait:Int, stop:Int)(f : => Unit) = {
117     if(count.addAndGet(1) == 1)
118       aquarium.start
119     Thread.sleep(billWait)
120     try{
121       f
122     } finally {
123       Console.err.println("Stopping aquarium")
124       if(count.addAndGet(-1) == 0)
125         aquarium.stop
126       Thread.sleep(stop)
127       Console.err.println("Stopping aquarium --- DONE")
128     }
129   }
130 }
131
132 object JsonLog {
133   private[this] final val lock = new Lock()
134   private[this] var _log : List[String] = Nil
135   def add(json:String) =  lock.withLock(_log = _log ::: List(json))
136   def get() : List[String] = lock.withLock(_log.toList)
137 }
138
139 /*object MessageQueue {
140   private[this] final val lock = new Lock()
141   private[this] var _sortedMsgs  = SortedMap[Timeslot,(String,String,String)]
142 } */
143
144 object MessageService {
145
146   def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
147     val json = AvroHelpers.jsonStringOfSpecificRecord(event)
148     if(rabbitMQEnabled){
149       val (exchangeName,routingKey) = event match {
150         case rc:ResourceEventMsg => rc.getResource match {
151           case "vmtime" =>
152             ("cyclades","cyclades.resource.vmtime")
153           case "diskspace" =>
154             ("pithos","pithos.resource.diskspace")
155           case "addcredits" =>
156             ("astakos","astakos.resource")
157           case x =>
158             throw new Exception("send cast failed: %s".format(x))
159         }
160         case im:IMEventMsg =>
161           ("astakos","astakos.user")
162         case _ =>
163           throw new Exception("send cast failed")
164       }
165       AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
166         sendMessage(exchangeName,routingKey,json)
167     } else {
168       val uid = event match {
169         case rcevent: ResourceEventMsg =>
170             AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
171             rcevent.getUserID
172         case imevent: IMEventMsg =>
173              AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
174              imevent.getUserID
175       }
176       val userActorRef = AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)
177       userActorRef ! event
178     }
179     val millis = event match {
180       case rc:ResourceEventMsg => rc.getOccurredMillis
181       case im:IMEventMsg => im.getOccurredMillis
182     }
183     JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json)
184     if(debugEnabled)
185       Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
186   }
187 }
188
189 abstract class Message {
190   val dbg = true
191   val cal =   new GregorianCalendar
192   var _range : Timeslot = null
193   var _cronSpec : CronSpec = null
194   var _messagesSent = 0
195   //var _done = false
196   var _map = Map[String,String]()
197
198   def updateMap(args:Tuple2[String,String]*) : Message  =
199     updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
200
201   def updateMap(map:Map[String,String]) : Message = {
202     def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
203       (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
204         a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
205     }
206     _map =  mergeMap(List(_map,map))((v1,v2) => v2)
207     (_map.get("month"),_map.get("spec")) match {
208       case (Some((month0:String)),Some(spec)) =>
209         val month : Int = month0.toInt
210         if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
211            val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
212            val d0 = getDate(1,month,year,0,0,0)
213            _range = Timeslot(d0,d1 - 1000)
214           _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
215         }
216       case _ => ()
217     }
218     this
219   }
220
221   //def done = _done
222   def sentMessages = _messagesSent
223
224   def nextTime : Option[Long] = nextTime(false)
225
226   def nextTime(update:Boolean) : Option[Long] = {
227     _cronSpec match{
228       case null =>
229         None
230       case _ =>
231         _cronSpec.nextValidDate(_range,cal.getTime) match {
232           case Some(d) =>
233             val millis = d.getTime
234             if(update) cal.setTimeInMillis(millis)
235             Some(millis)
236           case None    =>
237             None
238         }
239     }
240   }
241
242   def year : Int = {
243     cal.setTimeInMillis(System.currentTimeMillis())
244     cal.get(Calendar.YEAR)
245   }
246
247   def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
248     cal.set(year,month-1,day,hour,min,sec)
249     cal.getTimeInMillis
250   }
251
252   def getMillis : Long = cal.getTimeInMillis
253
254   def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
255     getDate(day,month,year,hour,min,0)
256
257   def setMillis(millis:Long) = {
258     cal.setTimeInMillis(millis)
259   }
260
261   def addMillis(day:Int,hour:Int) = {
262     cal.roll(Calendar.DATE,day)
263     cal.roll(Calendar.DATE,hour)
264   }
265
266   def nextID = UID.next
267
268   def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
269
270   def send(args:Tuple2[String,String]*) : Boolean =
271     send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
272
273   def send(map:Map[String,String]) : Boolean = {
274     nextTime(true) match {
275       case Some(millis) =>
276         updateMap(map)
277         val event = makeEvent(millis,_map)
278         val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
279         val rdb = _map.getOrElse("debugEnabled","false").toBoolean
280         MessageService.send(event,ren,rdb)
281         _messagesSent += 1
282         true
283       case None =>
284         //_done = true
285         false
286     }
287   }
288
289 }
290
291 class DiskMessage extends Message {
292   /*
293    *  map:
294    *      "action" -> "update" , "delete" , "purge"
295    *      "uid"    ->
296    *      "path"   ->
297    *      "value"  ->
298    */
299   def makeEvent(millis:Long,map:Map[String,String]) = {
300       val action = map("action")
301       val uid    = map("uid")
302       val path   = map("path")
303       val value  = map("value")
304       val id = "rc.%d.object.%s".format(nextID,action)
305       val occurredMillis = millis
306       val receivedMillis = millis
307       val userID = uid //"user%s@grnet.gr".format(uid)
308       val clientID = "pithos"
309       val resource ="diskspace"
310       val instanceID = "1"
311       val eventVersion = "1.0"
312       val details = MessageFactory.newDetails(
313         MessageFactory.newStringDetail("action", "object %s".format(action)),
314         MessageFactory.newStringDetail("total", "0.0"),
315         MessageFactory.newStringDetail("user", userID),
316         MessageFactory.newStringDetail("path", path)
317       )
318
319       val msg = MessageFactory.newResourceEventMsg(
320         id,
321         occurredMillis, receivedMillis,
322         userID, clientID,
323         resource, instanceID,
324         value,
325         eventVersion,
326         details,
327         uid
328       )
329
330       msg
331   }
332 }
333
334 class VMMessage extends Message {
335   /*
336    *   map:
337    *      uid        -> unique id for user
338    *      instanceID -> "cyclades.vm.kJSOLek"
339    *      vmName     -> "My Lab VM"
340    *      status     ->  "on", "off" , "destroy"
341    */
342   var _status = "on"
343   def nextStatus = {
344     if(_status=="off") _status = "on" else _status = "off"
345     _status
346   }
347   def makeEvent(millis:Long,map:Map[String,String]) = {
348     val uid    = map("uid")
349     val value  =  /* map("status")*/nextStatus match {
350        case "on" => "1"
351        case "off" => "0"
352        case "destroy" => "2"
353        case x => throw new Exception("VMMessage bad status: %s".format(x))
354       }
355     val id = "rc.%d.vmtime".format(nextID)
356     val occurredMillis = millis
357     val receivedMillis = millis
358     val userID = uid // "user%s@grnet.gr".format(uid)
359     val clientID = "cyclades"
360     val resource ="vmtime"
361     val instanceID = map("instanceID")
362     val eventVersion = "1.0"
363     val details = MessageFactory.newDetails(
364       MessageFactory.newStringDetail("VM Name", map("vmName"))
365     )
366
367     val msg = MessageFactory.newResourceEventMsg(
368       id,
369       occurredMillis, receivedMillis,
370       userID, clientID,
371       resource, instanceID,
372       value,
373       eventVersion,
374       details,
375       uid
376     )
377
378     msg
379   }
380  }
381
382 class CreationMessage extends Message {
383   /*
384    *  map contains:
385    *   uid -> user id
386    */
387   def makeEvent(millis:Long,map:Map[String,String]) = {
388     val uid    = map("uid")     //
389     val id = "im.%d.create.user".format(nextID)
390     val occurredMillis = millis
391     val receivedMillis = millis
392     val userID =  uid //"user%d@grnet.gr".format(mid)
393     val clientID = "astakos"
394     val isActive = false
395     val role = "default"
396     val eventVersion = "1.0"
397     val eventType = "create"
398
399     val msg = MessageFactory.newIMEventMsg(
400       id,
401       occurredMillis, receivedMillis,
402       userID, clientID,
403       isActive,
404       role,
405       eventVersion, eventType,
406       MessageFactory.newDetails(),
407       uid
408     )
409
410     msg
411   }
412 }
413
414 class AddCreditsMessage extends Message {
415   /*
416    *  map contains:
417    *    amount -> "2000"
418    *    uid    -> loverdos1
419    */
420   def makeEvent(millis:Long,map:Map[String,String]) = {
421     val uid    = map("uid")     //
422     val amount = map("amount")
423     val id = "im.%d.add.credits".format(nextID)
424     val occurredMillis = millis
425     val receivedMillis = millis
426     val userID = uid //"user%d@grnet.gr".format(uid)
427     val clientID = "astakos"
428     val isActive = false
429     val role = "default"
430     val eventVersion = "1.0"
431     val eventType = "addcredits"
432     val msg = MessageFactory.newResourceEventMsg(
433       id,
434       occurredMillis, receivedMillis,
435       userID, clientID,
436       "addcredits", "addcredits",
437       amount,
438       eventVersion,
439       MessageFactory.newDetails(),
440       uid
441     )
442
443     msg
444   }
445 }
446
447 object Message {
448   def apply(typ:String,args:Tuple2[String,String]*) : Message =
449     apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
450
451   val msgMap = Map[String,()=>Message](
452     "vm"      -> (() => new VMMessage),
453     "disk"    -> (() => new DiskMessage),
454     "create"  -> (() => new CreationMessage),
455     "credits" -> (() => new AddCreditsMessage)
456   )
457
458   def apply(typ:String,map:Map[String,String]) : Message = {
459     val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))()
460     msg.updateMap(map)
461     msg
462   }
463 }
464
465
466 class User(serverAndPort:String,month:Int) {
467   val uid = "user%d@grnet.gr".format(UID.next)
468   val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
469   var _resources : List[Message] = Nil
470   var _billEntryMsg :Option[BillEntryMsg] = None
471
472   override def toString() = uid
473
474   def validateResults() : Boolean = {
475     throw new Exception("Not implemented !!!!")
476   }
477
478   def printResults() = {
479     Console.err.println("Messages sent:")
480     for { m <- JsonLog.get}
481       Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
482     Console.err.println("\n=========================\n")
483     Console.err.println("Response:\n" + (_billEntryMsg match {
484       case None => "NONE!!!!"
485       case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
486     }))
487   }
488
489   def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
490     add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
491
492   def add(no:Int,typ:String,map:Map[String,String]) : User  =
493     add(no,typ,{_ => map})
494
495   def add(no:Int,typ:String,map:Int=>Map[String,String]) : User  = {
496     for {i <- 1 to no} {
497       val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
498       _resources = Message(typ,map0) :: _resources
499     }
500     this
501   }
502
503   def addVMs(no:Int,cronSpec:String) : User =
504     add(no,"vm",{i =>
505          Map("instanceID"->"cyclades.vm.%d".format(i),
506          "vmName"  -> "Virtual Machine #%d".format(i),
507          "status"  -> "on", // initially "on" msg
508          "spec"    -> cronSpec)})
509
510   def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
511     add(no,"disk",{i =>
512        //Console.err.println("Adding file : " + "/Papers/file_%d.PDF".format(i))
513        Map("action" -> action,
514            "path"->"/Papers/file_%d.PDF".format(i),
515            //"value"->UID.random(minVal,maxVal).toString,
516            "spec" -> spec
517           )
518     })
519
520   def addCredits(no:Int,spec:String) : User = {
521     add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec/*,"amount"->amount.toString*/)
522   }
523
524   def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
525           sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
526     var _messagesSent : List[Message] = Nil
527     _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
528     //Thread.sleep(2000)
529     var iter = _resources.toList
530     while(!iter.isEmpty)
531       iter = (if(!ordered) iter
532        else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match {
533         case (Some(l1),Some(l2)) => l1 <= l2
534         case (None,None) => true
535         case (None,Some(l)) => true
536         case (Some(l),None) => false
537       }}).filter({m =>
538         _messagesSent = _messagesSent ::: List(m)
539         m.send("value"->UID.random(minFile,maxFile).toString,
540                "amount"->UID.random(minAmount,maxAmount).toString,
541                "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
542                "debugEnabled" -> sendDebugEnabled.toString
543                 //"status" -> UID.random(List("off","on"))
544         )})
545     Thread.sleep(wait)
546     _billEntryMsg = getBillResponse(maxJSONRetry)
547   }
548
549   private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
550     def get () : String = {
551       val fromMillis = _creationMessage._range.from.getTime
552       val toMillis   = _creationMessage._range.to.getTime
553       val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
554       try{
555         val in = new BufferedReader(
556           new InputStreamReader(
557             new URL(url).openConnection().
558               getInputStream()))
559         var inputLine = ""
560         var ret = ""
561         while ({inputLine = in.readLine();inputLine} != null)
562           ret += (if(ret.isEmpty) "" else "\n")+ inputLine
563         in.close()
564         ret
565       } catch {
566         case e:Exception =>
567           ""
568       }
569     }
570     var resp = ""
571     var count = 0
572     var ret : Option[BillEntryMsg] = None
573     while(resp.isEmpty && count < max){
574       if(count > 0) Console.err.println("Retrying for bill request.")
575       resp = get()
576       if(resp.isEmpty) Thread.sleep(1000)
577       else {
578         try{
579           var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
580           ret = Some(b)
581           if(b.getStatus().equals("processing")){
582             Thread.sleep(1000)
583             resp = ""
584           }
585         }  catch {
586           case e:Exception =>
587               e.printStackTrace
588               resp = ""
589         }
590       }
591       //sleep(1000L)
592       count += 1
593     }
594     ret
595   }
596 }
597
598 case class Resource(
599    val resType  : String, // Message.msgMap.keys
600    val instances: Long,
601    val cronSpec : String
602  )
603 extends JsonSupport {}
604
605 case class Scenario(
606   val ignoreScenario : Boolean,
607   val host : String,
608   val port : Long,
609   val sendOrdered : Boolean,
610   val sendViaRabbitMQ : Boolean,
611   val sendDebugEnabled : Boolean,
612   val validationEnabled : Boolean,
613   val billingMonth: Long,
614   val aquariumStartWaitMillis : Long,
615   val aquariumStopWaitMillis : Long,
616   val billResponseWaitMillis : Long,
617   val numberOfUsers  : Long,
618   val numberOfResponseRetries : Long,
619   val minFileCredits : Long,
620   val maxFileCredits : Long,
621   val minUserCredits : Long,
622   val maxUserCredits : Long,
623   val resources : List[Resource]
624 )
625 extends JsonSupport {}
626
627 case class Scenarios(
628    val scenarios : List[Scenario] )
629 extends JsonSupport {}
630
631 object ScenarioRunner {
632   val aquarium  = AquariumInstance.aquarium
633
634   def parseScenario(txt:String) : Scenario =
635     StdConverters.AllConverters.convertEx[Scenario](JsonTextFormat(txt))
636
637   def parseScenarios(txt:String) : Scenarios =
638     StdConverters.AllConverters.convertEx[Scenarios](JsonTextFormat(txt))
639
640   def runScenario(txt:String) : Unit = runScenario(parseScenario(txt))
641
642   private[this] def runUser(s:Scenario) : User = {
643     val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
644     val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
645     val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
646     //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
647     AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
648       for{ r <- s.resources}  // create messages
649         r.resType match {
650           case "vm" =>
651             user.addVMs(r.instances.toInt,r.cronSpec)
652           case "disk" =>
653             user.addFiles(r.instances.toInt,"update",r.cronSpec)
654           case "credits" =>
655             user.addCredits(r.instances.toInt,r.cronSpec)
656         }
657       // run scenario
658       user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
659                s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
660                s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
661     }
662     user
663   }
664
665   def runScenario(s:Scenario): Unit = {
666     if(s.ignoreScenario == false) {
667       Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
668       val tasks = for { u <- 1 to s.numberOfUsers.toInt}
669                   yield scala.actors.Futures.future(runUser(s))
670       val users = for { u <- tasks}  yield u()
671       users.foreach {u =>
672          u.printResults()
673          if(s.validationEnabled && u.validateResults() == false)
674            Console.err.println("Validation FAILED for user " + u)
675       }
676       Console.err.println("\n=========================\nStopping scenario\n=======================")
677     }
678   }
679
680   def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt))
681
682   def runScenarios(ss:Scenarios) = {
683     Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
684     ss.scenarios.foreach(runScenario(_))
685   }
686
687 }
688
689 object UserTest extends Loggable {
690
691    //vm,disk,credits
692   //add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
693   /*
694     val host : String,
695   val port : Long,
696   val sendOrdered : Boolean,
697   val sendViaRabbitMQ : Boolean,
698   val sendDebugEnabled : Boolean,
699   val validationEnabled : Boolean,
700   val billingMonth: Long,
701   val aquariumStartWaitMillis : Long,
702   val aquariumStopWaitMillis : Long,
703   val billResponseWaitMillis : Long,
704   val numberOfUsers  : Long,
705   val numberOfResponseRetries : Long,
706   val minFileCredits : Long,
707   val maxFileCredits : Long,
708   val minUserCredits : Long,
709   val maxUserCredits : Long,
710   val resources : List[Resource]
711
712    */
713  val basic = new Scenario(false,"localhost",8888,true,false,false,false,9,2000,2000,2000,
714                           1,10,2000,5000,10000,50000,List[Resource](
715                           new Resource("credits",1, "00 00 10,12 9 ?"),
716                           new Resource("disk",1,"00 18 15,20,29,30 9 ?"),
717                           new Resource("vm",1,"00 18 14,17,19,20 9 ?")
718                         ))
719
720  def main(args: Array[String]) = {
721
722    try{
723      val lines = scala.io.Source.fromFile(args.head).mkString
724      ScenarioRunner.runScenarios(lines)
725    } catch {
726      case e:Exception =>
727        e.printStackTrace()
728        ScenarioRunner.runScenarios(new Scenarios(List(basic)))
729    }
730
731
732 /*    val user = new User("localhost:8888",9)
733     val (minFileCredits,maxFileCredits) = (2000,5000)
734     val (minUserCredits,maxUserCredits) = (10000,10000)
735     //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
736
737    val json =AquariumInstance.run(2000,2000) {
738           user.
739                   addCredits(1,"00 00 10,12 9 ?").
740                   addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?").
741                   addVMs(1,"00 18 14,17,19,20 9 ?").
742                   //addVMs(5,"on","00 18 ? 9 Tue")
743                  run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
744    }
745    Thread.sleep(2000)
746    Console.err.println("Messages sent:")
747    for { m <- JsonLog.get}
748      Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
749    Console.err.println("\n=========================\n")
750    Console.err.println("Response:\n" + json)*/
751  }
752
753 }
754
755
756 /*
757 object BillTest extends Loggable {
758
759   type JSON = String
760   type UID  = Long
761   type DATE = String
762
763   private[this] val counter = new AtomicLong(0L)
764   private[this] def nextID() = counter.getAndIncrement
765
766   private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy");
767
768   val propsfile = new FileStreamResource(new File("aquarium.properties"))
769
770   var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
771
772   val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user")
773
774   val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace")
775
776   val aquarium = {
777       exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
778            Console.err.println(_))
779       new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
780       //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
781       update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
782       build()
783   }
784
785
786   private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
787     val commands = cmd.split(" ")
788     val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
789     val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
790     val sb = new StringBuilder
791
792     //spin off a thread to read process output.
793     val outputReaderThread = new Thread(new Runnable(){
794       def run : Unit = {
795         var ln : String = null
796         while({ln = ins.readLine; ln != null})
797           func(ln)
798       }
799     })
800     outputReaderThread.start()
801
802     //suspense this main thread until sub process is done.
803     proc.waitFor
804
805     //wait until output is fully read/completed.
806     outputReaderThread.join()
807
808     ins.close()
809   }
810
811
812   private [this] def createUser(date:DATE) : (JSON,UID) = {
813     val mid = nextID
814     val id = "im.%d.create.user".format(mid)
815     val millis = format.parse(date).getTime
816     val occurredMillis = millis
817     val receivedMillis = millis
818     val userID = "user%d@grnet.gr".format(mid)
819     val clientID = "astakos"
820     val isActive = false
821     val role = "default"
822     val eventVersion = "1.0"
823     val eventType = "create"
824
825     val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
826     val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
827     (json, mid)
828   }
829
830   private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
831     val id = "im.%d.add.credits".format(nextID)
832     val millis = format.parse(date).getTime
833     val occurredMillis = millis
834     val receivedMillis = millis
835     val userID = "user%d@grnet.gr".format(uid)
836     val clientID = "astakos"
837     val isActive = false
838     val eventVersion = "1.0"
839     val resource = "addcredits"
840     val instanceID = "addcredits"
841
842     val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
843     val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
844     json
845   }
846
847   private [this] def makePithos(date:DATE,uid:UID,path:String,
848                                 value:Double,action:String) : JSON = {
849     val id = "rc.%d.object.%s".format(nextID,action)
850     val millis = format.parse(date).getTime
851     val occurredMillis = millis
852     val receivedMillis = millis
853     val userID = "user%d@grnet.gr".format(uid)
854     val clientID = "pithos"
855     val resource ="diskspace"
856     val instanceID = "1"
857     val eventVersion = "1.0"
858     val details = MessageFactory.newDetails(
859       MessageFactory.newStringDetail("action", "object %s".format(action)),
860       MessageFactory.newStringDetail("total", "0.0"),
861       MessageFactory.newStringDetail("user", userID),
862       MessageFactory.newStringDetail("path", path)
863     )
864
865     val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
866     val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
867     json
868   }
869
870   private[this] def sendCreate(date:DATE) : UID = {
871     val (json,uid) = createUser(date)
872     aquarium(Aquarium.EnvKeys.rabbitMQProducer).
873     sendMessage(astakosExchangeName,astakosRoutingKey,json)
874     Console.err.println("Sent message:\n%s\n".format(json))
875     uid
876   }
877
878   private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = {
879     val json = addCredits(date,uid,amount)
880     aquarium(Aquarium.EnvKeys.rabbitMQProducer).
881     sendMessage(astakosExchangeName,astakosRoutingKey,
882                 json)
883     Console.err.println("Sent message:\n%s\n".format(json))
884   }
885
886   private[this] def sendPithos(date:DATE,uid:UID,path:String,
887                                value:Double,action:String) = {
888     val json = makePithos(date,uid,path,value,action)
889     aquarium(Aquarium.EnvKeys.rabbitMQProducer).
890     sendMessage(pithosExchangeName,pithosRoutingKey,
891                 json)
892     Console.err.println("Sent message:\n%s\n".format(json))
893   }
894
895   private[this] def jsonOf(url:String) : JSON = {
896      val in = new BufferedReader(
897                          new InputStreamReader(
898                          new URL(url).openConnection().
899                          getInputStream()))
900       var inputLine = ""
901       var ret = ""
902       while ({inputLine = in.readLine();inputLine} != null)
903         ret += (if(ret.isEmpty) "" else "\n")+ inputLine
904       in.close()
905       ret
906   }
907
908   private[this] def getBill(uid:Long,from:String,to:String) : JSON = {
909     val fromMillis = format.parse(from).getTime
910     val toMillis   = format.parse(to).getTime
911     val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis)
912     try{
913       jsonOf(billURL)
914     } catch {
915       case e:Exception =>
916         ""
917     }
918   }
919
920   private[this] def sleep(l:Long) = {
921   try {
922       Thread.sleep(l)
923     } catch {
924       case ex:InterruptedException =>
925         Thread.currentThread().interrupt()
926     }
927   }
928
929
930   private[this] def testCase1() : JSON  = {
931     /* GET BILL FROM TO*/
932     val billFromDate = "00/00/00/01/08/2012"
933     val billToDate= "23/59/59/31/08/2012"
934     /* USER Creation */
935     val creationDate = "15/00/00/03/08/2012"
936     /* ADD CREDITS */
937     val addCreditsDate = "18/15/00/05/08/2012"
938     val creditsToAdd = 6000
939     /* Pithos STUFF */
940     val pithosPath = "/Papers/GOTO_HARMFUL.PDF"
941
942     val pithosDate1 = "20/30/00/05/08/2012"
943     val pithosAction1 = "update"
944     val pithosValue1 = 2000
945
946
947     val pithosDate2 = "21/05/00/15/08/2012"
948     val pithosAction2 = "update"
949     val pithosValue2 = 4000
950
951
952     val pithosDate3 = "08/05/00/20/08/2012"
953     val pithosAction3 = "update"
954     val pithosValue3 = 100
955
956     val id =
957       sendCreate(creationDate)
958       //Thread.sleep(5000)
959       sendAddCredits(addCreditsDate,id,creditsToAdd)
960       //Thread.sleep(5000)
961       sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1)
962       //Thread.sleep(5000)
963       sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2)
964       //
965       sendPithos(pithosDate3,id,pithosPath,pithosValue3,pithosAction3)
966
967
968     Console.err.println("Waiting for stuff to be processed")
969     Thread.sleep(5000)
970
971     var resp = ""
972     var count = 0
973     while(resp.isEmpty && count < 5){
974       if(count > 0) Console.err.println("Retrying for bill request.")
975       resp = getBill(id,billFromDate,billToDate)
976       if(resp.isEmpty) Thread.sleep(1000)
977       //sleep(1000L)
978       count += 1
979     }
980     Console.err.println("Sending URL done")
981     resp
982   }
983
984   def runTestCase(f: => JSON) = {
985     var json = ""
986     aquarium.start
987     Thread.sleep(2000)
988     try{
989       json = f
990     }  catch{
991       case e:Exception =>
992         e.printStackTrace
993     }
994     aquarium.stop
995     Thread.sleep(1000)
996     Console.err.println("Response : " + json )
997   }
998
999   def main(args: Array[String]) = {
1000     //Console.err.println("JSON: " +  (new BillEntry).toJsonString)
1001     runTestCase(testCase1)
1002   }
1003 }
1004 */