2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
38 import com.ckkloverdos.props.Props
39 import com.ckkloverdos.resource.FileStreamResource
40 import gr.grnet.aquarium.converter.{StdConverters}
41 import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
42 import java.io.{InputStreamReader, BufferedReader, File}
44 import java.text.SimpleDateFormat
45 import java.util.concurrent.atomic.AtomicLong
46 import gr.grnet.aquarium.util.{Lock, Loggable}
47 import java.util.{Date, Calendar, GregorianCalendar}
48 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
49 import gr.grnet.aquarium.policy.CronSpec
50 import gr.grnet.aquarium.message.avro.gen.{BillEntryMsg, IMEventMsg, ResourceEventMsg}
51 import org.apache.avro.specific.SpecificRecord
52 import util.json.JsonSupport
57 * @author Prodromos Gerakios <pgerakios@grnet.gr>
62 private[this] val counter = new AtomicLong(0L)
/** Returns the counter's current value and atomically advances it by one. */
def next() : Long = counter.getAndIncrement()
/**
 * Draws a pseudo-random integer from the closed range [min, max].
 *
 * The previous body computed `scala.math.random.toInt`, which truncates a
 * value in [0, 1) to 0, so every call returned `min` (and divided by zero
 * when `max` was -1). This version actually samples the range.
 *
 * @param min lowest value that may be returned (inclusive)
 * @param max highest value that may be returned (inclusive)
 * @return a value `v` with `min <= v <= max`; `min` itself when `max <= min`,
 *         which keeps the old result for degenerate or inverted ranges
 */
def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) : Int =
  if (max <= min) min
  else {
    // Widen to Long: the span of the default range (2^32) does not fit in an Int.
    val span = max.toLong - min.toLong + 1L
    // nextDouble() is in [0, 1), so the truncated offset is in [0, span - 1].
    val offset = (scala.util.Random.nextDouble() * span).toLong
    (min.toLong + offset).toInt
  }
