root / src / test / scala / gr / grnet / aquarium / BillTest.scala @ 53389ed0
History | View | Annotate | Download (37.6 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium |
37 |
|
38 |
import com.ckkloverdos.props.Props |
39 |
import converter.{JsonTextFormat, StdConverters} |
40 |
import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory} |
41 |
import java.io.{InputStreamReader, BufferedReader, File} |
42 |
import java.net.URL |
43 |
import java.util.concurrent.atomic.AtomicLong |
44 |
import gr.grnet.aquarium.util.{Lock, Loggable} |
45 |
import java.util.{Date, Calendar, GregorianCalendar} |
46 |
import gr.grnet.aquarium.logic.accounting.dsl.Timeslot |
47 |
import message.avro.gen._ |
48 |
import org.apache.avro.specific.SpecificRecord |
49 |
import policy.CronSpec |
50 |
import util.json.JsonSupport |
51 |
import scala.Some |
52 |
import scala.Tuple2 |
53 |
import java.util.concurrent.locks.ReentrantLock |
54 |
|
55 |
|
56 |
/* |
57 |
* @author Prodromos Gerakios <pgerakios@grnet.gr> |
58 |
*/ |
59 |
|
60 |
|
61 |
object UID { |
62 |
|
63 |
private[this] var privCounters = Map[String,Long]() |
64 |
private[this] val lock = new Lock() |
65 |
|
66 |
def next(s:String) : Long = { |
67 |
val l = lock.withLock{ |
68 |
privCounters.get(s) match { |
69 |
case None => 1 |
70 |
case Some(l) => l+1 |
71 |
} |
72 |
} |
73 |
privCounters = privCounters + ((s,l)) |
74 |
l |
75 |
} |
76 |
|
77 |
private[this] val counter = new AtomicLong(0L) |
78 |
def next() = counter.getAndIncrement |
79 |
def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) = |
80 |
min + (scala.math.random.toInt % (max+1)) % (max+1) |
81 |
|
82 |
def random[A](l:List[A]) : A = { |
83 |
val sz = l.size |
84 |
if(sz==0) throw new Exception("random") |
85 |
l(random(0,sz-1)) |
86 |
} |
87 |
} |
88 |
|
89 |
object Process { |
90 |
private[this] def exec(cmd : String,func : String=>Unit) : Unit = { |
91 |
val commands = cmd.split(" ") |
92 |
val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start(); |
93 |
val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream)) |
94 |
val sb = new StringBuilder |
95 |
|
96 |
//spin off a thread to read process output. |
97 |
val outputReaderThread = new Thread(new Runnable(){ |
98 |
def run : Unit = { |
99 |
var ln : String = null |
100 |
while({ln = ins.readLine; ln != null}) |
101 |
func(ln) |
102 |
} |
103 |
}) |
104 |
outputReaderThread.start() |
105 |
|
106 |
//suspense this main thread until sub process is done. |
107 |
proc.waitFor |
108 |
|
109 |
//wait until output is fully read/completed. |
110 |
outputReaderThread.join() |
111 |
|
112 |
ins.close() |
113 |
} |
114 |
def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_)) |
115 |
} |
116 |
|
117 |
object Mongo { |
118 |
def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()") |
119 |
} |
120 |
|
121 |
object AquariumInstance { |
122 |
//val propsfile = new FileStreamResource(new File("aquarium.properties")) |
123 |
var props: Props = ResourceLocator.AquariumProperties |
124 |
// Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters)) |
125 |
val aquarium = { |
126 |
Mongo.clear |
127 |
new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg). |
128 |
//update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider). |
129 |
update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))). |
130 |
build() |
131 |
} |
132 |
|
133 |
private[this] val count=new java.util.concurrent.atomic.AtomicLong() |
134 |
private[this] val ready=new java.util.concurrent.atomic.AtomicBoolean(false) |
135 |
|
136 |
def run(billWait:Int, stop:Int)(f : => Unit) = { |
137 |
if(count.addAndGet(1) == 1){ |
138 |
Console.err.println("Starting aquarium") |
139 |
aquarium.start |
140 |
Thread.sleep(billWait) |
141 |
Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000)) |
142 |
this.synchronized{ |
143 |
ready.set(true) |
144 |
this.synchronized(this.notifyAll) |
145 |
} |
146 |
} |
147 |
try{ |
148 |
this.synchronized{ |
149 |
while(!ready.get) this.wait |
150 |
} |
151 |
} finally { |
152 |
if(count.addAndGet(-1) == 0){ |
153 |
Console.err.println("Stopping aquarium") |
154 |
aquarium.stop |
155 |
Thread.sleep(stop) |
156 |
Console.err.println("Stopping aquarium --- DONE") |
157 |
} |
158 |
} |
159 |
} |
160 |
} |
161 |
|
162 |
object JsonLog { |
163 |
private[this] final val lock = new Lock() |
164 |
private[this] var _log : List[String] = Nil |
165 |
def add(json:String) = lock.withLock(_log = _log ::: List(json)) |
166 |
def get() : List[String] = lock.withLock(_log.toList) |
167 |
} |
168 |
|
169 |
/*object MessageQueue { |
170 |
private[this] final val lock = new Lock() |
171 |
private[this] var _sortedMsgs = SortedMap[Timeslot,(String,String,String)] |
172 |
} */ |
173 |
|
174 |
object MessageService { |
175 |
private[this] val lock = new Lock |
176 |
|
177 |
def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = { |
178 |
val json = AvroHelpers.jsonStringOfSpecificRecord(event) |
179 |
if(rabbitMQEnabled){ |
180 |
val (exchangeName,routingKey) = event match { |
181 |
case rc:ResourceEventMsg => rc.getResource match { |
182 |
case "vmtime" => |
183 |
("cyclades","cyclades.resource.vmtime") |
184 |
case "diskspace" => |
185 |
("pithos","pithos.resource.diskspace") |
186 |
case "addcredits" => |
187 |
("astakos","astakos.resource") |
188 |
case x => |
189 |
throw new Exception("send cast failed: %s".format(x)) |
190 |
} |
191 |
case im:IMEventMsg => |
192 |
("astakos","astakos.user") |
193 |
case _ => |
194 |
throw new Exception("send cast failed") |
195 |
} |
196 |
AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer). |
197 |
sendMessage(exchangeName,routingKey,json) |
198 |
} else { |
199 |
val uid = event match { |
200 |
case rcevent: ResourceEventMsg => |
201 |
AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent) |
202 |
rcevent.getUserID |
203 |
case imevent: IMEventMsg => |
204 |
AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent) |
205 |
imevent.getUserID |
206 |
} |
207 |
val userActorRef = lock.withLock(AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid)) |
208 |
userActorRef ! event |
209 |
} |
210 |
val millis = event match { |
211 |
case rc:ResourceEventMsg => rc.getOccurredMillis |
212 |
case im:IMEventMsg => im.getOccurredMillis |
213 |
} |
214 |
JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json) |
215 |
if(debugEnabled) |
216 |
Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json)) |
217 |
} |
218 |
} |
219 |
|
220 |
abstract class Message { |
221 |
val dbg = true |
222 |
val cal = new GregorianCalendar |
223 |
var _range : Timeslot = null |
224 |
var _cronSpec : CronSpec = null |
225 |
var _messagesSent = 0 |
226 |
//var _done = false |
227 |
var _map = Map[String,String]() |
228 |
|
229 |
def updateMap(args:Tuple2[String,String]*) : Message = |
230 |
updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg})) |
231 |
|
232 |
def updateMap(map:Map[String,String]) : Message = { |
233 |
def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] = |
234 |
(Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) => |
235 |
a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv) |
236 |
} |
237 |
_map = mergeMap(List(_map,map))((v1,v2) => v2) |
238 |
(_map.get("month"),_map.get("spec")) match { |
239 |
case (Some((month0:String)),Some(spec)) => |
240 |
val month : Int = month0.toInt |
241 |
if((_cronSpec==null || _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) { |
242 |
val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0) |
243 |
val d0 = getDate(1,month,year,0,0,0) |
244 |
_range = Timeslot((d0/1000)*1000,(d1/1000)*1000 - 1000) |
245 |
cal.setTimeInMillis(d0) |
246 |
_cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec) |
247 |
} |
248 |
case _ => () |
249 |
} |
250 |
this |
251 |
} |
252 |
|
253 |
//def done = _done |
254 |
def sentMessages = _messagesSent |
255 |
|
256 |
def nextTime : Option[Long] = nextTime(false) |
257 |
|
258 |
def nextTime(update:Boolean) : Option[Long] = { |
259 |
_cronSpec match{ |
260 |
case null => |
261 |
None |
262 |
case _ => |
263 |
_cronSpec.nextValidDate(_range,cal.getTime) match { |
264 |
case Some(d) => |
265 |
val millis = d.getTime |
266 |
if(update) cal.setTimeInMillis(millis) |
267 |
Some(millis) |
268 |
case None => |
269 |
None |
270 |
} |
271 |
} |
272 |
} |
273 |
|
274 |
def year : Int = { |
275 |
val tmp = getMillis |
276 |
cal.setTimeInMillis(System.currentTimeMillis()) |
277 |
val ret = cal.get(Calendar.YEAR) |
278 |
cal.setTimeInMillis(tmp) |
279 |
ret |
280 |
} |
281 |
|
282 |
def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = { |
283 |
val tmp = getMillis |
284 |
cal.set(year,month-1,day,hour,min,sec) |
285 |
val ret = cal.getTimeInMillis |
286 |
cal.setTimeInMillis(tmp) |
287 |
ret |
288 |
} |
289 |
|
290 |
def getMillis : Long = cal.getTimeInMillis |
291 |
|
292 |
def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long = |
293 |
getDate(day,month,year,hour,min,0) |
294 |
|
295 |
/*def setMillis(millis:Long) = { |
296 |
cal.setTimeInMillis(millis) |
297 |
}*/ |
298 |
|
299 |
/*def addMillis(day:Int,hour:Int) = { |
300 |
cal.roll(Calendar.DATE,day) |
301 |
cal.roll(Calendar.DATE,hour) |
302 |
}*/ |
303 |
|
304 |
def nextID = UID.next(this.getClass().getName) /*this match { |
305 |
case _:DiskMessage => UID.next("DiskMessage") |
306 |
case _:CreationMessage => UID.next("CreationMessage") |
307 |
case _:VMMessage => UID.next("VMMessage") |
308 |
case _:AddCreditsMessage => UID.next("AddCreditsMessage") |
309 |
}*/ |
310 |
|
311 |
def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord |
312 |
|
313 |
def send(args:Tuple2[String,String]*) : Boolean = |
314 |
send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg})) |
315 |
|
316 |
def send(map:Map[String,String]) : Boolean = { |
317 |
nextTime(true) match { |
318 |
case Some(millis) => |
319 |
updateMap(map) |
320 |
val event = makeEvent(millis,_map) |
321 |
val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean |
322 |
val rdb = _map.getOrElse("debugEnabled","false").toBoolean |
323 |
MessageService.send(event,ren,rdb) |
324 |
_messagesSent += 1 |
325 |
true |
326 |
case None => |
327 |
//_done = true |
328 |
false |
329 |
} |
330 |
} |
331 |
|
332 |
} |
333 |
|
334 |
class DiskMessage extends Message { |
335 |
/* |
336 |
* map: |
337 |
* "action" -> "update" , "delete" , "purge" |
338 |
* "uid" -> |
339 |
* "path" -> |
340 |
* "value" -> |
341 |
*/ |
342 |
def makeEvent(millis:Long,map:Map[String,String]) = { |
343 |
val action = map("action") |
344 |
val uid = map("uid") |
345 |
val path = map("path") |
346 |
val value = map("value") |
347 |
val id = "rc.%d.object.%s".format(nextID,action) |
348 |
val occurredMillis = millis |
349 |
val receivedMillis = millis |
350 |
val userID = uid //"user%s@grnet.gr".format(uid) |
351 |
val clientID = "pithos" |
352 |
val resource ="diskspace" |
353 |
val instanceID = "1" |
354 |
val eventVersion = "1.0" |
355 |
val details = MessageFactory.newDetails( |
356 |
MessageFactory.newStringDetail("action", "object %s".format(action)), |
357 |
MessageFactory.newStringDetail("total", "0.0"), |
358 |
MessageFactory.newStringDetail("user", userID), |
359 |
MessageFactory.newStringDetail("path", path) |
360 |
) |
361 |
|
362 |
val msg = MessageFactory.newResourceEventMsg( |
363 |
id, |
364 |
occurredMillis, receivedMillis, |
365 |
userID, clientID, |
366 |
resource, instanceID, |
367 |
value, |
368 |
eventVersion, |
369 |
details, |
370 |
uid |
371 |
) |
372 |
|
373 |
msg |
374 |
} |
375 |
} |
376 |
|
377 |
class VMMessage extends Message { |
378 |
/* |
379 |
* map: |
380 |
* uid -> unique id for user |
381 |
* instanceID -> "cyclades.vm.kJSOLek" |
382 |
* vmName -> "My Lab VM" |
383 |
* status -> "on", "off" , "destroy" |
384 |
*/ |
385 |
var _status = "on" |
386 |
def nextStatus = { |
387 |
if(_status=="off") _status = "on" else _status = "off" |
388 |
_status |
389 |
} |
390 |
def makeEvent(millis:Long,map:Map[String,String]) = { |
391 |
val uid = map("uid") |
392 |
val value = /* map("status")*/nextStatus match { |
393 |
case "on" => "1" |
394 |
case "off" => "0" |
395 |
case "destroy" => "2" |
396 |
case x => throw new Exception("VMMessage bad status: %s".format(x)) |
397 |
} |
398 |
val id = "rc.%d.vmtime".format(nextID) |
399 |
val occurredMillis = millis |
400 |
val receivedMillis = millis |
401 |
val userID = uid // "user%s@grnet.gr".format(uid) |
402 |
val clientID = "cyclades" |
403 |
val resource ="vmtime" |
404 |
val instanceID = map("instanceID") |
405 |
val eventVersion = "1.0" |
406 |
val details = MessageFactory.newDetails( |
407 |
MessageFactory.newStringDetail("VM Name", map("vmName")) |
408 |
) |
409 |
|
410 |
val msg = MessageFactory.newResourceEventMsg( |
411 |
id, |
412 |
occurredMillis, receivedMillis, |
413 |
userID, clientID, |
414 |
resource, instanceID, |
415 |
value, |
416 |
eventVersion, |
417 |
details, |
418 |
uid |
419 |
) |
420 |
|
421 |
msg |
422 |
} |
423 |
} |
424 |
|
425 |
class CreationMessage extends Message { |
426 |
/* |
427 |
* map contains: |
428 |
* uid -> user id |
429 |
*/ |
430 |
def makeEvent(millis:Long,map:Map[String,String]) = { |
431 |
val uid = map("uid") // |
432 |
val id = "im.%d.create.user".format(nextID) |
433 |
val occurredMillis = millis |
434 |
val receivedMillis = millis |
435 |
val userID = uid //"user%d@grnet.gr".format(mid) |
436 |
val clientID = "astakos" |
437 |
val isActive = false |
438 |
val role = "default" |
439 |
val eventVersion = "1.0" |
440 |
val eventType = "create" |
441 |
|
442 |
val msg = MessageFactory.newIMEventMsg( |
443 |
id, |
444 |
occurredMillis, receivedMillis, |
445 |
userID, clientID, |
446 |
isActive, |
447 |
role, |
448 |
eventVersion, eventType, |
449 |
MessageFactory.newDetails(), |
450 |
uid |
451 |
) |
452 |
msg |
453 |
} |
454 |
} |
455 |
|
456 |
class AddCreditsMessage extends Message { |
457 |
/* |
458 |
* map contains: |
459 |
* amount -> "2000" |
460 |
* uid -> loverdos1 |
461 |
*/ |
462 |
def makeEvent(millis:Long,map:Map[String,String]) = { |
463 |
val uid = map("uid") // |
464 |
val amount = map("amount") |
465 |
val id = "im.%d.add.credits".format(nextID) |
466 |
val occurredMillis = millis |
467 |
val receivedMillis = millis |
468 |
val userID = uid //"user%d@grnet.gr".format(uid) |
469 |
val clientID = "astakos" |
470 |
val isActive = false |
471 |
val role = "default" |
472 |
val eventVersion = "1.0" |
473 |
val eventType = "addcredits" |
474 |
val msg = MessageFactory.newResourceEventMsg( |
475 |
id, |
476 |
occurredMillis, receivedMillis, |
477 |
userID, clientID, |
478 |
"addcredits", "addcredits", |
479 |
amount, |
480 |
eventVersion, |
481 |
MessageFactory.newDetails(), |
482 |
uid |
483 |
) |
484 |
msg |
485 |
} |
486 |
} |
487 |
|
488 |
object Message { |
489 |
def apply(typ:String,args:Tuple2[String,String]*) : Message = |
490 |
apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg})) |
491 |
|
492 |
val msgMap = Map[String,()=>Message]( |
493 |
"vm" -> (() => new VMMessage), |
494 |
"disk" -> (() => new DiskMessage), |
495 |
"create" -> (() => new CreationMessage), |
496 |
"credits" -> (() => new AddCreditsMessage) |
497 |
) |
498 |
|
499 |
def apply(typ:String,map:Map[String,String]) : Message = { |
500 |
val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))() |
501 |
msg.updateMap(map) |
502 |
msg |
503 |
} |
504 |
} |
505 |
|
506 |
|
507 |
class User(serverAndPort:String,month:Int) { |
508 |
val uid = "user%d@grnet.gr".format(UID.next) |
509 |
val _creationMessage : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"") |
510 |
var _resources : List[Message] = Nil |
511 |
var _billEntryMsg :Option[BillEntryMsg] = None |
512 |
var _resMsgs = 0 |
513 |
var _vmMsgs = 0 |
514 |
var _addMsgs = 0 |
515 |
var _messagesSent : List[Message] = Nil |
516 |
|
517 |
override def toString() = uid |
518 |
|
519 |
def scalaList[A](s:java.util.List[A]) : List[A] = { |
520 |
import scala.collection.JavaConverters.asScalaBufferConverter |
521 |
s.asScala.toList |
522 |
} |
523 |
|
524 |
def sumOf[A,D](l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) : D = |
525 |
scalaList(l).map(f).foldLeft(start) {case (sum,v) => add(sum,v) } |
526 |
|
527 |
|
528 |
def checkSum[A,D](s:D,l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) = |
529 |
check(s == sumOf(l,start)(f)(add)) |
530 |
|
531 |
def check(b: => Boolean) = { |
532 |
if(!b) |
533 |
throw new Exception("Invalid property") |
534 |
} |
535 |
|
536 |
|
537 |
type S[A] = {def getTotalCredits : String |
538 |
def getTotalElapsedTime:String |
539 |
def getTotalUnits:String |
540 |
def getDetails:java.util.List[A]} |
541 |
type V = (Double,Long,Double) |
542 |
|
543 |
def valuesOf[A,T<:S[A]](t:T) : (V,java.util.List[A]) = |
544 |
((t.getTotalCredits.toDouble, |
545 |
t.getTotalElapsedTime.toLong, |
546 |
t.getTotalUnits.toDouble), |
547 |
t.getDetails) |
548 |
|
549 |
def checkS[A,T<:S[A]](s:T)(f:A=>V)(add:(V,V)=>V) = { |
550 |
val zero = (0D,0L,0D) |
551 |
val (v0,v1) = valuesOf[A,T](s) |
552 |
checkSum(v0,v1,zero)(f)(add) |
553 |
} |
554 |
|
555 |
def add(a:V,b:V) : V= { |
556 |
val (a1,a2,a3) = a |
557 |
val (b1,b2,b3) = b |
558 |
//if(pos && b1 < 0.0D) a else |
559 |
(a1+b1,a2+b2,a3+b3) |
560 |
} |
561 |
|
562 |
val zero = (0.0D,0L,0.0D) |
563 |
|
564 |
def filterMessagesSent(serviceName:String) : List[Message] = |
565 |
_messagesSent.filter { (_,serviceName) match { |
566 |
case (_:DiskMessage,"diskspace") => true |
567 |
case (_:VMMessage,"vmtime") => true |
568 |
case (_:AddCreditsMessage,"addcredits") => true |
569 |
case _ => false |
570 |
}} |
571 |
|
572 |
def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = { |
573 |
if(m.length == 0) check(c.length == 0) |
574 |
else check(c.length == (serviceName match { |
575 |
case "diskspace" => m.length - 1 |
576 |
case "vmtime" => m.length - 1 |
577 |
case "addcredits" => m.length |
578 |
})) |
579 |
} |
580 |
|
581 |
def validateChargeEntry(c:ChargeEntryMsg) : V = { |
582 |
(c.getTotalCredits.toDouble,c.getTotalElapsedTime.toLong,c.getTotalUnits.toDouble) |
583 |
} |
584 |
|
585 |
def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = { |
586 |
val v1 = scalaList(e.getDetails) |
587 |
val v2 = filterMessagesSent(serviceName) |
588 |
checkMessages(serviceName,v1,v2) |
589 |
//val v3 = (e.getTotalCredits.toDouble,e.getTotalElapsedTime.toLong,e.getTotalUnits.toDouble) |
590 |
val v4 = sumOf(e.getDetails,zero)(validateChargeEntry)(add) |
591 |
v4 |
592 |
} |
593 |
|
594 |
def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = { |
595 |
val v0 = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble) |
596 |
val v1 = sumOf(r.getDetails,zero)(validateEventEntry(serviceName,_))(add) |
597 |
check(v0 == v1) |
598 |
v0 |
599 |
} |
600 |
|
601 |
def validateServiceEntry(s:ServiceEntryMsg) : Double = { |
602 |
val v0 = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble) |
603 |
val v1 = sumOf(s.getDetails,zero)(validateResourceEntry(s.getServiceName,_))(add) |
604 |
check(v0 == v1) |
605 |
v0._1 |
606 |
} |
607 |
|
608 |
def validateBillEntry(b:BillEntryMsg) : Boolean = { |
609 |
try{ |
610 |
check(b.getStatus == "ok") |
611 |
check(uid == b.getUserID) |
612 |
check(_creationMessage._range.from.getTime == b.getStartTime().toLong && |
613 |
_creationMessage._range.to.getTime == b.getEndTime().toLong) |
614 |
check(b.getDeductedCredits.toDouble == |
615 |
sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _)) |
616 |
true |
617 |
} catch { |
618 |
case e:Exception => |
619 |
e.printStackTrace |
620 |
false |
621 |
} |
622 |
} |
623 |
|
624 |
def validateResults() : Boolean = { |
625 |
_billEntryMsg match { |
626 |
case None => false |
627 |
case Some(b) => validateBillEntry(b) |
628 |
} |
629 |
//throw new Exception("Not implemented !!!!") |
630 |
} |
631 |
|
632 |
def printMessages() = { |
633 |
Console.err.println("Messages sent:") |
634 |
for { m <- JsonLog.get} |
635 |
Console.err.println("%s".format(m)) //"\n==============\n%s\n===============" |
636 |
Console.err.println("\n=========================\n") |
637 |
} |
638 |
def printResponse() = { |
639 |
Console.err.println("Response:\n" + (_billEntryMsg match { |
640 |
case None => "NONE!!!!" |
641 |
case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r) |
642 |
})) |
643 |
} |
644 |
|
645 |
def add(no:Int,typ:String,args:Tuple2[String,String]*) : User = |
646 |
add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg})) |
647 |
|
648 |
def add(no:Int,typ:String,map:Map[String,String]) : User = |
649 |
add(no,typ,{_ => map}) |
650 |
|
651 |
def add(no:Int,typ:String,map:Int=>Map[String,String]) : User = { |
652 |
for {i <- 1 to no} { |
653 |
val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString) |
654 |
_resources = Message(typ,map0) :: _resources |
655 |
} |
656 |
this |
657 |
} |
658 |
|
659 |
def addVMs(no:Int,cronSpec:String) : User = |
660 |
add(no,"vm",{i => |
661 |
Map("instanceID"->"cyclades.vm.%d".format(i), |
662 |
"vmName" -> "Virtual Machine #%d".format(i), |
663 |
"status" -> "on", // initially "on" msg |
664 |
"spec" -> cronSpec.format(month))}) |
665 |
|
666 |
def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User = |
667 |
add(no,"disk",{i => |
668 |
//Console.err.println("Adding file : " + "/Papers/file_%d.PDF".format(i)) |
669 |
Map("action" -> action, |
670 |
"path"->"/Papers/file_%d.PDF".format(i), |
671 |
//"value"->UID.random(minVal,maxVal).toString, |
672 |
"spec" -> spec.format(month) |
673 |
) |
674 |
}) |
675 |
|
676 |
def addCredits(no:Int,spec:String) : User = { |
677 |
add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec.format(month) |
678 |
/*,"amount"->amount.toString*/) |
679 |
} |
680 |
|
681 |
def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int, |
682 |
sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean) = { |
683 |
_messagesSent = Nil |
684 |
_creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once! |
685 |
//Thread.sleep(2000) |
686 |
var iter = _resources.toList |
687 |
while(!iter.isEmpty) |
688 |
iter = (if(!ordered) iter |
689 |
else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match { |
690 |
case (Some(l1),Some(l2)) => l1 <= l2 |
691 |
case (None,None) => true |
692 |
case (None,Some(l)) => true |
693 |
case (Some(l),None) => false |
694 |
}}).filter({m => |
695 |
_messagesSent = _messagesSent ::: List(m) |
696 |
val b = m.send("value"->UID.random(minFile,maxFile).toString, |
697 |
"amount"->UID.random(minAmount,maxAmount).toString, |
698 |
"rabbitMQEnabled" -> sendViaRabbitMQ.toString, |
699 |
"debugEnabled" -> sendDebugEnabled.toString |
700 |
//"status" -> UID.random(List("off","on")) |
701 |
) |
702 |
if(b) m match { |
703 |
case _:DiskMessage => _resMsgs += 1 |
704 |
case _:VMMessage => _vmMsgs += 1 |
705 |
case _:AddCreditsMessage => _addMsgs +=1 |
706 |
} |
707 |
b |
708 |
}) |
709 |
Thread.sleep(wait) |
710 |
_billEntryMsg = getBillResponse(maxJSONRetry) |
711 |
} |
712 |
|
713 |
private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = { |
714 |
def get () : String = { |
715 |
val fromMillis = _creationMessage._range.from.getTime |
716 |
val toMillis = _creationMessage._range.to.getTime |
717 |
val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis) |
718 |
try{ |
719 |
val in = new BufferedReader( |
720 |
new InputStreamReader( |
721 |
new URL(url).openConnection(). |
722 |
getInputStream())) |
723 |
var inputLine = "" |
724 |
var ret = "" |
725 |
while ({inputLine = in.readLine();inputLine} != null) |
726 |
ret += (if(ret.isEmpty) "" else "\n")+ inputLine |
727 |
in.close() |
728 |
ret |
729 |
} catch { |
730 |
case e:Exception => |
731 |
"" |
732 |
} |
733 |
} |
734 |
var resp = "" |
735 |
var count = 0 |
736 |
var ret : Option[BillEntryMsg] = None |
737 |
while(resp.isEmpty && count < max){ |
738 |
if(count > 0) Console.err.println("Retrying for bill request.") |
739 |
resp = get() |
740 |
if(resp.isEmpty) Thread.sleep(1000) |
741 |
else { |
742 |
try{ |
743 |
var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg) |
744 |
ret = Some(b) |
745 |
if(b.getStatus().equals("processing")){ |
746 |
Thread.sleep(1000) |
747 |
resp = "" |
748 |
} |
749 |
} catch { |
750 |
case e:Exception => |
751 |
e.printStackTrace |
752 |
resp = "" |
753 |
} |
754 |
} |
755 |
//sleep(1000L) |
756 |
count += 1 |
757 |
} |
758 |
ret |
759 |
} |
760 |
} |
761 |
|
762 |
case class Resource( |
763 |
val resType : String, // Message.msgMap.keys |
764 |
val instances: Long, |
765 |
val cronSpec : String |
766 |
) |
767 |
extends JsonSupport {} |
768 |
|
769 |
case class Scenario( |
770 |
val ignoreScenario : Boolean, |
771 |
val printMessages : Boolean, |
772 |
val printResponses: Boolean, |
773 |
val host : String, |
774 |
val port : Long, |
775 |
val sendOrdered : Boolean, |
776 |
val sendViaRabbitMQ : Boolean, |
777 |
val sendDebugEnabled : Boolean, |
778 |
val validationEnabled : Boolean, |
779 |
val billingMonth: Long, |
780 |
val aquariumStartWaitMillis : Long, |
781 |
val aquariumStopWaitMillis : Long, |
782 |
val billResponseWaitMillis : Long, |
783 |
val numberOfUsers : Long, |
784 |
val numberOfResponseRetries : Long, |
785 |
val minFileCredits : Long, |
786 |
val maxFileCredits : Long, |
787 |
val minUserCredits : Long, |
788 |
val maxUserCredits : Long, |
789 |
val resources : List[Resource] |
790 |
) |
791 |
extends JsonSupport {} |
792 |
|
793 |
case class Scenarios( |
794 |
val scenarios : List[Scenario] ) |
795 |
extends JsonSupport {} |
796 |
|
797 |
object ScenarioRunner { |
798 |
val aquarium = AquariumInstance.aquarium |
799 |
|
800 |
def parseScenario(txt:String) : Scenario = |
801 |
StdConverters.AllConverters.convertEx[Scenario](JsonTextFormat(txt)) |
802 |
|
803 |
def parseScenarios(txt:String) : Scenarios = |
804 |
StdConverters.AllConverters.convertEx[Scenarios](JsonTextFormat(txt)) |
805 |
|
806 |
def runScenario(txt:String) : Unit = runScenario(parseScenario(txt)) |
807 |
|
808 |
private[this] def runUser(s:Scenario) : User = { |
809 |
val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt) |
810 |
val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits) |
811 |
val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits) |
812 |
//Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds) |
813 |
//AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) { |
814 |
for{ r <- s.resources} // create messages |
815 |
r.resType match { |
816 |
case "vm" => |
817 |
user.addVMs(r.instances.toInt,r.cronSpec) |
818 |
case "disk" => |
819 |
user.addFiles(r.instances.toInt,"update",r.cronSpec) |
820 |
case "credits" => |
821 |
user.addCredits(r.instances.toInt,r.cronSpec) |
822 |
} |
823 |
// run scenario |
824 |
user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt, |
825 |
s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt, |
826 |
s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled) |
827 |
//} |
828 |
user |
829 |
} |
830 |
|
831 |
private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = { |
832 |
Console.err.println("Starting aquarium") |
833 |
AquariumInstance.aquarium.start |
834 |
Thread.sleep(billWait) |
835 |
Console.err.println("Starting aquarium (%d seconds) --- DONE".format(billWait/1000)) |
836 |
try{ |
837 |
forkJoinCode |
838 |
} finally { |
839 |
Console.err.println("Stopping aquarium") |
840 |
AquariumInstance.aquarium.stop |
841 |
Thread.sleep(stop) |
842 |
Console.err.println("Stopping aquarium --- DONE") |
843 |
default |
844 |
} |
845 |
} |
846 |
|
847 |
def runScenario(s:Scenario): Unit = { |
848 |
if(s.ignoreScenario == false) { |
849 |
Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString)) |
850 |
runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){ |
851 |
val tasks = for { u <- 1 to s.numberOfUsers.toInt} |
852 |
yield scala.actors.Futures.future(runUser(s)) |
853 |
tasks.map(_()).toList |
854 |
}.foreach{ u => |
855 |
if(s.printMessages) u.printMessages() |
856 |
if(s.printResponses) u.printResponse() |
857 |
if(s.validationEnabled && u.validateResults() == false) |
858 |
Console.err.println("Validation FAILED for user " + u) |
859 |
} |
860 |
Console.err.println("\n=========================\nStopping scenario\n=======================") |
861 |
} |
862 |
} |
863 |
|
864 |
def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt)) |
865 |
|
866 |
def runScenarios(ss:Scenarios) = { |
867 |
Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString)) |
868 |
ss.scenarios.foreach(runScenario(_)) |
869 |
} |
870 |
|
871 |
} |
872 |
|
873 |
object UserTest extends Loggable { |
874 |
/* |
875 |
JSON example: |
876 |
{ |
877 |
"scenarios":[{ |
878 |
"ignoreScenario":false, |
879 |
"printMessages":false, |
880 |
"printResponses":true, |
881 |
"host":"localhost", |
882 |
"port":8888, |
883 |
"sendOrdered":true, |
884 |
"sendViaRabbitMQ":false, |
885 |
"sendDebugEnabled":false, |
886 |
"validationEnabled":false, |
887 |
"billingMonth":9, |
888 |
"aquariumStartWaitMillis":2000, |
889 |
"aquariumStopWaitMillis":2000, |
890 |
"billResponseWaitMillis":2000, |
891 |
"numberOfUsers":2, |
892 |
"numberOfResponseRetries":10, |
893 |
"minFileCredits":2000, |
894 |
"maxFileCredits":5000, |
895 |
"minUserCredits":10000, |
896 |
"maxUserCredits":50000, |
897 |
"resources":[{ |
898 |
"resType":"credits", |
899 |
"instances":1, |
900 |
"cronSpec":"00 00 10,12 %d ?" |
901 |
},{ |
902 |
"resType":"disk", |
903 |
"instances":1, |
904 |
"cronSpec":"00 18 15,20,29,30 %d ?" |
905 |
},{ |
906 |
"resType":"vm", |
907 |
"instances":1, |
908 |
"cronSpec":"00 18 14,17,19,20 %d ?" |
909 |
}] |
910 |
}] |
911 |
} |
912 |
*/ |
913 |
val basic = new Scenario(false,false,true,"localhost",8888,true,false,false,false,9,2000,2000,2000, |
914 |
1,10,2000,5000,10000,50000,List[Resource]( |
915 |
new Resource("credits",1, "00 00 10,12 %d ?".format(9)), |
916 |
new Resource("disk",1,"00 18 15,20,29,30 %d ?".format(9)), |
917 |
new Resource("vm",1,"00 18 14,17,19,20 %d ?".format(9)) |
918 |
)) |
919 |
|
920 |
def main(args: Array[String]) = { |
921 |
|
922 |
try{ |
923 |
val lines = scala.io.Source.fromFile(args.head).mkString |
924 |
ScenarioRunner.runScenarios(lines) |
925 |
} catch { |
926 |
case e:Exception => |
927 |
e.printStackTrace() |
928 |
ScenarioRunner.runScenarios(new Scenarios(List(basic))) |
929 |
} |
930 |
|
931 |
|
932 |
/* val user = new User("localhost:8888",9) |
933 |
val (minFileCredits,maxFileCredits) = (2000,5000) |
934 |
val (minUserCredits,maxUserCredits) = (10000,10000) |
935 |
//Cron spec minutes hours day-of-month Month Day-of-Week (we do not specify seconds) |
936 |
|
937 |
val json =AquariumInstance.run(2000,2000) { |
938 |
user. |
939 |
addCredits(1,"00 00 10,12 9 ?"). |
940 |
addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?"). |
941 |
addVMs(1,"00 18 14,17,19,20 9 ?"). |
942 |
//addVMs(5,"on","00 18 ? 9 Tue") |
943 |
run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits) |
944 |
} |
945 |
Thread.sleep(2000) |
946 |
Console.err.println("Messages sent:") |
947 |
for { m <- JsonLog.get} |
948 |
Console.err.println("%s".format(m)) //"\n==============\n%s\n===============" |
949 |
Console.err.println("\n=========================\n") |
950 |
Console.err.println("Response:\n" + json)*/ |
951 |
} |
952 |
|
953 |
} |
954 |
|
955 |
|
956 |
/* |
957 |
object BillTest extends Loggable { |
958 |
|
959 |
type JSON = String |
960 |
type UID = Long |
961 |
type DATE = String |
962 |
|
963 |
private[this] val counter = new AtomicLong(0L) |
964 |
private[this] def nextID() = counter.getAndIncrement |
965 |
|
966 |
private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy"); |
967 |
|
968 |
val propsfile = new FileStreamResource(new File("aquarium.properties")) |
969 |
|
970 |
var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters)) |
971 |
|
972 |
val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user") |
973 |
|
974 |
val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace") |
975 |
|
976 |
val aquarium = { |
977 |
exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()", |
978 |
Console.err.println(_)) |
979 |
new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg). |
980 |
//update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider). |
981 |
update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))). |
982 |
build() |
983 |
} |
984 |
|
985 |
|
986 |
private[this] def exec(cmd : String,func : String=>Unit) : Unit = { |
987 |
val commands = cmd.split(" ") |
988 |
val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start(); |
989 |
val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream)) |
990 |
val sb = new StringBuilder |
991 |
|
992 |
//spin off a thread to read process output. |
993 |
val outputReaderThread = new Thread(new Runnable(){ |
994 |
def run : Unit = { |
995 |
var ln : String = null |
996 |
while({ln = ins.readLine; ln != null}) |
997 |
func(ln) |
998 |
} |
999 |
}) |
1000 |
outputReaderThread.start() |
1001 |
|
1002 |
//suspense this main thread until sub process is done. |
1003 |
proc.waitFor |
1004 |
|
1005 |
//wait until output is fully read/completed. |
1006 |
outputReaderThread.join() |
1007 |
|
1008 |
ins.close() |
1009 |
} |
1010 |
|
1011 |
|
1012 |
private [this] def createUser(date:DATE) : (JSON,UID) = { |
1013 |
val mid = nextID |
1014 |
val id = "im.%d.create.user".format(mid) |
1015 |
val millis = format.parse(date).getTime |
1016 |
val occurredMillis = millis |
1017 |
val receivedMillis = millis |
1018 |
val userID = "user%d@grnet.gr".format(mid) |
1019 |
val clientID = "astakos" |
1020 |
val isActive = false |
1021 |
val role = "default" |
1022 |
val eventVersion = "1.0" |
1023 |
val eventType = "create" |
1024 |
|
1025 |
val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType) |
1026 |
val json = AvroHelpers.jsonStringOfSpecificRecord(msg) |
1027 |
(json, mid) |
1028 |
} |
1029 |
|
1030 |
private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = { |
1031 |
val id = "im.%d.add.credits".format(nextID) |
1032 |
val millis = format.parse(date).getTime |
1033 |
val occurredMillis = millis |
1034 |
val receivedMillis = millis |
1035 |
val userID = "user%d@grnet.gr".format(uid) |
1036 |
val clientID = "astakos" |
1037 |
val isActive = false |
1038 |
val eventVersion = "1.0" |
1039 |
val resource = "addcredits" |
1040 |
val instanceID = "addcredits" |
1041 |
|
1042 |
val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion) |
1043 |
val json = AvroHelpers.jsonStringOfSpecificRecord(msg) |
1044 |
json |
1045 |
} |
1046 |
|
1047 |
private [this] def makePithos(date:DATE,uid:UID,path:String, |
1048 |
value:Double,action:String) : JSON = { |
1049 |
val id = "rc.%d.object.%s".format(nextID,action) |
1050 |
val millis = format.parse(date).getTime |
1051 |
val occurredMillis = millis |
1052 |
val receivedMillis = millis |
1053 |
val userID = "user%d@grnet.gr".format(uid) |
1054 |
val clientID = "pithos" |
1055 |
val resource ="diskspace" |
1056 |
val instanceID = "1" |
1057 |
val eventVersion = "1.0" |
1058 |
val details = MessageFactory.newDetails( |
1059 |
MessageFactory.newStringDetail("action", "object %s".format(action)), |
1060 |
MessageFactory.newStringDetail("total", "0.0"), |
1061 |
MessageFactory.newStringDetail("user", userID), |
1062 |
MessageFactory.newStringDetail("path", path) |
1063 |
) |
1064 |
|
1065 |
val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details) |
1066 |
val json = AvroHelpers.jsonStringOfSpecificRecord(msg) |
1067 |
json |
1068 |
} |
1069 |
|
1070 |
private[this] def sendCreate(date:DATE) : UID = { |
1071 |
val (json,uid) = createUser(date) |
1072 |
aquarium(Aquarium.EnvKeys.rabbitMQProducer). |
1073 |
sendMessage(astakosExchangeName,astakosRoutingKey,json) |
1074 |
Console.err.println("Sent message:\n%s\n".format(json)) |
1075 |
uid |
1076 |
} |
1077 |
|
1078 |
private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = { |
1079 |
val json = addCredits(date,uid,amount) |
1080 |
aquarium(Aquarium.EnvKeys.rabbitMQProducer). |
1081 |
sendMessage(astakosExchangeName,astakosRoutingKey, |
1082 |
json) |
1083 |
Console.err.println("Sent message:\n%s\n".format(json)) |
1084 |
} |
1085 |
|
1086 |
private[this] def sendPithos(date:DATE,uid:UID,path:String, |
1087 |
value:Double,action:String) = { |
1088 |
val json = makePithos(date,uid,path,value,action) |
1089 |
aquarium(Aquarium.EnvKeys.rabbitMQProducer). |
1090 |
sendMessage(pithosExchangeName,pithosRoutingKey, |
1091 |
json) |
1092 |
Console.err.println("Sent message:\n%s\n".format(json)) |
1093 |
} |
1094 |
|
1095 |
private[this] def jsonOf(url:String) : JSON = { |
1096 |
val in = new BufferedReader( |
1097 |
new InputStreamReader( |
1098 |
new URL(url).openConnection(). |
1099 |
getInputStream())) |
1100 |
var inputLine = "" |
1101 |
var ret = "" |
1102 |
while ({inputLine = in.readLine();inputLine} != null) |
1103 |
ret += (if(ret.isEmpty) "" else "\n")+ inputLine |
1104 |
in.close() |
1105 |
ret |
1106 |
} |
1107 |
|
1108 |
private[this] def getBill(uid:Long,from:String,to:String) : JSON = { |
1109 |
val fromMillis = format.parse(from).getTime |
1110 |
val toMillis = format.parse(to).getTime |
1111 |
val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis) |
1112 |
try{ |
1113 |
jsonOf(billURL) |
1114 |
} catch { |
1115 |
case e:Exception => |
1116 |
"" |
1117 |
} |
1118 |
} |
1119 |
|
1120 |
private[this] def sleep(l:Long) = { |
1121 |
try { |
1122 |
Thread.sleep(l) |
1123 |
} catch { |
1124 |
case ex:InterruptedException => |
1125 |
Thread.currentThread().interrupt() |
1126 |
} |
1127 |
} |
1128 |
|
1129 |
|
1130 |
private[this] def testCase1() : JSON = { |
1131 |
/* GET BILL FROM TO*/ |
1132 |
val billFromDate = "00/00/00/01/08/2012" |
1133 |
val billToDate= "23/59/59/31/08/2012" |
1134 |
/* USER Creation */ |
1135 |
val creationDate = "15/00/00/03/08/2012" |
1136 |
/* ADD CREDITS */ |
1137 |
val addCreditsDate = "18/15/00/05/08/2012" |
1138 |
val creditsToAdd = 6000 |
1139 |
/* Pithos STUFF */ |
1140 |
val pithosPath = "/Papers/GOTO_HARMFUL.PDF" |
1141 |
|
1142 |
val pithosDate1 = "20/30/00/05/08/2012" |
1143 |
val pithosAction1 = "update" |
1144 |
val pithosValue1 = 2000 |
1145 |
|
1146 |
|
1147 |
val pithosDate2 = "21/05/00/15/08/2012" |
1148 |
val pithosAction2 = "update" |
1149 |
val pithosValue2 = 4000 |
1150 |
|
1151 |
|
1152 |
val pithosDate3 = "08/05/00/20/08/2012" |
1153 |
val pithosAction3 = "update" |
1154 |
val pithosValue3 = 100 |
1155 |
|
1156 |
val id = |
1157 |
sendCreate(creationDate) |
1158 |
//Thread.sleep(5000) |
1159 |
sendAddCredits(addCreditsDate,id,creditsToAdd) |
1160 |
//Thread.sleep(5000) |
1161 |
sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1) |
1162 |
//Thread.sleep(5000) |
1163 |
sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2) |
1164 |
// |
1165 |
sendPithos(pithosDate3,id,pithosPath,pithosValue3,pithosAction3) |
1166 |
|
1167 |
|
1168 |
Console.err.println("Waiting for stuff to be processed") |
1169 |
Thread.sleep(5000) |
1170 |
|
1171 |
var resp = "" |
1172 |
var count = 0 |
1173 |
while(resp.isEmpty && count < 5){ |
1174 |
if(count > 0) Console.err.println("Retrying for bill request.") |
1175 |
resp = getBill(id,billFromDate,billToDate) |
1176 |
if(resp.isEmpty) Thread.sleep(1000) |
1177 |
//sleep(1000L) |
1178 |
count += 1 |
1179 |
} |
1180 |
Console.err.println("Sending URL done") |
1181 |
resp |
1182 |
} |
1183 |
|
1184 |
def runTestCase(f: => JSON) = { |
1185 |
var json = "" |
1186 |
aquarium.start |
1187 |
Thread.sleep(2000) |
1188 |
try{ |
1189 |
json = f |
1190 |
} catch{ |
1191 |
case e:Exception => |
1192 |
e.printStackTrace |
1193 |
} |
1194 |
aquarium.stop |
1195 |
Thread.sleep(1000) |
1196 |
Console.err.println("Response : " + json ) |
1197 |
} |
1198 |
|
1199 |
def main(args: Array[String]) = { |
1200 |
//Console.err.println("JSON: " + (new BillEntry).toJsonString) |
1201 |
runTestCase(testCase1) |
1202 |
} |
1203 |
} |
1204 |
*/ |