Statistics
| Branch: | Tag: | Revision:

root / src / test / scala / gr / grnet / aquarium / BillTest.scala @ 53389ed0

History | View | Annotate | Download (37.6 kB)

1
/*
2
* Copyright 2011-2012 GRNET S.A. All rights reserved.
3
*
4
* Redistribution and use in source and binary forms, with or
5
* without modification, are permitted provided that the following
6
* conditions are met:
7
*
8
*   1. Redistributions of source code must retain the above
9
*      copyright notice, this list of conditions and the following
10
*      disclaimer.
11
*
12
*   2. Redistributions in binary form must reproduce the above
13
*      copyright notice, this list of conditions and the following
14
*      disclaimer in the documentation and/or other materials
15
*      provided with the distribution.
16
*
17
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
* POSSIBILITY OF SUCH DAMAGE.
29
*
30
* The views and conclusions contained in the software and
31
* documentation are those of the authors and should not be
32
* interpreted as representing official policies, either expressed
33
* or implied, of GRNET S.A.
34
*/
35

    
36
package gr.grnet.aquarium
37

    
38
import com.ckkloverdos.props.Props
39
import converter.{JsonTextFormat, StdConverters}
40
import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageFactory}
41
import java.io.{InputStreamReader, BufferedReader, File}
42
import java.net.URL
43
import java.util.concurrent.atomic.AtomicLong
44
import gr.grnet.aquarium.util.{Lock, Loggable}
45
import java.util.{Date, Calendar, GregorianCalendar}
46
import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
47
import message.avro.gen._
48
import org.apache.avro.specific.SpecificRecord
49
import policy.CronSpec
50
import util.json.JsonSupport
51
import scala.Some
52
import scala.Tuple2
53
import java.util.concurrent.locks.ReentrantLock
54

    
55

    
56
/*
57
* @author Prodromos Gerakios <pgerakios@grnet.gr>
58
*/
59

    
60

    
61
object UID {
62

    
63
  private[this] var privCounters = Map[String,Long]()
64
  private[this] val lock = new Lock()
65

    
66
  def next(s:String) : Long = {
67
     val l = lock.withLock{
68
      privCounters.get(s) match {
69
        case None => 1
70
        case Some(l) => l+1
71
      }
72
    }
73
    privCounters = privCounters + ((s,l))
74
    l
75
  }
76

    
77
  private[this] val counter = new AtomicLong(0L)
78
  def next() = counter.getAndIncrement
79
  def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
80
      min + (scala.math.random.toInt % (max+1)) % (max+1)
81

    
82
  def random[A](l:List[A]) : A = {
83
    val sz = l.size
84
    if(sz==0) throw new Exception("random")
85
     l(random(0,sz-1))
86
  }
87
}
88

    
89
object Process {
90
  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
91
    val commands = cmd.split(" ")
92
    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
93
    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
94
    val sb = new StringBuilder
95

    
96
    //spin off a thread to read process output.
97
    val outputReaderThread = new Thread(new Runnable(){
98
      def run : Unit = {
99
        var ln : String = null
100
        while({ln = ins.readLine; ln != null})
101
          func(ln)
102
      }
103
    })
104
    outputReaderThread.start()
105

    
106
    //suspense this main thread until sub process is done.
107
    proc.waitFor
108

    
109
    //wait until output is fully read/completed.
110
    outputReaderThread.join()
111

    
112
    ins.close()
113
  }
114
  def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