67 def random[A](l:List[A]) : A = {
69 if(sz==0) throw new Exception("random")
75 private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
76 val commands = cmd.split(" ")
77 val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
78 val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
79 val sb = new StringBuilder
81 //spin off a thread to read process output.
82 val outputReaderThread = new Thread(new Runnable(){
84 var ln : String = null
85 while({ln = ins.readLine; ln != null})
89 outputReaderThread.start()
91 //suspend this main thread until the sub process is done.
94 //wait until output is fully read/completed.
95 outputReaderThread.join()
/**
 * Convenience overload: forwards `cmd` to the two-argument `exec`, supplying
 * a callback that prints whatever string it is given to `Console.err`.
 */
def exec(cmd:String) : Unit = {
  val printToStderr: String => Unit = line => Console.err.println(line)
  exec(cmd, printToStderr)
}
/**
 * Hands `Process.exec` a `mongo` shell command line whose `--eval` script
 * calls `remove()` on the resevents, imevents, policies and userstates
 * collections of the `aquarium` database.
 */
def clear = {
  val wipeCommand = "mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()"
  Process.exec(wipeCommand)
}
106 object AquariumInstance {
107 //val propsfile = new FileStreamResource(new File("aquarium.properties"))
108 var props: Props = ResourceLocator.AquariumProperties
109 // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
112 new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
113 //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
114 update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
117 def run(billWait:Int, stop:Int)(f : => Unit) = {
119 Thread.sleep(billWait)
123 Console.err.println("Stopping aquarium")
126 Console.err.println("Stopping aquarium --- DONE")
132 private[this] final val lock = new Lock()
133 private[this] var _log : List[String] = Nil
/** Appends `json` to the end of the recorded log, inside `lock.withLock`. */
def add(json:String) = lock.withLock {
  _log = _log :+ json
}
/** Returns the recorded entries in insertion order, read inside `lock.withLock`. */
def get() : List[String] = lock.withLock {
  // _log is an immutable List, so handing it out directly is equivalent to copying it.
  _log
}
138 /*object MessageQueue {
139 private[this] final val lock = new Lock()
140 private[this] var _sortedMsgs = SortedMap[Timeslot,(String,String,String)]
143 object MessageService {
145 def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
146 val json = AvroHelpers.jsonStringOfSpecificRecord(event)
148 val (exchangeName,routingKey) = event match {
149 case rc:ResourceEventMsg => rc.getResource match {
151 ("cyclades","cyclades.resource.vmtime")
153 ("pithos","pithos.resource.diskspace")
155 ("astakos","astakos.resource")
157 throw new Exception("send cast failed: %s".format(x))
159 case im:IMEventMsg =>
160 ("astakos","astakos.user")
162 throw new Exception("send cast failed")
164 AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
165 sendMessage(exchangeName,routingKey,json)
167 val uid = event match {
168 case rcevent: ResourceEventMsg =>
169 AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
171 case imevent: IMEventMsg =>
172 AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
175 val userActorRef = AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)
178 val millis = event match {
179 case rc:ResourceEventMsg => rc.getOccurredMillis
180 case im:IMEventMsg => im.getOccurredMillis
182 JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json)
184 Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
188 abstract class Message {
190 val cal = new GregorianCalendar
191 var _range : Timeslot = null
192 var _cronSpec : CronSpec = null
193 var _messagesSent = 0
195 var _map = Map[String,String]()
/**
 * Varargs convenience: collects the key/value pairs into a map (a later pair
 * overrides an earlier one with the same key) and delegates to the
 * map-taking `updateMap`.
 */
def updateMap(args:Tuple2[String,String]*) : Message = {
  val collected: Map[String,String] = args.toMap
  updateMap(collected)
}
200 def updateMap(map:Map[String,String]) : Message = {
201 def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
202 (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
203 a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
205 _map = mergeMap(List(_map,map))((v1,v2) => v2)
206 (_map.get("month"),_map.get("spec")) match {
207 case (Some((month0:String)),Some(spec)) =>
208 val month : Int = month0.toInt
209 if((_cronSpec==null || _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
210 val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
211 val d0 = getDate(1,month,year,0,0,0)
212 _range = Timeslot(d0,d1 - 1000)
213 _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
/** Current value of the `_messagesSent` counter. */
def sentMessages : Int = _messagesSent
/** Shorthand for `nextTime(update = false)`. */
def nextTime : Option[Long] = nextTime(update = false)
225 def nextTime(update:Boolean) : Option[Long] = {
230 _cronSpec.nextValidDate(_range,cal.getTime) match {
232 val millis = d.getTime
233 if(update) cal.setTimeInMillis(millis)
242 cal.setTimeInMillis(System.currentTimeMillis())
243 cal.get(Calendar.YEAR)
246 def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
247 cal.set(year,month-1,day,hour,min,sec)
/** The instant currently held by `cal`, as epoch milliseconds. */
def getMillis : Long = cal.getTimeInMillis()
/** Minute-precision overload: calls the six-argument `getDate` with the seconds field fixed at 0. */
def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long = {
  val wholeMinute = 0
  getDate(day, month, year, hour, min, wholeMinute)
}
256 def setMillis(millis:Long) = {
257 cal.setTimeInMillis(millis)
260 def addMillis(day:Int,hour:Int) = {
261 cal.roll(Calendar.DATE,day)
262 cal.roll(Calendar.DATE,hour)
/** Fetches a fresh identifier from the shared `UID` generator. */
def nextID = UID.next()
267 def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
/**
 * Varargs convenience: collects the key/value pairs into a map (a later pair
 * overrides an earlier one with the same key) and delegates to the
 * map-taking `send`.
 */
def send(args:Tuple2[String,String]*) : Boolean = {
  val collected: Map[String,String] = args.toMap
  send(collected)
}
272 def send(map:Map[String,String]) : Boolean = {
273 nextTime(true) match {
276 val event = makeEvent(millis,_map)
277 val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
278 val rdb = _map.getOrElse("debugEnabled","false").toBoolean
279 MessageService.send(event,ren,rdb)
290 class DiskMessage extends Message {
293 * "action" -> "update" , "delete" , "purge"
298 def makeEvent(millis:Long,map:Map[String,String]) = {
299 val action = map("action")
301 val path = map("path")
302 val value = map("value")
303 val id = "rc.%d.object.%s".format(nextID,action)
304 val occurredMillis = millis
305 val receivedMillis = millis
306 val userID = uid //"user%s@grnet.gr".format(uid)
307 val clientID = "pithos"
308 val resource ="diskspace"
310 val eventVersion = "1.0"
311 val details = MessageFactory.newDetails(
312 MessageFactory.newStringDetail("action", "object %s".format(action)),
313 MessageFactory.newStringDetail("total", "0.0"),
314 MessageFactory.newStringDetail("user", userID),
315 MessageFactory.newStringDetail("path", path)
318 val msg = MessageFactory.newResourceEventMsg(
320 occurredMillis, receivedMillis,
322 resource, instanceID,
333 class VMMessage extends Message {
336 * uid -> unique id for user
337 * instanceID -> "cyclades.vm.kJSOLek"
338 * vmName -> "My Lab VM"
339 * status -> "on", "off" , "destroy"
343 if(_status=="off") _status = "on" else _status = "off"
346 def makeEvent(millis:Long,map:Map[String,String]) = {
348 val value = /* map("status")*/nextStatus match {
351 case "destroy" => "2"
352 case x => throw new Exception("VMMessage bad status: %s".format(x))
354 val id = "rc.%d.vmtime".format(nextID)
355 val occurredMillis = millis
356 val receivedMillis = millis
357 val userID = uid // "user%s@grnet.gr".format(uid)
358 val clientID = "cyclades"
359 val resource ="vmtime"
360 val instanceID = map("instanceID")
361 val eventVersion = "1.0"
362 val details = MessageFactory.newDetails(
363 MessageFactory.newStringDetail("VM Name", map("vmName"))
366 val msg = MessageFactory.newResourceEventMsg(
368 occurredMillis, receivedMillis,
370 resource, instanceID,
381 class CreationMessage extends Message {
386 def makeEvent(millis:Long,map:Map[String,String]) = {
387 val uid = map("uid") //
388 val id = "im.%d.create.user".format(nextID)
389 val occurredMillis = millis
390 val receivedMillis = millis
391 val userID = uid //"user%d@grnet.gr".format(mid)
392 val clientID = "astakos"
395 val eventVersion = "1.0"
396 val eventType = "create"
398 val msg = MessageFactory.newIMEventMsg(
400 occurredMillis, receivedMillis,
404 eventVersion, eventType,
405 MessageFactory.newDetails(),
413 class AddCreditsMessage extends Message {
419 def makeEvent(millis:Long,map:Map[String,String]) = {
420 val uid = map("uid") //
421 val amount = map("amount")
422 val id = "im.%d.add.credits".format(nextID)
423 val occurredMillis = millis
424 val receivedMillis = millis
425 val userID = uid //"user%d@grnet.gr".format(uid)
426 val clientID = "astakos"
429 val eventVersion = "1.0"
430 val eventType = "addcredits"
431 val msg = MessageFactory.newResourceEventMsg(
433 occurredMillis, receivedMillis,
435 "addcredits", "addcredits",
438 MessageFactory.newDetails(),
/**
 * Varargs convenience factory: collects the key/value pairs into a map (a
 * later pair overrides an earlier one with the same key) and delegates to
 * the map-taking `apply`.
 */
def apply(typ:String,args:Tuple2[String,String]*) : Message = {
  val collected: Map[String,String] = args.toMap
  apply(typ, collected)
}
450 val msgMap = Map[String,()=>Message](
451 "vm" -> (() => new VMMessage),
452 "disk" -> (() => new DiskMessage),
453 "create" -> (() => new CreationMessage),
454 "credits" -> (() => new AddCreditsMessage)
457 def apply(typ:String,map:Map[String,String]) : Message = {
458 val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))()
465 class User(serverAndPort:String,month:Int) {
466 val uid = "user%d@grnet.gr".format(UID.next)
467 val _creationMessage : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
468 var _resources : List[Message] = Nil
469 var _billEntryMsg :Option[BillEntryMsg] = None
/** A user is rendered as its `uid`. */
override def toString() : String = uid
473 def validateResults() : Boolean = {
474 throw new Exception("Not implemented !!!!")
477 def printResults() = {
478 Console.err.println("Messages sent:")
479 for { m <- JsonLog.get}
480 Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
481 Console.err.println("\n=========================\n")
482 Console.err.println("Response:\n" + (_billEntryMsg match {
483 case None => "NONE!!!!"
484 case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
/**
 * Varargs convenience: collects the key/value pairs into a map (a later pair
 * overrides an earlier one with the same key) and delegates to the
 * map-taking `add`.
 */
def add(no:Int,typ:String,args:Tuple2[String,String]*) : User = {
  val collected: Map[String,String] = args.toMap
  add(no, typ, collected)
}
/**
 * Fixed-map convenience: delegates to the generator-taking `add`, passing a
 * generator that ignores its `Int` argument and always yields `map`.
 */
def add(no:Int,typ:String,map:Map[String,String]) : User = {
  val sameForAll: Int => Map[String,String] = _ => map
  add(no, typ, sameForAll)
}
494 def add(no:Int,typ:String,map:Int=>Map[String,String]) : User = {
496 val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
497 _resources = Message(typ,map0) :: _resources
502 def addVMs(no:Int,cronSpec:String) : User =
504 Map("instanceID"->"cyclades.vm.%d".format(i),
505 "vmName" -> "Virtual Machine #%d".format(i),
506 "status" -> "on", // initially "on" msg
507 "spec" -> cronSpec)})
509 def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
511 Map("action" -> action,
512 "path"->"/Papers/file_%d.PDF".format(i),
513 //"value"->UID.random(minVal,maxVal).toString,
518 def addCredits(no:Int,spec:String) : User = {
519 add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec/*,"amount"->amount.toString*/)
522 def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
523 sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean) = {
524 var _messagesSent : List[Message] = Nil
525 _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
527 var iter = _resources.toList
529 iter = (if(!ordered) iter
530 else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match {
531 case (Some(l1),Some(l2)) => l1 <= l2
532 case (None,None) => true
533 case (None,Some(l)) => true
534 case (Some(l),None) => false
536 _messagesSent = _messagesSent ::: List(m)
537 m.send("value"->UID.random(minFile,maxFile).toString,
538 "amount"->UID.random(minAmount,maxAmount).toString,
539 "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
540 "debugEnabled" -> sendDebugEnabled.toString
541 //"status" -> UID.random(List("off","on"))
544 _billEntryMsg = getBillResponse(maxJSONRetry)
547 private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
548 def get () : String = {
549 val fromMillis = _creationMessage._range.from.getTime
550 val toMillis = _creationMessage._range.to.getTime
551 val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
553 val in = new BufferedReader(
554 new InputStreamReader(
555 new URL(url).openConnection().
559 while ({inputLine = in.readLine();inputLine} != null)
560 ret += (if(ret.isEmpty) "" else "\n")+ inputLine
570 var ret : Option[BillEntryMsg] = None
571 while(resp.isEmpty && count < max){
572 if(count > 0) Console.err.println("Retrying for bill request.")
574 if(resp.isEmpty) Thread.sleep(1000)
577 var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
579 if(b.getStatus().equals("processing")){
597 val resType : String, // Message.msgMap.keys
599 val cronSpec : String
601 extends JsonSupport {}
604 val ignoreScenario : Boolean,
607 val sendOrdered : Boolean,
608 val sendViaRabbitMQ : Boolean,
609 val sendDebugEnabled : Boolean,
610 val validationEnabled : Boolean,
611 val billingMonth: Long,
612 val aquariumStartWaitMillis : Long,
613 val aquariumStopWaitMillis : Long,
614 val billResponseWaitMillis : Long,
615 val numberOfUsers : Long,
616 val numberOfResponseRetries : Long,
617 val minFileCredits : Long,
618 val maxFileCredits : Long,
619 val minUserCredits : Long,
620 val maxUserCredits : Long,
621 val resources : List[Resource]
623 extends JsonSupport {}
626 val scenarios : List[Scenario] )
627 extends JsonSupport {}
629 object ScenarioRunner {
630 val aquarium = AquariumInstance.aquarium
/** Converts `txt` into a single `Scenario` using `StdConverters.AllConverters.convertEx`. */
def parseScenario(txt:String) : Scenario = {
  val converters = StdConverters.AllConverters
  converters.convertEx[Scenario](txt)
}
/** Converts `txt` into a `Scenarios` collection using `StdConverters.AllConverters.convertEx`. */
def parseScenarios(txt:String) : Scenarios = {
  val converters = StdConverters.AllConverters
  converters.convertEx[Scenarios](txt)
}
/** Parses `txt` with `parseScenario` and runs the resulting scenario. */
def runScenario(txt:String) : Unit = {
  val parsed: Scenario = parseScenario(txt)
  runScenario(parsed)
}
640 private[this] def runUser(s:Scenario) : User = {
641 val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
642 val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
643 val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
644 //Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
645 AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
646 for{ r <- s.resources} // create messages
649 user.addVMs(r.instances.toInt,r.cronSpec)
651 user.addFiles(r.instances.toInt,"update",r.cronSpec)
653 user.addCredits(r.instances.toInt,r.cronSpec)
656 user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
657 s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
658 s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
663 def runScenario(s:Scenario): Unit = {
664 if(s.ignoreScenario == false) {
665 Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
666 val tasks = for { u <- 1 to s.numberOfUsers.toInt}
667 yield scala.actors.Futures.future(runUser(s))
668 val users = for { u <- tasks} yield u()
671 if(s.validationEnabled && u.validateResults() == false)
672 Console.err.println("Validation FAILED for user " + u)
674 Console.err.println("\n=========================\nStopping scenario\n=======================")
/** Parses `txt` with `parseScenarios` and runs the resulting scenarios. */
def runScenarios(txt:String) : Unit = {
  val parsed: Scenarios = parseScenarios(txt)
  runScenarios(parsed)
}
680 def runScenarios(ss:Scenarios) = {
681 Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
682 ss.scenarios.foreach(runScenario(_))
687 object UserTest extends Loggable {
690 //add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
694 val sendOrdered : Boolean,
695 val sendViaRabbitMQ : Boolean,
696 val sendDebugEnabled : Boolean,
697 val validationEnabled : Boolean,
698 val billingMonth: Long,
699 val aquariumStartWaitMillis : Long,
700 val aquariumStopWaitMillis : Long,
701 val billResponseWaitMillis : Long,
702 val numberOfUsers : Long,
703 val numberOfResponseRetries : Long,
704 val minFileCredits : Long,
705 val maxFileCredits : Long,
706 val minUserCredits : Long,
707 val maxUserCredits : Long,
708 val resources : List[Resource]
711 val basic = new Scenario(false,"localhost",8888,true,false,false,false,9,2000,2000,2000,
712 1,10,2000,5000,10000,50000,List[Resource](
713 new Resource("credits",1, "00 00 10,12 9 ?"),
714 new Resource("disk",1,"00 18 15,20,29,30 9 ?"),
715 new Resource("vm",1,"00 18 14,17,19,20 9 ?")
718 def main(args: Array[String]) = {
721 val lines = scala.io.Source.fromFile(args.head).mkString
722 ScenarioRunner.runScenarios(new Scenarios(List(basic)))
726 ScenarioRunner.runScenarios(new Scenarios(List(basic)))
730 /* val user = new User("localhost:8888",9)
731 val (minFileCredits,maxFileCredits) = (2000,5000)
732 val (minUserCredits,maxUserCredits) = (10000,10000)
733 //Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
735 val json =AquariumInstance.run(2000,2000) {
737 addCredits(1,"00 00 10,12 9 ?").
738 addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?").
739 addVMs(1,"00 18 14,17,19,20 9 ?").
740 //addVMs(5,"on","00 18 ? 9 Tue")
741 run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
744 Console.err.println("Messages sent:")
745 for { m <- JsonLog.get}
746 Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
747 Console.err.println("\n=========================\n")
748 Console.err.println("Response:\n" + json)*/
755 object BillTest extends Loggable {
761 private[this] val counter = new AtomicLong(0L)
/** Returns the counter's current value and atomically advances it by one. */
private[this] def nextID() : Long = counter.getAndIncrement()
764 private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy");
766 val propsfile = new FileStreamResource(new File("aquarium.properties"))
768 var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
770 val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user")
772 val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace")
775 exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
776 Console.err.println(_))
777 new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
778 //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
779 update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
784 private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
785 val commands = cmd.split(" ")
786 val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
787 val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
788 val sb = new StringBuilder
790 //spin off a thread to read process output.
791 val outputReaderThread = new Thread(new Runnable(){
793 var ln : String = null
794 while({ln = ins.readLine; ln != null})
798 outputReaderThread.start()
800 //suspend this main thread until the sub process is done.
803 //wait until output is fully read/completed.
804 outputReaderThread.join()
810 private [this] def createUser(date:DATE) : (JSON,UID) = {
812 val id = "im.%d.create.user".format(mid)
813 val millis = format.parse(date).getTime
814 val occurredMillis = millis
815 val receivedMillis = millis
816 val userID = "user%d@grnet.gr".format(mid)
817 val clientID = "astakos"
820 val eventVersion = "1.0"
821 val eventType = "create"
823 val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
824 val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
828 private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
829 val id = "im.%d.add.credits".format(nextID)
830 val millis = format.parse(date).getTime
831 val occurredMillis = millis
832 val receivedMillis = millis
833 val userID = "user%d@grnet.gr".format(uid)
834 val clientID = "astakos"
836 val eventVersion = "1.0"
837 val resource = "addcredits"
838 val instanceID = "addcredits"
840 val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
841 val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
845 private [this] def makePithos(date:DATE,uid:UID,path:String,
846 value:Double,action:String) : JSON = {
847 val id = "rc.%d.object.%s".format(nextID,action)
848 val millis = format.parse(date).getTime
849 val occurredMillis = millis
850 val receivedMillis = millis
851 val userID = "user%d@grnet.gr".format(uid)
852 val clientID = "pithos"
853 val resource ="diskspace"
855 val eventVersion = "1.0"
856 val details = MessageFactory.newDetails(
857 MessageFactory.newStringDetail("action", "object %s".format(action)),
858 MessageFactory.newStringDetail("total", "0.0"),
859 MessageFactory.newStringDetail("user", userID),
860 MessageFactory.newStringDetail("path", path)
863 val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
864 val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
868 private[this] def sendCreate(date:DATE) : UID = {
869 val (json,uid) = createUser(date)
870 aquarium(Aquarium.EnvKeys.rabbitMQProducer).
871 sendMessage(astakosExchangeName,astakosRoutingKey,json)
872 Console.err.println("Sent message:\n%s\n".format(json))
876 private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = {
877 val json = addCredits(date,uid,amount)
878 aquarium(Aquarium.EnvKeys.rabbitMQProducer).
879 sendMessage(astakosExchangeName,astakosRoutingKey,
881 Console.err.println("Sent message:\n%s\n".format(json))
884 private[this] def sendPithos(date:DATE,uid:UID,path:String,
885 value:Double,action:String) = {
886 val json = makePithos(date,uid,path,value,action)
887 aquarium(Aquarium.EnvKeys.rabbitMQProducer).
888 sendMessage(pithosExchangeName,pithosRoutingKey,
890 Console.err.println("Sent message:\n%s\n".format(json))
893 private[this] def jsonOf(url:String) : JSON = {
894 val in = new BufferedReader(
895 new InputStreamReader(
896 new URL(url).openConnection().
900 while ({inputLine = in.readLine();inputLine} != null)
901 ret += (if(ret.isEmpty) "" else "\n")+ inputLine
906 private[this] def getBill(uid:Long,from:String,to:String) : JSON = {
907 val fromMillis = format.parse(from).getTime
908 val toMillis = format.parse(to).getTime
909 val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis)
918 private[this] def sleep(l:Long) = {
922 case ex:InterruptedException =>
923 Thread.currentThread().interrupt()
928 private[this] def testCase1() : JSON = {
929 /* GET BILL FROM TO*/
930 val billFromDate = "00/00/00/01/08/2012"
931 val billToDate= "23/59/59/31/08/2012"
933 val creationDate = "15/00/00/03/08/2012"
935 val addCreditsDate = "18/15/00/05/08/2012"
936 val creditsToAdd = 6000
938 val pithosPath = "/Papers/GOTO_HARMFUL.PDF"
940 val pithosDate1 = "20/30/00/05/08/2012"
941 val pithosAction1 = "update"
942 val pithosValue1 = 2000
945 val pithosDate2 = "21/05/00/15/08/2012"
946 val pithosAction2 = "update"
947 val pithosValue2 = 4000
950 val pithosDate3 = "08/05/00/20/08/2012"
951 val pithosAction3 = "update"
952 val pithosValue3 = 100
955 sendCreate(creationDate)
957 sendAddCredits(addCreditsDate,id,creditsToAdd)
959 sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1)
961 sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2)
963 sendPithos(pithosDate3,id,pithosPath,pithosValue3,pithosAction3)
966 Console.err.println("Waiting for stuff to be processed")
971 while(resp.isEmpty && count < 5){
972 if(count > 0) Console.err.println("Retrying for bill request.")
973 resp = getBill(id,billFromDate,billToDate)
974 if(resp.isEmpty) Thread.sleep(1000)
978 Console.err.println("Sending URL done")
982 def runTestCase(f: => JSON) = {
994 Console.err.println("Response : " + json )
997 def main(args: Array[String]) = {
998 //Console.err.println("JSON: " + (new BillEntry).toJsonString)
999 runTestCase(testCase1)