e1f8b9fed4ca61a6f93d9745c5e5a74375f6916c
[aquarium] / src / test / scala / gr / grnet / aquarium / BillTest.scala
1 /*
2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3 *
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
6 * conditions are met:
7 *
8 *   1. Redistributions of source code must retain the above
9 *      copyright notice, this list of conditions and the following
10 *      disclaimer.
11 *
12 *   2. Redistributions in binary form must reproduce the above
13 *      copyright notice, this list of conditions and the following
14 *      disclaimer in the documentation and/or other materials
15 *      provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 *
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
34 */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.resource.FileStreamResource
40 import gr.grnet.aquarium.converter.{StdConverters}
41 import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
42 import java.io.{InputStreamReader, BufferedReader, File}
43 import java.net.URL
44 import java.text.SimpleDateFormat
45 import java.util.concurrent.atomic.AtomicLong
46 import gr.grnet.aquarium.util.{Lock, Loggable}
47 import java.util.{Date, Calendar, GregorianCalendar}
48 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
49 import gr.grnet.aquarium.policy.CronSpec
50 import gr.grnet.aquarium.message.avro.gen.{BillEntryMsg, IMEventMsg, ResourceEventMsg}
51 import org.apache.avro.specific.SpecificRecord
52 import util.json.JsonSupport
53 import actors.Future
54
55
56 /*
57 * @author Prodromos Gerakios <pgerakios@grnet.gr>
58 */
59
60
61 object UID {
62   private[this] val counter = new AtomicLong(0L)
63   def next() = counter.getAndIncrement
64   def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
65       min + (scala.math.random.toInt % (max+1)) % (max+1)
66
67   def random[A](l:List[A]) : A = {
68     val sz = l.size
69     if(sz==0) throw new Exception("random")
70      l(random(0,sz-1))
71   }
72 }
73
74 object Process {
75   private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
76     val commands = cmd.split(" ")
77     val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
78     val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
79     val sb = new StringBuilder
80
81     //spin off a thread to read process output.
82     val outputReaderThread = new Thread(new Runnable(){
83       def run : Unit = {
84         var ln : String = null
85         while({ln = ins.readLine; ln != null})
86           func(ln)
87       }
88     })
89     outputReaderThread.start()
90
91     //suspense this main thread until sub process is done.
92     proc.waitFor
93
94     //wait until output is fully read/completed.
95     outputReaderThread.join()
96
97     ins.close()
98   }
99   def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
100 }
101
102 object Mongo {
103   def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
104 }
105
106 object AquariumInstance {
107   //val propsfile = new FileStreamResource(new File("aquarium.properties"))
108   var props: Props = ResourceLocator.AquariumProperties
109   // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
110   val aquarium = {
111     Mongo.clear
112     new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
113       //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
114       update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
115       build()
116   }
117   def run(billWait:Int, stop:Int)(f : => Unit) = {
118     aquarium.start
119     Thread.sleep(billWait)
120     try{
121       f
122     } finally {
123       Console.err.println("Stopping aquarium")
124       aquarium.stop
125       Thread.sleep(stop)
126       Console.err.println("Stopping aquarium --- DONE")
127     }
128   }
129 }
130
131 object JsonLog {
132   private[this] final val lock = new Lock()
133   private[this] var _log : List[String] = Nil
134   def add(json:String) =  lock.withLock(_log = _log ::: List(json))
135   def get() : List[String] = lock.withLock(_log.toList)
136 }
137
138 /*object MessageQueue {
139   private[this] final val lock = new Lock()
140   private[this] var _sortedMsgs  = SortedMap[Timeslot,(String,String,String)]
141 } */
142
143 object MessageService {
144
145   def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
146     val json = AvroHelpers.jsonStringOfSpecificRecord(event)
147     if(rabbitMQEnabled){
148       val (exchangeName,routingKey) = event match {
149         case rc:ResourceEventMsg => rc.getResource match {
150           case "vmtime" =>
151             ("cyclades","cyclades.resource.vmtime")
152           case "diskspace" =>
153             ("pithos","pithos.resource.diskspace")
154           case "addcredits" =>
155             ("astakos","astakos.resource")
156           case x =>
157             throw new Exception("send cast failed: %s".format(x))
158         }
159         case im:IMEventMsg =>
160           ("astakos","astakos.user")
161         case _ =>
162           throw new Exception("send cast failed")
163       }
164       AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
165         sendMessage(exchangeName,routingKey,json)
166     } else {
167       val uid = event match {
168         case rcevent: ResourceEventMsg =>
169             AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
170             rcevent.getUserID
171         case imevent: IMEventMsg =>
172              AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
173              imevent.getUserID
174       }
175       val userActorRef = AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)
176       userActorRef ! event
177     }
178     val millis = event match {
179       case rc:ResourceEventMsg => rc.getOccurredMillis
180       case im:IMEventMsg => im.getOccurredMillis
181     }
182     JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json)
183     if(debugEnabled)
184       Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
185   }
186 }
187
188 abstract class Message {
189   val dbg = true
190   val cal =   new GregorianCalendar
191   var _range : Timeslot = null
192   var _cronSpec : CronSpec = null
193   var _messagesSent = 0
194   //var _done = false
195   var _map = Map[String,String]()
196
197   def updateMap(args:Tuple2[String,String]*) : Message  =
198     updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
199
200   def updateMap(map:Map[String,String]) : Message = {
201     def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
202       (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
203         a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
204     }
205     _map =  mergeMap(List(_map,map))((v1,v2) => v2)
206     (_map.get("month"),_map.get("spec")) match {
207       case (Some((month0:String)),Some(spec)) =>
208         val month : Int = month0.toInt
209         if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
210            val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
211            val d0 = getDate(1,month,year,0,0,0)
212            _range = Timeslot(d0,d1 - 1000)
213           _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
214         }
215       case _ => ()
216     }
217     this
218   }
219
220   //def done = _done
221   def sentMessages = _messagesSent
222
223   def nextTime : Option[Long] = nextTime(false)
224
225   def nextTime(update:Boolean) : Option[Long] = {
226     _cronSpec match{
227       case null =>
228         None
229       case _ =>
230         _cronSpec.nextValidDate(_range,cal.getTime) match {
231           case Some(d) =>
232             val millis = d.getTime
233             if(update) cal.setTimeInMillis(millis)
234             Some(millis)
235           case None    =>
236             None
237         }
238     }
239   }
240
241   def year : Int = {
242     cal.setTimeInMillis(System.currentTimeMillis())
243     cal.get(Calendar.YEAR)
244   }
245
246   def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
247     cal.set(year,month-1,day,hour,min,sec)
248     cal.getTimeInMillis
249   }
250
251   def getMillis : Long = cal.getTimeInMillis
252
253   def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
254     getDate(day,month,year,hour,min,0)
255
256   def setMillis(millis:Long) = {
257     cal.setTimeInMillis(millis)
258   }
259
260   def addMillis(day:Int,hour:Int) = {
261     cal.roll(Calendar.DATE,day)
262     cal.roll(Calendar.DATE,hour)
263   }
264
265   def nextID = UID.next
266
267   def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
268
269   def send(args:Tuple2[String,String]*) : Boolean =
270     send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
271
272   def send(map:Map[String,String]) : Boolean = {
273     nextTime(true) match {
274       case Some(millis) =>
275         updateMap(map)
276         val event = makeEvent(millis,_map)
277         val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
278         val rdb = _map.getOrElse("debugEnabled","false").toBoolean
279         MessageService.send(event,ren,rdb)
280         _messagesSent += 1
281         true
282       case None =>
283         //_done = true
284         false
285     }
286   }
287
288 }
289
290 class DiskMessage extends Message {
291   /*
292    *  map:
293    *      "action" -> "update" , "delete" , "purge"
294    *      "uid"    ->
295    *      "path"   ->
296    *      "value"  ->
297    */
298   def makeEvent(millis:Long,map:Map[String,String]) = {
299       val action = map("action")
300       val uid    = map("uid")
301       val path   = map("path")
302       val value  = map("value")
303       val id = "rc.%d.object.%s".format(nextID,action)
304       val occurredMillis = millis
305       val receivedMillis = millis
306       val userID = uid //"user%s@grnet.gr".format(uid)
307       val clientID = "pithos"
308       val resource ="diskspace"
309       val instanceID = "1"
310       val eventVersion = "1.0"
311       val details = MessageFactory.newDetails(
312         MessageFactory.newStringDetail("action", "object %s".format(action)),
313         MessageFactory.newStringDetail("total", "0.0"),
314         MessageFactory.newStringDetail("user", userID),
315         MessageFactory.newStringDetail("path", path)
316       )
317
318       val msg = MessageFactory.newResourceEventMsg(
319         id,
320         occurredMillis, receivedMillis,
321         userID, clientID,
322         resource, instanceID,
323         value,
324         eventVersion,
325         details,
326         uid
327       )
328
329       msg
330   }
331 }
332
333 class VMMessage extends Message {
334   /*
335    *   map:
336    *      uid        -> unique id for user
337    *      instanceID -> "cyclades.vm.kJSOLek"
338    *      vmName     -> "My Lab VM"
339    *      status     ->  "on", "off" , "destroy"
340    */
341   var _status = "on"
342   def nextStatus = {
343     if(_status=="off") _status = "on" else _status = "off"
344     _status
345   }
346   def makeEvent(millis:Long,map:Map[String,String]) = {
347     val uid    = map("uid")
348     val value  =  /* map("status")*/nextStatus match {
349        case "on" => "1"
350        case "off" => "0"
351        case "destroy" => "2"
352        case x => throw new Exception("VMMessage bad status: %s".format(x))
353       }
354     val id = "rc.%d.vmtime".format(nextID)
355     val occurredMillis = millis
356     val receivedMillis = millis
357     val userID = uid // "user%s@grnet.gr".format(uid)
358     val clientID = "cyclades"
359     val resource ="vmtime"
360     val instanceID = map("instanceID")
361     val eventVersion = "1.0"
362     val details = MessageFactory.newDetails(
363       MessageFactory.newStringDetail("VM Name", map("vmName"))
364     )
365
366     val msg = MessageFactory.newResourceEventMsg(
367       id,
368       occurredMillis, receivedMillis,
369       userID, clientID,
370       resource, instanceID,
371       value,
372       eventVersion,
373       details,
374       uid
375     )
376
377     msg
378   }
379  }
380
381 class CreationMessage extends Message {
382   /*
383    *  map contains:
384    *   uid -> user id
385    */
386   def makeEvent(millis:Long,map:Map[String,String]) = {
387     val uid    = map("uid")     //
388     val id = "im.%d.create.user".format(nextID)
389     val occurredMillis = millis
390     val receivedMillis = millis
391     val userID =  uid //"user%d@grnet.gr".format(mid)
392     val clientID = "astakos"
393     val isActive = false
394     val role = "default"
395     val eventVersion = "1.0"
396     val eventType = "create"
397
398     val msg = MessageFactory.newIMEventMsg(
399       id,
400       occurredMillis, receivedMillis,
401       userID, clientID,
402       isActive,
403       role,
404       eventVersion, eventType,
405       MessageFactory.newDetails(),
406       uid
407     )
408
409     msg
410   }
411 }
412
413 class AddCreditsMessage extends Message {
414   /*
415    *  map contains:
416    *    amount -> "2000"
417    *    uid    -> loverdos1
418    */
419   def makeEvent(millis:Long,map:Map[String,String]) = {
420     val uid    = map("uid")     //
421     val amount = map("amount")
422     val id = "im.%d.add.credits".format(nextID)
423     val occurredMillis = millis
424     val receivedMillis = millis
425     val userID = uid //"user%d@grnet.gr".format(uid)
426     val clientID = "astakos"
427     val isActive = false
428     val role = "default"
429     val eventVersion = "1.0"
430     val eventType = "addcredits"
431     val msg = MessageFactory.newResourceEventMsg(
432       id,
433       occurredMillis, receivedMillis,
434       userID, clientID,
435       "addcredits", "addcredits",
436       amount,
437       eventVersion,
438       MessageFactory.newDetails(),
439       uid
440     )
441
442     msg
443   }
444 }
445
446 object Message {
447   def apply(typ:String,args:Tuple2[String,String]*) : Message =
448     apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
449
450   val msgMap = Map[String,()=>Message](
451     "vm"      -> (() => new VMMessage),
452     "disk"    -> (() => new DiskMessage),
453     "create"  -> (() => new CreationMessage),
454     "credits" -> (() => new AddCreditsMessage)
455   )
456
457   def apply(typ:String,map:Map[String,String]) : Message = {
458     val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))()
459     msg.updateMap(map)
460     msg
461   }
462 }
463
464
465 class User(serverAndPort:String,month:Int) {
466   val uid = "user%d@grnet.gr".format(UID.next)
467   val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
468   var _resources : List[Message] = Nil
469   var _billEntryMsg :Option[BillEntryMsg] = None
470
471   override def toString() = uid
472
473   def validateResults() : Boolean = {
474     throw new Exception("Not implemented !!!!")
475   }
476
477   def printResults() = {
478     Console.err.println("Messages sent:")
479     for { m <- JsonLog.get}
480       Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
481     Console.err.println("\n=========================\n")
482     Console.err.println("Response:\n" + (_billEntryMsg match {
483       case None => "NONE!!!!"
484       case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
485     }))
486   }
487
488   def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
489     add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
490
491   def add(no:Int,typ:String,map:Map[String,String]) : User  =
492     add(no,typ,{_ => map})
493
494   def add(no:Int,typ:String,map:Int=>Map[String,String]) : User  = {
495     for {i <- 1 to no} {
496       val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
497       _resources = Message(typ,map0) :: _resources
498     }
499     this
500   }
501
502   def addVMs(no:Int,cronSpec:String) : User =
503     add(no,"vm",{i =>
504          Map("instanceID"->"cyclades.vm.%d".format(i),
505          "vmName"  -> "Virtual Machine #%d".format(i),
506          "status"  -> "on", // initially "on" msg
507          "spec"    -> cronSpec)})
508
509   def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
510     add(no,"disk",{i =>
511        Map("action" -> action,
512            "path"->"/Papers/file_%d.PDF".format(i),
513            //"value"->UID.random(minVal,maxVal).toString,
514            "spec" -> spec
515           )
516     })
517
518   def addCredits(no:Int,spec:String) : User = {
519     add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec/*,"amount"->amount.toString*/)
520   }
521
522   def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
523           sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
524     var _messagesSent : List[Message] = Nil
525     _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
526     //Thread.sleep(2000)
527     var iter = _resources.toList
528     while(!iter.isEmpty)
529       iter = (if(!ordered) iter
530        else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match {
531         case (Some(l1),Some(l2)) => l1 <= l2
532         case (None,None) => true
533         case (None,Some(l)) => true
534         case (Some(l),None) => false
535       }}).filter({m =>
536         _messagesSent = _messagesSent ::: List(m)
537         m.send("value"->UID.random(minFile,maxFile).toString,
538                "amount"->UID.random(minAmount,maxAmount).toString,
539                "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
540                "debugEnabled" -> sendDebugEnabled.toString
541                 //"status" -> UID.random(List("off","on"))
542         )})
543     Thread.sleep(wait)
544     _billEntryMsg = getBillResponse(maxJSONRetry)
545   }
546
547   private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
548     def get () : String = {
549       val fromMillis = _creationMessage._range.from.getTime
550       val toMillis   = _creationMessage._range.to.getTime
551       val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
552       try{
553         val in = new BufferedReader(
554           new InputStreamReader(
555             new URL(url).openConnection().
556               getInputStream()))
557         var inputLine = ""
558         var ret = ""
559         while ({inputLine = in.readLine();inputLine} != null)
560           ret += (if(ret.isEmpty) "" else "\n")+ inputLine
561         in.close()
562         ret
563       } catch {
564         case e:Exception =>
565           ""
566       }
567     }
568     var resp = ""
569     var count = 0
570     var ret : Option[BillEntryMsg] = None
571     while(resp.isEmpty && count < max){
572       if(count > 0) Console.err.println("Retrying for bill request.")
573       resp = get()
574       if(resp.isEmpty) Thread.sleep(1000)
575       else {
576         try{
577           var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
578           ret = Some(b)
579           if(b.getStatus().equals("processing")){
580             Thread.sleep(1000)
581             resp = ""
582           }
583         }  catch {
584           case e:Exception =>
585               e.printStackTrace
586               resp = ""
587         }
588       }
589       //sleep(1000L)
590       count += 1
591     }
592     ret
593   }
594 }
595
596 class Resource(
597    val resType  : String, // Message.msgMap.keys
598    val instances: Long,
599    val cronSpec : String
600  )
601 extends JsonSupport {}
602
603 class Scenario(
604   val ignoreScenario : Boolean,
605   val host : String,
606   val port : Long,
607   val sendOrdered : Boolean,
608   val sendViaRabbitMQ : Boolean,
609   val sendDebugEnabled : Boolean,
610   val validationEnabled : Boolean,
611   val billingMonth: Long,
612   val aquariumStartWaitMillis : Long,
613   val aquariumStopWaitMillis : Long,
614   val billResponseWaitMillis : Long,
615   val numberOfUsers  : Long,
616   val numberOfResponseRetries : Long,
617   val minFileCredits : Long,
618   val maxFileCredits : Long,
619   val minUserCredits : Long,
620   val maxUserCredits : Long,
621   val resources : List[Resource]
622 )
623 extends JsonSupport {}
624
625 class Scenarios(
626    val scenarios : List[Scenario] )
627 extends JsonSupport {}
628
629 object ScenarioRunner {
630   val aquarium  = AquariumInstance.aquarium
631
632   def parseScenario(txt:String) : Scenario =
633     StdConverters.AllConverters.convertEx[Scenario](txt)
634
635   def parseScenarios(txt:String) : Scenarios =
636     StdConverters.AllConverters.convertEx[Scenarios](txt)
637
638   def runScenario(txt:String) : Unit = runScenario(parseScenario(txt))
639
640   private[this] def runUser(s:Scenario) : User = {
641     val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
642     val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
643     val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
644     //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
645     AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
646       for{ r <- s.resources}  // create messages
647         r.resType match {
648           case "vm" =>
649             user.addVMs(r.instances.toInt,r.cronSpec)
650           case "disk" =>
651             user.addFiles(r.instances.toInt,"update",r.cronSpec)
652           case "credits" =>
653             user.addCredits(r.instances.toInt,r.cronSpec)
654         }
655       // run scenario
656       user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
657                s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
658                s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
659     }
660     user
661   }
662
663   def runScenario(s:Scenario): Unit = {
664     if(s.ignoreScenario == false) {
665       Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
666       val tasks = for { u <- 1 to s.numberOfUsers.toInt}
667                   yield scala.actors.Futures.future(runUser(s))
668       val users = for { u <- tasks}  yield u()
669       users.foreach {u =>
670          u.printResults()
671          if(s.validationEnabled && u.validateResults() == false)
672            Console.err.println("Validation FAILED for user " + u)
673       }
674       Console.err.println("\n=========================\nStopping scenario\n=======================")
675     }
676   }
677
678   def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt))
679
680   def runScenarios(ss:Scenarios) = {
681     Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
682     ss.scenarios.foreach(runScenario(_))
683   }
684
685 }
686
687 object UserTest extends Loggable {
688
689    //vm,disk,credits
690   //add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
691   /*
692     val host : String,
693   val port : Long,
694   val sendOrdered : Boolean,
695   val sendViaRabbitMQ : Boolean,
696   val sendDebugEnabled : Boolean,
697   val validationEnabled : Boolean,
698   val billingMonth: Long,
699   val aquariumStartWaitMillis : Long,
700   val aquariumStopWaitMillis : Long,
701   val billResponseWaitMillis : Long,
702   val numberOfUsers  : Long,
703   val numberOfResponseRetries : Long,
704   val minFileCredits : Long,
705   val maxFileCredits : Long,
706   val minUserCredits : Long,
707   val maxUserCredits : Long,
708   val resources : List[Resource]
709
710    */
711  val basic = new Scenario(false,"localhost",8888,true,false,false,false,9,2000,2000,2000,
712                           1,10,2000,5000,10000,50000,List[Resource](
713                           new Resource("credits",1, "00 00 10,12 9 ?"),
714                           new Resource("disk",1,"00 18 15,20,29,30 9 ?"),
715                           new Resource("vm",1,"00 18 14,17,19,20 9 ?")
716                         ))
717
718  def main(args: Array[String]) = {
719
720    try{
721      val lines = scala.io.Source.fromFile(args.head).mkString
722      ScenarioRunner.runScenarios(new Scenarios(List(basic)))
723    } catch {
724      case e:Exception =>
725        e.printStackTrace()
726        ScenarioRunner.runScenarios(new Scenarios(List(basic)))
727    }
728
729
730 /*    val user = new User("localhost:8888",9)
731     val (minFileCredits,maxFileCredits) = (2000,5000)
732     val (minUserCredits,maxUserCredits) = (10000,10000)
733     //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
734
735    val json =AquariumInstance.run(2000,2000) {
736           user.
737                   addCredits(1,"00 00 10,12 9 ?").
738                   addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?").
739                   addVMs(1,"00 18 14,17,19,20 9 ?").
740                   //addVMs(5,"on","00 18 ? 9 Tue")
741                  run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
742    }
743    Thread.sleep(2000)
744    Console.err.println("Messages sent:")
745    for { m <- JsonLog.get}
746      Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
747    Console.err.println("\n=========================\n")
748    Console.err.println("Response:\n" + json)*/
749  }
750
751 }
752
753
754 /*
755 object BillTest extends Loggable {
756
757   type JSON = String
758   type UID  = Long
759   type DATE = String
760
761   private[this] val counter = new AtomicLong(0L)
762   private[this] def nextID() = counter.getAndIncrement
763
764   private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy");
765
766   val propsfile = new FileStreamResource(new File("aquarium.properties"))
767
768   var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
769
770   val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user")
771
772   val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace")
773
774   val aquarium = {
775       exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
776            Console.err.println(_))
777       new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
778       //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
779       update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
780       build()
781   }
782
783
784   private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
785     val commands = cmd.split(" ")
786     val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
787     val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
788     val sb = new StringBuilder
789
790     //spin off a thread to read process output.
791     val outputReaderThread = new Thread(new Runnable(){
792       def run : Unit = {
793         var ln : String = null
794         while({ln = ins.readLine; ln != null})
795           func(ln)
796       }
797     })
798     outputReaderThread.start()
799
800     //suspense this main thread until sub process is done.
801     proc.waitFor
802
803     //wait until output is fully read/completed.
804     outputReaderThread.join()
805
806     ins.close()
807   }
808
809
810   private [this] def createUser(date:DATE) : (JSON,UID) = {
811     val mid = nextID
812     val id = "im.%d.create.user".format(mid)
813     val millis = format.parse(date).getTime
814     val occurredMillis = millis
815     val receivedMillis = millis
816     val userID = "user%d@grnet.gr".format(mid)
817     val clientID = "astakos"
818     val isActive = false
819     val role = "default"
820     val eventVersion = "1.0"
821     val eventType = "create"
822
823     val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
824     val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
825     (json, mid)
826   }
827
828   private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
829     val id = "im.%d.add.credits".format(nextID)
830     val millis = format.parse(date).getTime
831     val occurredMillis = millis
832     val receivedMillis = millis
833     val userID = "user%d@grnet.gr".format(uid)
834     val clientID = "astakos"
835     val isActive = false
836     val eventVersion = "1.0"
837     val resource = "addcredits"
838     val instanceID = "addcredits"
839
840     val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
841     val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
842     json
843   }
844
845   private [this] def makePithos(date:DATE,uid:UID,path:String,
846                                 value:Double,action:String) : JSON = {
847     val id = "rc.%d.object.%s".format(nextID,action)
848     val millis = format.parse(date).getTime
849     val occurredMillis = millis
850     val receivedMillis = millis
851     val userID = "user%d@grnet.gr".format(uid)
852     val clientID = "pithos"
853     val resource ="diskspace"
854     val instanceID = "1"
855     val eventVersion = "1.0"
856     val details = MessageFactory.newDetails(
857       MessageFactory.newStringDetail("action", "object %s".format(action)),
858       MessageFactory.newStringDetail("total", "0.0"),
859       MessageFactory.newStringDetail("user", userID),
860       MessageFactory.newStringDetail("path", path)
861     )
862
863     val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
864     val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
865     json
866   }
867
868   private[this] def sendCreate(date:DATE) : UID = {
869     val (json,uid) = createUser(date)
870     aquarium(Aquarium.EnvKeys.rabbitMQProducer).
871     sendMessage(astakosExchangeName,astakosRoutingKey,json)
872     Console.err.println("Sent message:\n%s\n".format(json))
873     uid
874   }
875
876   private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = {
877     val json = addCredits(date,uid,amount)
878     aquarium(Aquarium.EnvKeys.rabbitMQProducer).
879     sendMessage(astakosExchangeName,astakosRoutingKey,
880                 json)
881     Console.err.println("Sent message:\n%s\n".format(json))
882   }
883
884   private[this] def sendPithos(date:DATE,uid:UID,path:String,
885                                value:Double,action:String) = {
886     val json = makePithos(date,uid,path,value,action)
887     aquarium(Aquarium.EnvKeys.rabbitMQProducer).
888     sendMessage(pithosExchangeName,pithosRoutingKey,
889                 json)
890     Console.err.println("Sent message:\n%s\n".format(json))
891   }
892
893   private[this] def jsonOf(url:String) : JSON = {
894      val in = new BufferedReader(
895                          new InputStreamReader(
896                          new URL(url).openConnection().
897                          getInputStream()))
898       var inputLine = ""
899       var ret = ""
900       while ({inputLine = in.readLine();inputLine} != null)
901         ret += (if(ret.isEmpty) "" else "\n")+ inputLine
902       in.close()
903       ret
904   }
905
906   private[this] def getBill(uid:Long,from:String,to:String) : JSON = {
907     val fromMillis = format.parse(from).getTime
908     val toMillis   = format.parse(to).getTime
909     val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis)
910     try{
911       jsonOf(billURL)
912     } catch {
913       case e:Exception =>
914         ""
915     }
916   }
917
918   private[this] def sleep(l:Long) = {
919   try {
920       Thread.sleep(l)
921     } catch {
922       case ex:InterruptedException =>
923         Thread.currentThread().interrupt()
924     }
925   }
926
927
928   private[this] def testCase1() : JSON  = {
929     /* GET BILL FROM TO*/
930     val billFromDate = "00/00/00/01/08/2012"
931     val billToDate= "23/59/59/31/08/2012"
932     /* USER Creation */
933     val creationDate = "15/00/00/03/08/2012"
934     /* ADD CREDITS */
935     val addCreditsDate = "18/15/00/05/08/2012"
936     val creditsToAdd = 6000
937     /* Pithos STUFF */
938     val pithosPath = "/Papers/GOTO_HARMFUL.PDF"
939
940     val pithosDate1 = "20/30/00/05/08/2012"
941     val pithosAction1 = "update"
942     val pithosValue1 = 2000
943
944
945     val pithosDate2 = "21/05/00/15/08/2012"
946     val pithosAction2 = "update"
947     val pithosValue2 = 4000
948
949
950     val pithosDate3 = "08/05/00/20/08/2012"
951     val pithosAction3 = "update"
952     val pithosValue3 = 100
953
954     val id =
955       sendCreate(creationDate)
956       //Thread.sleep(5000)
957       sendAddCredits(addCreditsDate,id,creditsToAdd)
958       //Thread.sleep(5000)
959       sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1)
960       //Thread.sleep(5000)
961       sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2)
962       //
963       sendPithos(pithosDate3,id,pithosPath,pithosValue3,pithosAction3)
964
965
966     Console.err.println("Waiting for stuff to be processed")
967     Thread.sleep(5000)
968
969     var resp = ""
970     var count = 0
971     while(resp.isEmpty && count < 5){
972       if(count > 0) Console.err.println("Retrying for bill request.")
973       resp = getBill(id,billFromDate,billToDate)
974       if(resp.isEmpty) Thread.sleep(1000)
975       //sleep(1000L)
976       count += 1
977     }
978     Console.err.println("Sending URL done")
979     resp
980   }
981
982   def runTestCase(f: => JSON) = {
983     var json = ""
984     aquarium.start
985     Thread.sleep(2000)
986     try{
987       json = f
988     }  catch{
989       case e:Exception =>
990         e.printStackTrace
991     }
992     aquarium.stop
993     Thread.sleep(1000)
994     Console.err.println("Response : " + json )
995   }
996
997   def main(args: Array[String]) = {
998     //Console.err.println("JSON: " +  (new BillEntry).toJsonString)
999     runTestCase(testCase1)
1000   }
1001 }
1002 */