115
}
116

    
117
object Mongo {
118
  def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
119
}
120

    
121
object AquariumInstance {
122
  //val propsfile = new FileStreamResource(new File("aquarium.properties"))
123
  var props: Props = ResourceLocator.AquariumProperties
124
  // Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
125
  val aquarium = {
126
    Mongo.clear
127
    new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
128
      //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
129
      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
130
      build()
131
  }
132

    
133
  private[this] val count=new java.util.concurrent.atomic.AtomicLong()
134
  private[this] val ready=new java.util.concurrent.atomic.AtomicBoolean(false)
135

    
136
  def run(billWait:Int, stop:Int)(f : => Unit) = {
137
    if(count.addAndGet(1) == 1){
138
      Console.err.println("Starting aquarium")
139
      aquarium.start
140
      Thread.sleep(billWait)
141
      Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
142
      this.synchronized{
143
        ready.set(true)
144
        this.synchronized(this.notifyAll)
145
      }
146
    }
147
    try{
148
      this.synchronized{
149
        while(!ready.get) this.wait
150
      }
151
    } finally {
152
        if(count.addAndGet(-1) == 0){
153
       Console.err.println("Stopping aquarium")
154
       aquarium.stop
155
       Thread.sleep(stop)
156
       Console.err.println("Stopping aquarium --- DONE")
157
      }
158
    }
159
  }
160
}
161

    
162
object JsonLog {
163
  private[this] final val lock = new Lock()
164
  private[this] var _log : List[String] = Nil
165
  def add(json:String) =  lock.withLock(_log = _log ::: List(json))
166
  def get() : List[String] = lock.withLock(_log.toList)
167
}
168

    
169
/*object MessageQueue {
170
  private[this] final val lock = new Lock()
171
  private[this] var _sortedMsgs  = SortedMap[Timeslot,(String,String,String)]
172
} */
173

    
174
object MessageService {
175
  private[this] val lock = new Lock
176

    
177
  def send(event:SpecificRecord, rabbitMQEnabled : Boolean = false, debugEnabled:Boolean =false) = {
178
    val json = AvroHelpers.jsonStringOfSpecificRecord(event)
179
    if(rabbitMQEnabled){
180
      val (exchangeName,routingKey) = event match {
181
        case rc:ResourceEventMsg => rc.getResource match {
182
          case "vmtime" =>
183
            ("cyclades","cyclades.resource.vmtime")
184
          case "diskspace" =>
185
            ("pithos","pithos.resource.diskspace")
186
          case "addcredits" =>
187
            ("astakos","astakos.resource")
188
          case x =>
189
            throw new Exception("send cast failed: %s".format(x))
190
        }
191
        case im:IMEventMsg =>
192
          ("astakos","astakos.user")
193
        case _ =>
194
          throw new Exception("send cast failed")
195
      }
196
      AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
197
        sendMessage(exchangeName,routingKey,json)
198
    } else {
199
      val uid = event match {
200
        case rcevent: ResourceEventMsg =>
201
            AquariumInstance.aquarium.resourceEventStore.insertResourceEvent(rcevent)
202
            rcevent.getUserID
203
        case imevent: IMEventMsg =>
204
             AquariumInstance.aquarium.imEventStore.insertIMEvent(imevent)
205
             imevent.getUserID
206
      }
207
      val userActorRef = lock.withLock(AquariumInstance.aquarium.akkaService.getOrCreateUserActor(uid))
208
      userActorRef ! event
209
    }
210
    val millis = event match {
211
      case rc:ResourceEventMsg => rc.getOccurredMillis
212
      case im:IMEventMsg => im.getOccurredMillis
213
    }
214
    JsonLog.add(/*new Date(millis).toString + " ---- " +*/ json)
215
    if(debugEnabled)
216
      Console.err.println("Sent message:\n%s - %s\n".format(new Date(millis).toString,json))
217
  }
218
}
219

    
220
abstract class Message {
221
  val dbg = true
222
  val cal =   new GregorianCalendar
223
  var _range : Timeslot = null
224
  var _cronSpec : CronSpec = null
225
  var _messagesSent = 0
226
  //var _done = false
227
  var _map = Map[String,String]()
228

    
229
  def updateMap(args:Tuple2[String,String]*) : Message  =
230
    updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
231

    
232
  def updateMap(map:Map[String,String]) : Message = {
233
    def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
234
      (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
235
        a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
236
    }
237
    _map =  mergeMap(List(_map,map))((v1,v2) => v2)
238
    (_map.get("month"),_map.get("spec")) match {
239
      case (Some((month0:String)),Some(spec)) =>
240
        val month : Int = month0.toInt
241
        if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
242
           val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
243
           val d0 = getDate(1,month,year,0,0,0)
244
           _range = Timeslot((d0/1000)*1000,(d1/1000)*1000 - 1000)
245
           cal.setTimeInMillis(d0)
246
          _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
247
        }
248
      case _ => ()
249
    }
250
    this
251
  }
252

    
253
  //def done = _done
254
  def sentMessages = _messagesSent
255

    
256
  def nextTime : Option[Long] = nextTime(false)
257

    
258
  def nextTime(update:Boolean) : Option[Long] = {
259
    _cronSpec match{
260
      case null =>
261
        None
262
      case _ =>
263
        _cronSpec.nextValidDate(_range,cal.getTime) match {
264
          case Some(d) =>
265
            val millis = d.getTime
266
            if(update) cal.setTimeInMillis(millis)
267
            Some(millis)
268
          case None    =>
269
            None
270
        }
271
    }
272
  }
273

    
274
  def year : Int = {
275
    val tmp = getMillis
276
    cal.setTimeInMillis(System.currentTimeMillis())
277
    val ret = cal.get(Calendar.YEAR)
278
    cal.setTimeInMillis(tmp)
279
    ret
280
  }
281

    
282
  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
283
    val tmp = getMillis
284
    cal.set(year,month-1,day,hour,min,sec)
285
    val ret = cal.getTimeInMillis
286
    cal.setTimeInMillis(tmp)
287
    ret
288
  }
289

    
290
  def getMillis : Long = cal.getTimeInMillis
291

    
292
  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
293
    getDate(day,month,year,hour,min,0)
294

    
295
  /*def setMillis(millis:Long) = {
296
    cal.setTimeInMillis(millis)
297
  }*/
298

    
299
  /*def addMillis(day:Int,hour:Int) = {
300
    cal.roll(Calendar.DATE,day)
301
    cal.roll(Calendar.DATE,hour)
302
  }*/
303

    
304
  def nextID = UID.next(this.getClass().getName) /*this match {
305
    case _:DiskMessage => UID.next("DiskMessage")
306
    case _:CreationMessage => UID.next("CreationMessage")
307
    case _:VMMessage => UID.next("VMMessage")
308
    case _:AddCreditsMessage =>  UID.next("AddCreditsMessage")
309
  }*/
310

    
311
  def makeEvent(millis:Long,map:Map[String,String]) : SpecificRecord
312

    
313
  def send(args:Tuple2[String,String]*) : Boolean =
314
    send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
315

    
316
  def send(map:Map[String,String]) : Boolean = {
317
    nextTime(true) match {
318
      case Some(millis) =>
319
        updateMap(map)
320
        val event = makeEvent(millis,_map)
321
        val ren = _map.getOrElse("rabbitMQEnabled","false").toBoolean
322
        val rdb = _map.getOrElse("debugEnabled","false").toBoolean
323
        MessageService.send(event,ren,rdb)
324
        _messagesSent += 1
325
        true
326
      case None =>
327
        //_done = true
328
        false
329
    }
330
  }
