2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
38 import com.ckkloverdos.props.Props
39 import converter.{JsonTextFormat, StdConverters}
40 import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
41 import java.io.{InputStreamReader, BufferedReader, File}
43 import java.util.concurrent.atomic.AtomicLong
44 import gr.grnet.aquarium.util.{Lock, Loggable}
45 import java.util.{Date, Calendar, GregorianCalendar}
46 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
47 import gr.grnet.aquarium.policy.CronSpec
48 import gr.grnet.aquarium.message.avro.gen.{BillEntryMsg, IMEventMsg, ResourceEventMsg}
49 import org.apache.avro.specific.SpecificRecord
50 import util.json.JsonSupport
54 * @author Prodromos Gerakios <pgerakios@grnet.gr>
59 private[this] val counter = new AtomicLong(0L)
60 def next() = counter.getAndIncrement
61 def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
62 min + (scala.math.random.toInt % (max+1)) % (max+1)
64 def random[A](l:List[A]) : A = {
66 if(sz==0) throw new Exception("random")
72 private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
73 val commands = cmd.split(" ")
74 val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
75 val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
76 val sb = new StringBuilder
78 //spin off a thread to read process output.
79 val outputReaderThread = new Thread(new Runnable(){
81 var ln : String = null
82 while({ln = ins.readLine; ln != null})
86 outputReaderThread.start()
88 //suspense this main thread until sub process is done.
91 //wait until output is fully read/completed.
92 outputReaderThread.join()
96 def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
100 def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
103 object AquariumInstance {
104 //val propsfile = new FileStreamResource(new File("aquarium.properties"))
105 var props: Props = ResourceLocator.AquariumProperties
106 // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
109 new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
110 //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
111 update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
115 private[this] val count=new java.util.concurrent.atomic.AtomicLong()
116 def run(billWait:Int, stop:Int)(f : => Unit) = {
117 if(count.addAndGet(1) == 1)
119 Thread.sleep(billWait)
123 Console.err.println("Stopping aquarium")
124 if(count.addAndGet(-1) == 0)
127 Console.err.println("Stopping aquarium --- DONE")
133 private[this] final val lock = new Lock()
134 private[this] var _log : List[String] = Nil
135 def add(json:String) = lock.withLock(_log = _log ::: List(json))
136 def get() : List[String] = lock.withLock(_log.toList)
139 /*object MessageQueue {
140 private[this] final val lock = new Lock()
141 private[this] var _sortedMsgs = SortedMap[Timeslot,(String,String,String)]
144 object MessageService {
146 def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
147 val json = AvroHelpers.jsonStringOfSpecificRecord(event)
149 val (exchangeName,routingKey) = event match {
150 case rc:ResourceEventMsg => rc.getResource match {
152 ("cyclades","cyclades.resource.vmtime")
154 ("pithos","pithos.resource.diskspace")
156 ("astakos","astakos.resource")
158 throw new Exception("send cast failed: %s".format(x))
160 case im:IMEventMsg =>
161 ("astakos","astakos.user")
163 throw new Exception("send cast failed")
165 AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
166 sendMessage(exchangeName,routingKey,json)
168 val uid = event match {
169 case rcevent: ResourceEventMsg =>
170 AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
172 case imevent: IMEventMsg =>
173 AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
176 val userActorRef = AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)
179 val millis = event match {
180 case rc:ResourceEventMsg => rc.getOccurredMillis
181 case im:IMEventMsg => im.getOccurredMillis
183 JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json)
185 Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
189 abstract class Message {
191 val cal = new GregorianCalendar
192 var _range : Timeslot = null
193 var _cronSpec : CronSpec = null
194 var _messagesSent = 0
196 var _map = Map[String,String]()
198 def updateMap(args:Tuple2[String,String]*) : Message =
199 updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
201 def updateMap(map:Map[String,String]) : Message = {
202 def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
203 (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
204 a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
206 _map = mergeMap(List(_map,map))((v1,v2) => v2)
207 (_map.get("month"),_map.get("spec")) match {
208 case (Some((month0:String)),Some(spec)) =>
209 val month : Int = month0.toInt
210 if((_cronSpec==null || _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
211 val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
212 val d0 = getDate(1,month,year,0,0,0)
213 _range = Timeslot(d0,d1 - 1000)
214 _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
222 def sentMessages = _messagesSent
224 def nextTime : Option[Long] = nextTime(false)
226 def nextTime(update:Boolean) : Option[Long] = {
231 _cronSpec.nextValidDate(_range,cal.getTime) match {
233 val millis = d.getTime
234 if(update) cal.setTimeInMillis(millis)
243 cal.setTimeInMillis(System.currentTimeMillis())
244 cal.get(Calendar.YEAR)
247 def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
248 cal.set(year,month-1,day,hour,min,sec)
252 def getMillis : Long = cal.getTimeInMillis
254 def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
255 getDate(day,month,year,hour,min,0)
257 def setMillis(millis:Long) = {
258 cal.setTimeInMillis(millis)
261 def addMillis(day:Int,hour:Int) = {
262 cal.roll(Calendar.DATE,day)
263 cal.roll(Calendar.DATE,hour)
266 def nextID = UID.next
268 def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
270 def send(args:Tuple2[String,String]*) : Boolean =
271 send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
273 def send(map:Map[String,String]) : Boolean = {
274 nextTime(true) match {
277 val event = makeEvent(millis,_map)
278 val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
279 val rdb = _map.getOrElse("debugEnabled","false").toBoolean
280 MessageService.send(event,ren,rdb)
291 class DiskMessage extends Message {
294 * "action" -> "update" , "delete" , "purge"
299 def makeEvent(millis:Long,map:Map[String,String]) = {
300 val action = map("action")
302 val path = map("path")
303 val value = map("value")
304 val id = "rc.%d.object.%s".format(nextID,action)
305 val occurredMillis = millis
306 val receivedMillis = millis
307 val userID = uid //"user%s@grnet.gr".format(uid)
308 val clientID = "pithos"
309 val resource ="diskspace"
311 val eventVersion = "1.0"
312 val details = MessageFactory.newDetails(
313 MessageFactory.newStringDetail("action", "object %s".format(action)),
314 MessageFactory.newStringDetail("total", "0.0"),
315 MessageFactory.newStringDetail("user", userID),
316 MessageFactory.newStringDetail("path", path)
319 val msg = MessageFactory.newResourceEventMsg(
321 occurredMillis, receivedMillis,
323 resource, instanceID,
334 class VMMessage extends Message {
337 * uid -> unique id for user
338 * instanceID -> "cyclades.vm.kJSOLek"
339 * vmName -> "My Lab VM"
340 * status -> "on", "off" , "destroy"
344 if(_status=="off") _status = "on" else _status = "off"
347 def makeEvent(millis:Long,map:Map[String,String]) = {
349 val value = /* map("status")*/nextStatus match {
352 case "destroy" => "2"
353 case x => throw new Exception("VMMessage bad status: %s".format(x))
355 val id = "rc.%d.vmtime".format(nextID)
356 val occurredMillis = millis
357 val receivedMillis = millis
358 val userID = uid // "user%s@grnet.gr".format(uid)
359 val clientID = "cyclades"
360 val resource ="vmtime"
361 val instanceID = map("instanceID")
362 val eventVersion = "1.0"
363 val details = MessageFactory.newDetails(
364 MessageFactory.newStringDetail("VM Name", map("vmName"))
367 val msg = MessageFactory.newResourceEventMsg(
369 occurredMillis, receivedMillis,
371 resource, instanceID,
382 class CreationMessage extends Message {
387 def makeEvent(millis:Long,map:Map[String,String]) = {
388 val uid = map("uid") //
389 val id = "im.%d.create.user".format(nextID)
390 val occurredMillis = millis
391 val receivedMillis = millis
392 val userID = uid //"user%d@grnet.gr".format(mid)
393 val clientID = "astakos"
396 val eventVersion = "1.0"
397 val eventType = "create"
399 val msg = MessageFactory.newIMEventMsg(
401 occurredMillis, receivedMillis,
405 eventVersion, eventType,
406 MessageFactory.newDetails(),
414 class AddCreditsMessage extends Message {
420 def makeEvent(millis:Long,map:Map[String,String]) = {
421 val uid = map("uid") //
422 val amount = map("amount")
423 val id = "im.%d.add.credits".format(nextID)
424 val occurredMillis = millis
425 val receivedMillis = millis
426 val userID = uid //"user%d@grnet.gr".format(uid)
427 val clientID = "astakos"
430 val eventVersion = "1.0"
431 val eventType = "addcredits"
432 val msg = MessageFactory.newResourceEventMsg(
434 occurredMillis, receivedMillis,
436 "addcredits", "addcredits",
439 MessageFactory.newDetails(),
448 def apply(typ:String,args:Tuple2[String,String]*) : Message =
449 apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
451 val msgMap = Map[String,()=>Message](
452 "vm" -> (() => new VMMessage),
453 "disk" -> (() => new DiskMessage),
454 "create" -> (() => new CreationMessage),
455 "credits" -> (() => new AddCreditsMessage)
458 def apply(typ:String,map:Map[String,String]) : Message = {
459 val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))()
466 class User(serverAndPort:String,month:Int) {
467 val uid = "user%d@grnet.gr".format(UID.next)
468 val _creationMessage : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
469 var _resources : List[Message] = Nil
470 var _billEntryMsg :Option[BillEntryMsg] = None
472 override def toString() = uid
474 def validateResults() : Boolean = {
475 throw new Exception("Not implemented !!!!")
478 def printResults() = {
479 Console.err.println("Messages sent:")
480 for { m <- JsonLog.get}
481 Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
482 Console.err.println("\n=========================\n")
483 Console.err.println("Response:\n" + (_billEntryMsg match {
484 case None => "NONE!!!!"
485 case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
489 def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
490 add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
492 def add(no:Int,typ:String,map:Map[String,String]) : User =
493 add(no,typ,{_ => map})
495 def add(no:Int,typ:String,map:Int=>Map[String,String]) : User = {
497 val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
498 _resources = Message(typ,map0) :: _resources
503 def addVMs(no:Int,cronSpec:String) : User =
505 Map("instanceID"->"cyclades.vm.%d".format(i),
506 "vmName" -> "Virtual Machine #%d".format(i),
507 "status" -> "on", // initially "on" msg
508 "spec" -> cronSpec)})
510 def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
512 //Console.err.println("Adding file : " + "/Papers/file_%d.PDF".format(i))
513 Map("action" -> action,
514 "path"->"/Papers/file_%d.PDF".format(i),
515 //"value"->UID.random(minVal,maxVal).toString,
520 def addCredits(no:Int,spec:String) : User = {
521 add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec/*,"amount"->amount.toString*/)
524 def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
525 sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean) = {
526 var _messagesSent : List[Message] = Nil
527 _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
529 var iter = _resources.toList
531 iter = (if(!ordered) iter
532 else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match {
533 case (Some(l1),Some(l2)) => l1 <= l2
534 case (None,None) => true
535 case (None,Some(l)) => true
536 case (Some(l),None) => false
538 _messagesSent = _messagesSent ::: List(m)
539 m.send("value"->UID.random(minFile,maxFile).toString,
540 "amount"->UID.random(minAmount,maxAmount).toString,
541 "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
542 "debugEnabled" -> sendDebugEnabled.toString
543 //"status" -> UID.random(List("off","on"))
546 _billEntryMsg = getBillResponse(maxJSONRetry)
549 private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
550 def get () : String = {
551 val fromMillis = _creationMessage._range.from.getTime
552 val toMillis = _creationMessage._range.to.getTime
553 val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
555 val in = new BufferedReader(
556 new InputStreamReader(
557 new URL(url).openConnection().
561 while ({inputLine = in.readLine();inputLine} != null)
562 ret += (if(ret.isEmpty) "" else "\n")+ inputLine
572 var ret : Option[BillEntryMsg] = None
573 while(resp.isEmpty && count < max){
574 if(count > 0) Console.err.println("Retrying for bill request.")
576 if(resp.isEmpty) Thread.sleep(1000)
579 var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
581 if(b.getStatus().equals("processing")){
599 val resType : String, // Message.msgMap.keys
601 val cronSpec : String
603 extends JsonSupport {}
606 val ignoreScenario : Boolean,
609 val sendOrdered : Boolean,
610 val sendViaRabbitMQ : Boolean,
611 val sendDebugEnabled : Boolean,
612 val validationEnabled : Boolean,
613 val billingMonth: Long,
614 val aquariumStartWaitMillis : Long,
615 val aquariumStopWaitMillis : Long,
616 val billResponseWaitMillis : Long,
617 val numberOfUsers : Long,
618 val numberOfResponseRetries : Long,
619 val minFileCredits : Long,
620 val maxFileCredits : Long,
621 val minUserCredits : Long,
622 val maxUserCredits : Long,
623 val resources : List[Resource]
625 extends JsonSupport {}
627 case class Scenarios(
628 val scenarios : List[Scenario] )
629 extends JsonSupport {}
631 object ScenarioRunner {
632 val aquarium = AquariumInstance.aquarium
634 def parseScenario(txt:String) : Scenario =
635 StdConverters.AllConverters.convertEx[Scenario](JsonTextFormat(txt))
637 def parseScenarios(txt:String) : Scenarios =
638 StdConverters.AllConverters.convertEx[Scenarios](JsonTextFormat(txt))
640 def runScenario(txt:String) : Unit = runScenario(parseScenario(txt))
642 private[this] def runUser(s:Scenario) : User = {
643 val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
644 val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
645 val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
646 //Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
647 AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
648 for{ r <- s.resources} // create messages
651 user.addVMs(r.instances.toInt,r.cronSpec)
653 user.addFiles(r.instances.toInt,"update",r.cronSpec)
655 user.addCredits(r.instances.toInt,r.cronSpec)
658 user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
659 s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
660 s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
665 def runScenario(s:Scenario): Unit = {
666 if(s.ignoreScenario == false) {
667 Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
668 val tasks = for { u <- 1 to s.numberOfUsers.toInt}
669 yield scala.actors.Futures.future(runUser(s))
670 val users = for { u <- tasks} yield u()
673 if(s.validationEnabled && u.validateResults() == false)
674 Console.err.println("Validation FAILED for user " + u)
676 Console.err.println("\n=========================\nStopping scenario\n=======================")
680 def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt))
682 def runScenarios(ss:Scenarios) = {
683 Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
684 ss.scenarios.foreach(runScenario(_))
689 object UserTest extends Loggable {
692 //add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
696 val sendOrdered : Boolean,
697 val sendViaRabbitMQ : Boolean,
698 val sendDebugEnabled : Boolean,
699 val validationEnabled : Boolean,
700 val billingMonth: Long,
701 val aquariumStartWaitMillis : Long,
702 val aquariumStopWaitMillis : Long,
703 val billResponseWaitMillis : Long,
704 val numberOfUsers : Long,
705 val numberOfResponseRetries : Long,
706 val minFileCredits : Long,
707 val maxFileCredits : Long,
708 val minUserCredits : Long,
709 val maxUserCredits : Long,
710 val resources : List[Resource]
713 val basic = new Scenario(false,"localhost",8888,true,false,false,false,9,2000,2000,2000,
714 1,10,2000,5000,10000,50000,List[Resource](
715 new Resource("credits",1, "00 00 10,12 9 ?"),
716 new Resource("disk",1,"00 18 15,20,29,30 9 ?"),
717 new Resource("vm",1,"00 18 14,17,19,20 9 ?")
720 def main(args: Array[String]) = {
723 val lines = scala.io.Source.fromFile(args.head).mkString
724 ScenarioRunner.runScenarios(lines)
728 ScenarioRunner.runScenarios(new Scenarios(List(basic)))
732 /* val user = new User("localhost:8888",9)
733 val (minFileCredits,maxFileCredits) = (2000,5000)
734 val (minUserCredits,maxUserCredits) = (10000,10000)
735 //Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
737 val json =AquariumInstance.run(2000,2000) {
739 addCredits(1,"00 00 10,12 9 ?").
740 addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?").
741 addVMs(1,"00 18 14,17,19,20 9 ?").
742 //addVMs(5,"on","00 18 ? 9 Tue")
743 run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
746 Console.err.println("Messages sent:")
747 for { m <- JsonLog.get}
748 Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
749 Console.err.println("\n=========================\n")
750 Console.err.println("Response:\n" + json)*/
757 object BillTest extends Loggable {
763 private[this] val counter = new AtomicLong(0L)
764 private[this] def nextID() = counter.getAndIncrement
766 private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy");
768 val propsfile = new FileStreamResource(new File("aquarium.properties"))
770 var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
772 val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user")
774 val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace")
777 exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
778 Console.err.println(_))
779 new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
780 //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
781 update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
786 private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
787 val commands = cmd.split(" ")
788 val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
789 val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
790 val sb = new StringBuilder
792 //spin off a thread to read process output.
793 val outputReaderThread = new Thread(new Runnable(){
795 var ln : String = null
796 while({ln = ins.readLine; ln != null})
800 outputReaderThread.start()
802 //suspense this main thread until sub process is done.
805 //wait until output is fully read/completed.
806 outputReaderThread.join()
812 private [this] def createUser(date:DATE) : (JSON,UID) = {
814 val id = "im.%d.create.user".format(mid)
815 val millis = format.parse(date).getTime
816 val occurredMillis = millis
817 val receivedMillis = millis
818 val userID = "user%d@grnet.gr".format(mid)
819 val clientID = "astakos"
822 val eventVersion = "1.0"
823 val eventType = "create"
825 val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
826 val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
830 private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
831 val id = "im.%d.add.credits".format(nextID)
832 val millis = format.parse(date).getTime
833 val occurredMillis = millis
834 val receivedMillis = millis
835 val userID = "user%d@grnet.gr".format(uid)
836 val clientID = "astakos"
838 val eventVersion = "1.0"
839 val resource = "addcredits"
840 val instanceID = "addcredits"
842 val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
843 val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
847 private [this] def makePithos(date:DATE,uid:UID,path:String,
848 value:Double,action:String) : JSON = {
849 val id = "rc.%d.object.%s".format(nextID,action)
850 val millis = format.parse(date).getTime
851 val occurredMillis = millis
852 val receivedMillis = millis
853 val userID = "user%d@grnet.gr".format(uid)
854 val clientID = "pithos"
855 val resource ="diskspace"
857 val eventVersion = "1.0"
858 val details = MessageFactory.newDetails(
859 MessageFactory.newStringDetail("action", "object %s".format(action)),
860 MessageFactory.newStringDetail("total", "0.0"),
861 MessageFactory.newStringDetail("user", userID),
862 MessageFactory.newStringDetail("path", path)
865 val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
866 val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
870 private[this] def sendCreate(date:DATE) : UID = {
871 val (json,uid) = createUser(date)
872 aquarium(Aquarium.EnvKeys.rabbitMQProducer).
873 sendMessage(astakosExchangeName,astakosRoutingKey,json)
874 Console.err.println("Sent message:\n%s\n".format(json))
878 private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = {
879 val json = addCredits(date,uid,amount)
880 aquarium(Aquarium.EnvKeys.rabbitMQProducer).
881 sendMessage(astakosExchangeName,astakosRoutingKey,
883 Console.err.println("Sent message:\n%s\n".format(json))
886 private[this] def sendPithos(date:DATE,uid:UID,path:String,
887 value:Double,action:String) = {
888 val json = makePithos(date,uid,path,value,action)
889 aquarium(Aquarium.EnvKeys.rabbitMQProducer).
890 sendMessage(pithosExchangeName,pithosRoutingKey,
892 Console.err.println("Sent message:\n%s\n".format(json))
895 private[this] def jsonOf(url:String) : JSON = {
896 val in = new BufferedReader(
897 new InputStreamReader(
898 new URL(url).openConnection().
902 while ({inputLine = in.readLine();inputLine} != null)
903 ret += (if(ret.isEmpty) "" else "\n")+ inputLine
908 private[this] def getBill(uid:Long,from:String,to:String) : JSON = {
909 val fromMillis = format.parse(from).getTime
910 val toMillis = format.parse(to).getTime
911 val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis)
920 private[this] def sleep(l:Long) = {
924 case ex:InterruptedException =>
925 Thread.currentThread().interrupt()
930 private[this] def testCase1() : JSON = {
931 /* GET BILL FROM TO*/
932 val billFromDate = "00/00/00/01/08/2012"
933 val billToDate= "23/59/59/31/08/2012"
935 val creationDate = "15/00/00/03/08/2012"
937 val addCreditsDate = "18/15/00/05/08/2012"
938 val creditsToAdd = 6000
940 val pithosPath = "/Papers/GOTO_HARMFUL.PDF"
942 val pithosDate1 = "20/30/00/05/08/2012"
943 val pithosAction1 = "update"
944 val pithosValue1 = 2000
947 val pithosDate2 = "21/05/00/15/08/2012"
948 val pithosAction2 = "update"
949 val pithosValue2 = 4000
952 val pithosDate3 = "08/05/00/20/08/2012"
953 val pithosAction3 = "update"
954 val pithosValue3 = 100
957 sendCreate(creationDate)
959 sendAddCredits(addCreditsDate,id,creditsToAdd)
961 sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1)
963 sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2)
965 sendPithos(pithosDate3,id,pithosPath,pithosValue3,pithosAction3)
968 Console.err.println("Waiting for stuff to be processed")
973 while(resp.isEmpty && count < 5){
974 if(count > 0) Console.err.println("Retrying for bill request.")
975 resp = getBill(id,billFromDate,billToDate)
976 if(resp.isEmpty) Thread.sleep(1000)
980 Console.err.println("Sending URL done")
984 def runTestCase(f: => JSON) = {
996 Console.err.println("Response : " + json )
999 def main(args: Array[String]) = {
1000 //Console.err.println("JSON: " + (new BillEntry).toJsonString)
1001 runTestCase(testCase1)