331

    
332
}
333

    
334
class DiskMessage extends Message {
335
  /*
336
   *  map:
337
   *      "action" -> "update" , "delete" , "purge"
338
   *      "uid"    ->
339
   *      "path"   ->
340
   *      "value"  ->
341
   */
342
  def makeEvent(millis:Long,map:Map[String,String]) = {
343
      val action = map("action")
344
      val uid    = map("uid")
345
      val path   = map("path")
346
      val value  = map("value")
347
      val id = "rc.%d.object.%s".format(nextID,action)
348
      val occurredMillis = millis
349
      val receivedMillis = millis
350
      val userID = uid //"user%s@grnet.gr".format(uid)
351
      val clientID = "pithos"
352
      val resource ="diskspace"
353
      val instanceID = "1"
354
      val eventVersion = "1.0"
355
      val details = MessageFactory.newDetails(
356
        MessageFactory.newStringDetail("action", "object %s".format(action)),
357
        MessageFactory.newStringDetail("total", "0.0"),
358
        MessageFactory.newStringDetail("user", userID),
359
        MessageFactory.newStringDetail("path", path)
360
      )
361

    
362
      val msg = MessageFactory.newResourceEventMsg(
363
        id,
364
        occurredMillis, receivedMillis,
365
        userID, clientID,
366
        resource, instanceID,
367
        value,
368
        eventVersion,
369
        details,
370
        uid
371
      )
372

    
373
      msg
374
  }
375
}
376

    
377
class VMMessage extends Message {
378
  /*
379
   *   map:
380
   *      uid        -> unique id for user
381
   *      instanceID -> "cyclades.vm.kJSOLek"
382
   *      vmName     -> "My Lab VM"
383
   *      status     ->  "on", "off" , "destroy"
384
   */
385
  var _status = "on"
386
  def nextStatus = {
387
    if(_status=="off") _status = "on" else _status = "off"
388
    _status
389
  }
390
  def makeEvent(millis:Long,map:Map[String,String]) = {
391
    val uid    = map("uid")
392
    val value  =  /* map("status")*/nextStatus match {
393
       case "on" => "1"
394
       case "off" => "0"
395
       case "destroy" => "2"
396
       case x => throw new Exception("VMMessage bad status: %s".format(x))
397
      }
398
    val id = "rc.%d.vmtime".format(nextID)
399
    val occurredMillis = millis
400
    val receivedMillis = millis
401
    val userID = uid // "user%s@grnet.gr".format(uid)
402
    val clientID = "cyclades"
403
    val resource ="vmtime"
404
    val instanceID = map("instanceID")
405
    val eventVersion = "1.0"
406
    val details = MessageFactory.newDetails(
407
      MessageFactory.newStringDetail("VM Name", map("vmName"))
408
    )
409

    
410
    val msg = MessageFactory.newResourceEventMsg(
411
      id,
412
      occurredMillis, receivedMillis,
413
      userID, clientID,
414
      resource, instanceID,
415
      value,
416
      eventVersion,
417
      details,
418
      uid
419
    )
420

    
421
    msg
422
  }
423
 }
424

    
425
class CreationMessage extends Message {
426
  /*
427
   *  map contains:
428
   *   uid -> user id
429
   */
430
  def makeEvent(millis:Long,map:Map[String,String]) = {
431
    val uid    = map("uid")     //
432
    val id = "im.%d.create.user".format(nextID)
433
    val occurredMillis = millis
434
    val receivedMillis = millis
435
    val userID =  uid //"user%d@grnet.gr".format(mid)
436
    val clientID = "astakos"
437
    val isActive = false
438
    val role = "default"
439
    val eventVersion = "1.0"
440
    val eventType = "create"
441

    
442
    val msg = MessageFactory.newIMEventMsg(
443
      id,
444
      occurredMillis, receivedMillis,
445
      userID, clientID,
446
      isActive,
447
      role,
448
      eventVersion, eventType,
449
      MessageFactory.newDetails(),
450
      uid
451
    )
452
    msg
453
  }
454
}
455

    
456
class AddCreditsMessage extends Message {
457
  /*
458
   *  map contains:
459
   *    amount -> "2000"
460
   *    uid    -> loverdos1
461
   */
462
  def makeEvent(millis:Long,map:Map[String,String]) = {
463
    val uid    = map("uid")     //
464
    val amount = map("amount")
465
    val id = "im.%d.add.credits".format(nextID)
466
    val occurredMillis = millis
467
    val receivedMillis = millis
468
    val userID = uid //"user%d@grnet.gr".format(uid)
469
    val clientID = "astakos"
470
    val isActive = false
471
    val role = "default"
472
    val eventVersion = "1.0"
473
    val eventType = "addcredits"
474
    val msg = MessageFactory.newResourceEventMsg(
475
      id,
476
      occurredMillis, receivedMillis,
477
      userID, clientID,
478
      "addcredits", "addcredits",
479
      amount,
480
      eventVersion,
481
      MessageFactory.newDetails(),
482
      uid
483
    )
484
    msg
485
  }
486
}
487

    
488
object Message {
489
  def apply(typ:String,args:Tuple2[String,String]*) : Message =
490
    apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
491

    
492
  val msgMap = Map[String,()=>Message](
493
    "vm"      -> (() => new VMMessage),
494
    "disk"    -> (() => new DiskMessage),
495
    "create"  -> (() => new CreationMessage),
496
    "credits" -> (() => new AddCreditsMessage)
497
  )
498

    
499
  def apply(typ:String,map:Map[String,String]) : Message = {
500
    val msg = msgMap.getOrElse(typ,throw new Exception("Invalid type : "+typ))()
501
    msg.updateMap(map)
502
    msg
503
  }
504
}
505

    
506

    
507
class User(serverAndPort:String,month:Int) {
508
  val uid = "user%d@grnet.gr".format(UID.next)
509
  val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
510
  var _resources : List[Message] = Nil
511
  var _billEntryMsg :Option[BillEntryMsg] = None
512
  var _resMsgs = 0
513
  var _vmMsgs = 0
514
  var _addMsgs = 0
515
  var _messagesSent : List[Message] = Nil
516

    
517
  override def toString() = uid
518

    
519
  def scalaList[A](s:java.util.List[A]) : List[A] = {
520
    import scala.collection.JavaConverters.asScalaBufferConverter
521
    s.asScala.toList
522
  }
523

    
524
  def sumOf[A,D](l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) : D =
525
    scalaList(l).map(f).foldLeft(start) {case (sum,v) =>  add(sum,v) }
526

    
527

    
528
  def checkSum[A,D](s:D,l: java.util.List[A],start:D)(f:A=>D)(add:(D,D)=>D) =
529
    check(s == sumOf(l,start)(f)(add))
530

    
531
  def check(b: => Boolean) = {
532
    if(!b)
533
      throw new Exception("Invalid property")
534
  }
535

    
536

    
537
  type S[A] = {def getTotalCredits : String
538
               def getTotalElapsedTime:String
539
               def getTotalUnits:String
540
               def getDetails:java.util.List[A]}
541
  type V = (Double,Long,Double)
542

    
543
  def valuesOf[A,T<:S[A]](t:T) : (V,java.util.List[A]) =
544
    ((t.getTotalCredits.toDouble,
545
      t.getTotalElapsedTime.toLong,
546
      t.getTotalUnits.toDouble),
547
      t.getDetails)
548

    
549
  def checkS[A,T<:S[A]](s:T)(f:A=>V)(add:(V,V)=>V) = {
550
    val zero = (0D,0L,0D)
551
    val (v0,v1) = valuesOf[A,T](s)
552
    checkSum(v0,v1,zero)(f)(add)
553
  }
554

    
555
  def add(a:V,b:V) : V= {
556
    val (a1,a2,a3) = a
557
    val (b1,b2,b3) = b
558
    //if(pos && b1 < 0.0D) a  else
559
      (a1+b1,a2+b2,a3+b3)
560
  }
561

    
562
  val zero  = (0.0D,0L,0.0D)
563

    
564
  def filterMessagesSent(serviceName:String) : List[Message] =
565
    _messagesSent.filter { (_,serviceName) match  {
566
       case (_:DiskMessage,"diskspace") => true
567
       case (_:VMMessage,"vmtime") => true
568
       case (_:AddCreditsMessage,"addcredits") => true
569
       case _ => false
570
    }}
571

    
572
  def checkMessages(serviceName:String,c:List[ChargeEntryMsg],m:List[Message]) = {
573
     if(m.length == 0) check(c.length == 0)
574
     else check(c.length == (serviceName match  {
575
       case "diskspace" =>  m.length - 1
576
       case "vmtime" => m.length - 1
577
       case "addcredits" => m.length
578
     }))
579
  }
580

    
581
  def validateChargeEntry(c:ChargeEntryMsg) : V = {
582
    (c.getTotalCredits.toDouble,c.getTotalElapsedTime.toLong,c.getTotalUnits.toDouble)
583
  }
584

    
585
  def validateEventEntry(serviceName:String,e:EventEntryMsg) : V = {
586
    val v1 = scalaList(e.getDetails)
587
    val v2 = filterMessagesSent(serviceName)
588
    checkMessages(serviceName,v1,v2)
589
    //val v3  = (e.getTotalCredits.toDouble,e.getTotalElapsedTime.toLong,e.getTotalUnits.toDouble)
590
    val v4 = sumOf(e.getDetails,zero)(validateChargeEntry)(add)
591
    v4
592
  }
593

    
594
  def validateResourceEntry(serviceName:String,r:ResourceEntryMsg) : V = {
595
    val v0  = (r.getTotalCredits.toDouble,r.getTotalElapsedTime.toLong,r.getTotalUnits.toDouble)
596
    val v1 = sumOf(r.getDetails,zero)(validateEventEntry(serviceName,_))(add)
597
    check(v0 == v1)
598
    v0
599
  }
600

    
601
  def validateServiceEntry(s:ServiceEntryMsg) : Double = {
602
    val v0  = (s.getTotalCredits.toDouble,s.getTotalElapsedTime.toLong,s.getTotalUnits.toDouble)
603
    val v1 = sumOf(s.getDetails,zero)(validateResourceEntry(s.getServiceName,_))(add)
604
    check(v0 == v1)
605
    v0._1
606
  }
607

    
608
  def validateBillEntry(b:BillEntryMsg) : Boolean = {
609
    try{
610
      check(b.getStatus == "ok")
611
      check(uid == b.getUserID)
612
      check(_creationMessage._range.from.getTime == b.getStartTime().toLong &&
613
            _creationMessage._range.to.getTime == b.getEndTime().toLong)
614
      check(b.getDeductedCredits.toDouble ==
615
            sumOf(b.getDetails,0.0D)(validateServiceEntry)(_ + _))
616
      true
617
    } catch {
618
      case e:Exception =>
619
        e.printStackTrace
620
        false
621
    }
622
  }
623

    
624
  def validateResults() : Boolean = {
625
    _billEntryMsg match {
626
      case None => false
627
      case Some(b) => validateBillEntry(b)
628
    }
629
    //throw new Exception("Not implemented !!!!")
630
  }
631

    
632
  def printMessages() = {
633
    Console.err.println("Messages sent:")
634
    for { m <- JsonLog.get}
635
      Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
636
    Console.err.println("\n=========================\n")
637
  }
638
  def printResponse() = {
639
    Console.err.println("Response:\n" + (_billEntryMsg match {
640
      case None => "NONE!!!!"
641
      case Some(r) => AvroHelpers.jsonStringOfSpecificRecord(r)
642
    }))
643
  }
644

    
645
  def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
646
    add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
647

    
648
  def add(no:Int,typ:String,map:Map[String,String]) : User  =
649
    add(no,typ,{_ => map})
650

    
651
  def add(no:Int,typ:String,map:Int=>Map[String,String]) : User  = {
652
    for {i <- 1 to no} {
653
      val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
654
      _resources = Message(typ,map0) :: _resources
655
    }
656
    this
657
  }
658

    
659
  def addVMs(no:Int,cronSpec:String) : User =
660
    add(no,"vm",{i =>
661
         Map("instanceID"->"cyclades.vm.%d".format(i),
662
         "vmName"  -> "Virtual Machine #%d".format(i),
663
         "status"  -> "on", // initially "on" msg
664
         "spec"    -> cronSpec.format(month))})
665

    
666
  def addFiles(no:Int,action:String/*,value:Int,minVal:Int,maxVal:Int*/,spec:String) : User =
667
    add(no,"disk",{i =>
668
       //Console.err.println("Adding file : " + "/Papers/file_%d.PDF".format(i))
669
       Map("action" -> action,
670
           "path"->"/Papers/file_%d.PDF".format(i),
671
           //"value"->UID.random(minVal,maxVal).toString,
672
           "spec" -> spec.format(month)
673
          )
674
    })
675

    
676
  def addCredits(no:Int,spec:String) : User = {
677
    add(no,"credits",/*"month"->month.toString,"uid"->uid,*/"spec"->spec.format(month)
678
       /*,"amount"->amount.toString*/)
679
  }
680

    
681
  def run(ordered:Boolean,wait:Int,minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry :Int,
682
          sendViaRabbitMQ:Boolean, sendDebugEnabled : Boolean)  =  {
683
    _messagesSent = Nil
684
    _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
685
    //Thread.sleep(2000)
686
    var iter = _resources.toList
687
    while(!iter.isEmpty)
688
      iter = (if(!ordered) iter
689
       else iter.sortWith{(m1,m2) => (m1.nextTime,m2.nextTime) match {
690
        case (Some(l1),Some(l2)) => l1 <= l2
691
        case (None,None) => true
692
        case (None,Some(l)) => true
693
        case (Some(l),None) => false
694
      }}).filter({m =>
695
        _messagesSent = _messagesSent ::: List(m)
696
        val b = m.send("value"->UID.random(minFile,maxFile).toString,
697
                       "amount"->UID.random(minAmount,maxAmount).toString,
698
                       "rabbitMQEnabled" -> sendViaRabbitMQ.toString,
699
                       "debugEnabled" -> sendDebugEnabled.toString
700
                       //"status" -> UID.random(List("off","on"))
701
                      )
702
        if(b) m match {
703
          case _:DiskMessage => _resMsgs += 1
704
          case _:VMMessage => _vmMsgs += 1
705
          case _:AddCreditsMessage => _addMsgs +=1
706
        }
707
        b
708
      })
709
    Thread.sleep(wait)
710
    _billEntryMsg = getBillResponse(maxJSONRetry)
711
  }
712

    
713
  private[this] def getBillResponse(max:Int) : Option[BillEntryMsg] = {
714
    def get () : String = {
715
      val fromMillis = _creationMessage._range.from.getTime
716
      val toMillis   = _creationMessage._range.to.getTime
717
      val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
718
      try{
719
        val in = new BufferedReader(
720
          new InputStreamReader(
721
            new URL(url).openConnection().
722
              getInputStream()))
723
        var inputLine = ""
724
        var ret = ""
725
        while ({inputLine = in.readLine();inputLine} != null)
726
          ret += (if(ret.isEmpty) "" else "\n")+ inputLine
727
        in.close()
728
        ret
729
      } catch {
730
        case e:Exception =>
731
          ""
732
      }
733
    }
734
    var resp = ""
735
    var count = 0
736
    var ret : Option[BillEntryMsg] = None
737
    while(resp.isEmpty && count < max){
738
      if(count > 0) Console.err.println("Retrying for bill request.")
739
      resp = get()
740
      if(resp.isEmpty) Thread.sleep(1000)
741
      else {
742
        try{
743
          var b = AvroHelpers.specificRecordOfJsonString(resp, new BillEntryMsg)
744
          ret = Some(b)
745
          if(b.getStatus().equals("processing")){
746
            Thread.sleep(1000)
747
            resp = ""
748
          }
749
        }  catch {
750
          case e:Exception =>
751
              e.printStackTrace
752
              resp = ""
753
        }
754
      }
755
      //sleep(1000L)
756
      count += 1
757
    }
758
    ret
759
  }
760
}
761

    
762
case class Resource(
763
   val resType  : String, // Message.msgMap.keys
764
   val instances: Long,
765
   val cronSpec : String
766
 )
767
extends JsonSupport {}
768

    
769
case class Scenario(
770
  val ignoreScenario : Boolean,
771
  val printMessages : Boolean,
772
  val printResponses: Boolean,
773
  val host : String,
774
  val port : Long,
775
  val sendOrdered : Boolean,
776
  val sendViaRabbitMQ : Boolean,
777
  val sendDebugEnabled : Boolean,
778
  val validationEnabled : Boolean,
779
  val billingMonth: Long,
780
  val aquariumStartWaitMillis : Long,
781
  val aquariumStopWaitMillis : Long,
782
  val billResponseWaitMillis : Long,
783
  val numberOfUsers  : Long,
784
  val numberOfResponseRetries : Long,
785
  val minFileCredits : Long,
786
  val maxFileCredits : Long,
787
  val minUserCredits : Long,
788
  val maxUserCredits : Long,
789
  val resources : List[Resource]
790
)
791
extends JsonSupport {}
792

    
793
case class Scenarios(
794
   val scenarios : List[Scenario] )
795
extends JsonSupport {}
796

    
797
object ScenarioRunner {
798
  val aquarium  = AquariumInstance.aquarium
799

    
800
  def parseScenario(txt:String) : Scenario =
801
    StdConverters.AllConverters.convertEx[Scenario](JsonTextFormat(txt))
802

    
803
  def parseScenarios(txt:String) : Scenarios =
804
    StdConverters.AllConverters.convertEx[Scenarios](JsonTextFormat(txt))
805

    
806
  def runScenario(txt:String) : Unit = runScenario(parseScenario(txt))
807

    
808
  private[this] def runUser(s:Scenario) : User = {
809
    val user = new User("%s:%d".format(s.host,s.port),s.billingMonth.toInt)
810
    val (minFileCredits,maxFileCredits) = (s.minFileCredits,s.maxFileCredits)
811
    val (minUserCredits,maxUserCredits) = (s.maxUserCredits,s.maxUserCredits)
812
    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
813
    //AquariumInstance.run(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt) {
814
      for{ r <- s.resources}  // create messages
815
        r.resType match {
816
          case "vm" =>
817
            user.addVMs(r.instances.toInt,r.cronSpec)
818
          case "disk" =>
819
            user.addFiles(r.instances.toInt,"update",r.cronSpec)
820
          case "credits" =>
821
            user.addCredits(r.instances.toInt,r.cronSpec)
822
        }
823
      // run scenario
824
      user.run(s.sendOrdered,s.billResponseWaitMillis.toInt,s.minFileCredits.toInt,
825
               s.maxFileCredits.toInt,s.minUserCredits.toInt,s.maxUserCredits.toInt,
826
               s.numberOfResponseRetries.toInt,s.sendViaRabbitMQ,s.sendDebugEnabled)
827
    //}
828
    user
829
  }
830

    
831
  private[this] def runAquarium[A](billWait:Long,stop:Long,default:A)(forkJoinCode: => A) : A = {
832
     Console.err.println("Starting aquarium")
833
     AquariumInstance.aquarium.start
834
     Thread.sleep(billWait)
835
     Console.err.println("Starting aquarium  (%d seconds) --- DONE".format(billWait/1000))
836
     try{
837
       forkJoinCode
838
     } finally {
839
       Console.err.println("Stopping aquarium")
840
       AquariumInstance.aquarium.stop
841
       Thread.sleep(stop)
842
       Console.err.println("Stopping aquarium --- DONE")
843
       default
844
     }
845
  }
846

    
847
  def runScenario(s:Scenario): Unit = {
848
    if(s.ignoreScenario == false) {
849
      Console.err.println("=================\nRunning scenario:\n %s\n=======================\n".format(s.toJsonString))
850
      runAquarium(s.aquariumStartWaitMillis.toInt,s.aquariumStopWaitMillis.toInt,List[User]()){
851
        val tasks = for { u <- 1 to s.numberOfUsers.toInt}
852
                    yield scala.actors.Futures.future(runUser(s))
853
        tasks.map(_()).toList
854
      }.foreach{ u =>
855
        if(s.printMessages) u.printMessages()
856
        if(s.printResponses) u.printResponse()
857
        if(s.validationEnabled && u.validateResults() == false)
858
          Console.err.println("Validation FAILED for user " + u)
859
      }
860
      Console.err.println("\n=========================\nStopping scenario\n=======================")
861
    }
862
  }
863

    
864
  def runScenarios(txt:String) : Unit = runScenarios(parseScenarios(txt))
865

    
866
  def runScenarios(ss:Scenarios) = {
867
    Console.err.println("=================\nScenarios:\n %s\n=======================\n".format(ss.toJsonString))
868
    ss.scenarios.foreach(runScenario(_))
869
  }
870

    
871
}
872

    
873
object UserTest extends Loggable {
874
/*
875
    JSON example:
876
  {
877
  "scenarios":[{
878
    "ignoreScenario":false,
879
    "printMessages":false,
880
    "printResponses":true,
881
    "host":"localhost",
882
    "port":8888,
883
    "sendOrdered":true,
884
    "sendViaRabbitMQ":false,
885
    "sendDebugEnabled":false,
886
    "validationEnabled":false,
887
    "billingMonth":9,
888
    "aquariumStartWaitMillis":2000,
889
    "aquariumStopWaitMillis":2000,
890
    "billResponseWaitMillis":2000,
891
    "numberOfUsers":2,
892
    "numberOfResponseRetries":10,
893
    "minFileCredits":2000,
894
    "maxFileCredits":5000,
895
    "minUserCredits":10000,
896
    "maxUserCredits":50000,
897
    "resources":[{
898
      "resType":"credits",
899
      "instances":1,
900
      "cronSpec":"00 00 10,12 %d ?"
901
    },{
902
      "resType":"disk",
903
      "instances":1,
904
      "cronSpec":"00 18 15,20,29,30 %d ?"
905
    },{
906
      "resType":"vm",
907
      "instances":1,
908
      "cronSpec":"00 18 14,17,19,20 %d ?"
909
    }]
910
  }]
911
}
912
 */
913
 val basic = new Scenario(false,false,true,"localhost",8888,true,false,false,false,9,2000,2000,2000,
914
                          1,10,2000,5000,10000,50000,List[Resource](
915
                          new Resource("credits",1, "00 00 10,12 %d ?".format(9)),
916
                          new Resource("disk",1,"00 18 15,20,29,30 %d ?".format(9)),
917
                          new Resource("vm",1,"00 18 14,17,19,20 %d ?".format(9))
918
                        ))
919

    
920
 def main(args: Array[String]) = {
921

    
922
   try{
923
     val lines = scala.io.Source.fromFile(args.head).mkString
924
     ScenarioRunner.runScenarios(lines)
925
   } catch {
926
     case e:Exception =>
927
       e.printStackTrace()
928
       ScenarioRunner.runScenarios(new Scenarios(List(basic)))
929
   }
930

    
931

    
932
/*    val user = new User("localhost:8888",9)
933
    val (minFileCredits,maxFileCredits) = (2000,5000)
934
    val (minUserCredits,maxUserCredits) = (10000,10000)
935
    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
936

    
937
   val json =AquariumInstance.run(2000,2000) {
938
          user.
939
                  addCredits(1,"00 00 10,12 9 ?").
940
                  addFiles(1,"update",2000,1000,3000,"00 18 15,20,29,30 9 ?").
941
                  addVMs(1,"00 18 14,17,19,20 9 ?").
942
                  //addVMs(5,"on","00 18 ? 9 Tue")
943
                 run(true,2000,minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
944
   }
945
   Thread.sleep(2000)
946
   Console.err.println("Messages sent:")
947
   for { m <- JsonLog.get}
948
     Console.err.println("%s".format(m)) //"\n==============\n%s\n==============="
949
   Console.err.println("\n=========================\n")
950
   Console.err.println("Response:\n" + json)*/
951
 }
952

    
953
}
954

    
955

    
956
/*
957
object BillTest extends Loggable {
958

    
959
  type JSON = String
960
  type UID  = Long
961
  type DATE = String
962

    
963
  private[this] val counter = new AtomicLong(0L)
964
  private[this] def nextID() = counter.getAndIncrement
965

    
966
  private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy");
967

    
968
  val propsfile = new FileStreamResource(new File("aquarium.properties"))
969

    
970
  var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
971

    
972
  val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user")
973

    
974
  val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace")
975

    
976
  val aquarium = {
977
      exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
978
           Console.err.println(_))
979
      new AquariumBuilder(props, ResourceLocator.DefaultPolicyMsg).
980
      //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
981
      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
982
      build()
983
  }
984

    
985

    
986
  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
987
    val commands = cmd.split(" ")
988
    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
989
    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
990
    val sb = new StringBuilder
991

    
992
    //spin off a thread to read process output.
993
    val outputReaderThread = new Thread(new Runnable(){
994
      def run : Unit = {
995
        var ln : String = null
996
        while({ln = ins.readLine; ln != null})
997
          func(ln)
998
      }
999
    })
1000
    outputReaderThread.start()
1001

    
1002
    //suspense this main thread until sub process is done.
1003
    proc.waitFor
1004

    
1005
    //wait until output is fully read/completed.
1006
    outputReaderThread.join()
1007

    
1008
    ins.close()
1009
  }
1010

    
1011

    
1012
  private [this] def createUser(date:DATE) : (JSON,UID) = {
1013
    val mid = nextID
1014
    val id = "im.%d.create.user".format(mid)
1015
    val millis = format.parse(date).getTime
1016
    val occurredMillis = millis
1017
    val receivedMillis = millis
1018
    val userID = "user%d@grnet.gr".format(mid)
1019
    val clientID = "astakos"
1020
    val isActive = false
1021
    val role = "default"
1022
    val eventVersion = "1.0"
1023
    val eventType = "create"
1024

    
1025
    val msg = MessageFactory.newIMEventMsg(id,occurredMillis,receivedMillis,userID, clientID, isActive,role,eventVersion,eventType)
1026
    val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
1027
    (json, mid)
1028
  }
1029

    
1030
  private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
1031
    val id = "im.%d.add.credits".format(nextID)
1032
    val millis = format.parse(date).getTime
1033
    val occurredMillis = millis
1034
    val receivedMillis = millis
1035
    val userID = "user%d@grnet.gr".format(uid)
1036
    val clientID = "astakos"
1037
    val isActive = false
1038
    val eventVersion = "1.0"
1039
    val resource = "addcredits"
1040
    val instanceID = "addcredits"
1041

    
1042
    val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, amount.toString, eventVersion)
1043
    val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
1044
    json
1045
  }
1046

    
1047
  private [this] def makePithos(date:DATE,uid:UID,path:String,
1048
                                value:Double,action:String) : JSON = {
1049
    val id = "rc.%d.object.%s".format(nextID,action)
1050
    val millis = format.parse(date).getTime
1051
    val occurredMillis = millis
1052
    val receivedMillis = millis
1053
    val userID = "user%d@grnet.gr".format(uid)
1054
    val clientID = "pithos"
1055
    val resource ="diskspace"
1056
    val instanceID = "1"
1057
    val eventVersion = "1.0"
1058
    val details = MessageFactory.newDetails(
1059
      MessageFactory.newStringDetail("action", "object %s".format(action)),
1060
      MessageFactory.newStringDetail("total", "0.0"),
1061
      MessageFactory.newStringDetail("user", userID),
1062
      MessageFactory.newStringDetail("path", path)
1063
    )
1064

    
1065
    val msg = MessageFactory.newResourceEventMsg(id, occurredMillis, receivedMillis, userID, clientID, resource, instanceID, value.toString, eventVersion, details)
1066
    val json = AvroHelpers.jsonStringOfSpecificRecord(msg)
1067
    json
1068
  }
1069

    
1070
  private[this] def sendCreate(date:DATE) : UID = {
1071
    val (json,uid) = createUser(date)
1072
    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
1073
    sendMessage(astakosExchangeName,astakosRoutingKey,json)
1074
    Console.err.println("Sent message:\n%s\n".format(json))
1075
    uid
1076
  }
1077

    
1078
  private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = {
1079
    val json = addCredits(date,uid,amount)
1080
    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
1081
    sendMessage(astakosExchangeName,astakosRoutingKey,
1082
                json)
1083
    Console.err.println("Sent message:\n%s\n".format(json))
1084
  }
1085

    
1086
  private[this] def sendPithos(date:DATE,uid:UID,path:String,
1087
                               value:Double,action:String) = {
1088
    val json = makePithos(date,uid,path,value,action)
1089
    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
1090
    sendMessage(pithosExchangeName,pithosRoutingKey,
1091
                json)
1092
    Console.err.println("Sent message:\n%s\n".format(json))
1093
  }
1094

    
1095
  private[this] def jsonOf(url:String) : JSON = {
1096
     val in = new BufferedReader(
1097
                         new InputStreamReader(
1098
                         new URL(url).openConnection().
1099
                         getInputStream()))
1100
      var inputLine = ""
1101
      var ret = ""
1102
      while ({inputLine = in.readLine();inputLine} != null)
1103
        ret += (if(ret.isEmpty) "" else "\n")+ inputLine
1104
      in.close()
1105
      ret
1106
  }
1107

    
1108
  private[this] def getBill(uid:Long,from:String,to:String) : JSON = {
1109
    val fromMillis = format.parse(from).getTime
1110
    val toMillis   = format.parse(to).getTime
1111
    val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis)
1112
    try{
1113
      jsonOf(billURL)
1114
    } catch {
1115
      case e:Exception =>
1116
        ""
1117
    }
1118
  }
1119

    
1120
  private[this] def sleep(l:Long) = {
1121
  try {
1122
      Thread.sleep(l)
1123
    } catch {
1124
      case ex:InterruptedException =>
1125
        Thread.currentThread().interrupt()
1126
    }
1127
  }
1128

    
1129

    
1130
  private[this] def testCase1() : JSON  = {
1131
    /* GET BILL FROM TO*/
1132
    val billFromDate = "00/00/00/01/08/2012"
1133
    val billToDate= "23/59/59/31/08/2012"
1134
    /* USER Creation */
1135
    val creationDate = "15/00/00/03/08/2012"
1136
    /* ADD CREDITS */
1137
    val addCreditsDate = "18/15/00/05/08/2012"
1138
    val creditsToAdd = 6000
1139
    /* Pithos STUFF */
1140
    val pithosPath = "/Papers/GOTO_HARMFUL.PDF"
1141

    
1142
    val pithosDate1 = "20/30/00/05/08/2012"
1143
    val pithosAction1 = "update"
1144
    val pithosValue1 = 2000
1145

    
1146

    
1147
    val pithosDate2 = "21/05/00/15/08/2012"
1148
    val pithosAction2 = "update"
1149
    val pithosValue2 = 4000
1150

    
1151

    
1152
    val pithosDate3 = "08/05/00/20/08/2012"
1153
    val pithosAction3 = "update"
1154
    val pithosValue3 = 100
1155

    
1156
    val id =
1157
      sendCreate(creationDate)
1158
      //Thread.sleep(5000)
1159
      sendAddCredits(addCreditsDate,id,creditsToAdd)
1160
      //Thread.sleep(5000)
1161
      sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1)
1162
      //Thread.sleep(5000)
1163
      sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2)
1164
      //
1165
      sendPithos(pithosDate3,id,pithosPath,pithosValue3,pithosAction3)
1166

    
1167

    
1168
    Console.err.println("Waiting for stuff to be processed")
1169
    Thread.sleep(5000)
1170

    
1171
    var resp = ""
1172
    var count = 0
1173
    while(resp.isEmpty && count < 5){
1174
      if(count > 0) Console.err.println("Retrying for bill request.")
1175
      resp = getBill(id,billFromDate,billToDate)
1176
      if(resp.isEmpty) Thread.sleep(1000)
1177
      //sleep(1000L)
1178
      count += 1
1179
    }
1180
    Console.err.println("Sending URL done")
1181
    resp
1182
  }
1183

    
1184
  def runTestCase(f: => JSON) = {
1185
    var json = ""
1186
    aquarium.start
1187
    Thread.sleep(2000)
1188
    try{
1189
      json = f
1190
    }  catch{
1191
      case e:Exception =>
1192
        e.printStackTrace
1193
    }
1194
    aquarium.stop
1195
    Thread.sleep(1000)
1196
    Console.err.println("Response : " + json )
1197
  }
1198

    
1199
  def main(args: Array[String]) = {
1200
    //Console.err.println("JSON: " +  (new BillEntry).toJsonString)
1201
    runTestCase(testCase1)
1202
  }
1203
}
1204
*/