Revision 137fd476

b/src/main/scala/gr/grnet/aquarium/policy/CronSpec.scala
77 77
        val (min,max,e) = (min0.getTime,max0.getTime,d1.getTime)
78 78
        if(e < min || e>max)
79 79
          None
80
        else
81
          Some({assert(d1.getTime>=d.getTime);d1})
80
        else {
81
          assert(d1.getTime>=d.getTime)
82
          Some(d1)
83
        }
82 84
    }
83 85

  
84 86
  override def toString : String = cronSpec
b/src/test/scala/gr/grnet/aquarium/BillTest.scala
2 2

  
3 3
import com.ckkloverdos.resource.FileStreamResource
4 4
import converter.StdConverters
5
import event.model.ExternalEventModel
5 6
import event.model.im.StdIMEvent
6 7
import event.model.resource.StdResourceEvent
7 8
import java.io.{InputStreamReader, BufferedReader, File}
8 9
import com.ckkloverdos.props.Props
10
import logic.accounting.dsl.Timeslot
9 11
import store.memory.MemStoreProvider
10 12
import java.util.concurrent.atomic.AtomicLong
11 13
import java.text.SimpleDateFormat
12 14
import java.net.{URLConnection, URL}
13 15
import util.Loggable
16
import java.util.{GregorianCalendar, Date,Calendar}
17
import gr.grnet.aquarium.policy.CronSpec
18
import scala.Tuple2
19
import scala.Tuple2
14 20

  
15 21
/*
16 22
* Copyright 2011-2012 GRNET S.A. All rights reserved.
......
51 57
/*
52 58
* @author Prodromos Gerakios <pgerakios@grnet.gr>
53 59
*/
60

  
61

  
62
object UID {
63
  private[this] val counter = new AtomicLong(0L)
64
  def next() = counter.getAndIncrement
65
  def random(min:Int=Int.MinValue,max:Int=Int.MaxValue) =
66
      min + (scala.math.random.toInt % (max+1)) % (max+1)
67

  
68
  def random[A](l:List[A]) : A = {
69
    val sz = l.size
70
    if(sz==0) throw new Exception("random")
71
     l(random(0,sz-1))
72
  }
73
}
74

  
75
object Process {
76
  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
77
    val commands = cmd.split(" ")
78
    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
79
    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
80
    val sb = new StringBuilder
81

  
82
    //spin off a thread to read process output.
83
    val outputReaderThread = new Thread(new Runnable(){
84
      def run : Unit = {
85
        var ln : String = null
86
        while({ln = ins.readLine; ln != null})
87
          func(ln)
88
      }
89
    })
90
    outputReaderThread.start()
91

  
92
    //suspense this main thread until sub process is done.
93
    proc.waitFor
94

  
95
    //wait until output is fully read/completed.
96
    outputReaderThread.join()
97

  
98
    ins.close()
99
  }
100
  def exec(cmd:String) : Unit = exec(cmd,Console.err.println(_))
101
}
102

  
103
object Mongo {
104
  def clear = Process.exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()")
105
}
106

  
107
object AquariumInstance {
108
  val propsfile = new FileStreamResource(new File("aquarium.properties"))
109
  var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
110
  val aquarium = {
111
    Mongo.clear
112
    new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
113
      //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
114
      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
115
      build()
116
  }
117
  def run(f : => String) : String = {
118
    var _ret = ""
119
    aquarium.start
120
    Thread.sleep(4)
121
    try{
122
      _ret = f
123
    } finally {
124
      Console.err.println("Stopping aquarium")
125
      Thread.sleep(15)
126
      Console.err.println("Stopping aquarium --- DONE")
127
      aquarium.stop
128
    }
129
    _ret
130
  }
131
}
132

  
133

  
134
abstract class Message {
135
  val dbg = true
136
  val cal =   new GregorianCalendar
137
  var _range : Timeslot = null
138
  var _cronSpec : CronSpec = null
139
  var _messagesSent = 0
140
  var _done = false
141
  var _map = Map[String,String]()
142

  
143
  def updateMap(args:Tuple2[String,String]*) : Message  =
144
    updateMap(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
145

  
146
  def updateMap(map:Map[String,String]) : Message = {
147
    def mergeMap[A, B](ms: List[Map[A, B]])(f: (B, B) => B): Map[A, B] =
148
      (Map[A, B]() /: (for (m <- ms; kv <- m) yield kv)) { (a, kv) =>
149
        a + (if (a.contains(kv._1)) kv._1 -> f(a(kv._1), kv._2) else kv)
150
    }
151
    _map =  mergeMap(List(_map,map))((v1,v2) => v2)
152
    (_map.get("month"),_map.get("spec")) match {
153
      case (Some((month0:String)),Some(spec)) =>
154
        val month : Int = month0.toInt
155
        if((_cronSpec==null ||  _cronSpec.cronSpec != spec ||cal.get(Calendar.MONTH) != month -1)) {
156
           val d1 = getDate(1,if(month==12) 1 else month+1,year,0,0,0)
157
           val d0 = getDate(1,month,year,0,0,0)
158
           _range = Timeslot(d0,d1 - 1000)
159
          _cronSpec = new CronSpec(if(spec.isEmpty) "* * * * *" else spec)
160
        }
161
      case _ => ()
162
    }
163
    this
164
  }
165

  
166
  def done = _done
167
  def sentMessages = _messagesSent
168

  
169
  def nextTime : Option[Long] = {
170
    _cronSpec match{
171
      case null =>
172
        None
173
      case _ =>
174
        _cronSpec.nextValidDate(_range,cal.getTime) match {
175
          case Some(d) =>
176
            val millis = d.getTime
177
            cal.setTimeInMillis(millis)
178
            Some(millis)
179
          case None    =>
180
            None
181
        }
182
    }
183
  }
184

  
185
  def year : Int = {
186
    cal.setTimeInMillis(System.currentTimeMillis())
187
    cal.get(Calendar.YEAR)
188
  }
189

  
190
  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int,sec:Int) : Long = {
191
    cal.set(year,month-1,day,hour,min,sec)
192
    cal.getTimeInMillis
193
  }
194

  
195
  def getMillis : Long = cal.getTimeInMillis
196

  
197
  def getDate(day:Int,month:Int,year:Int,hour:Int,min:Int) : Long =
198
    getDate(day,month,year,hour,min,0)
199

  
200
  def setMillis(millis:Long) = {
201
    cal.setTimeInMillis(millis)
202
  }
203

  
204
  def addMillis(day:Int,hour:Int) = {
205
    cal.roll(Calendar.DATE,day)
206
    cal.roll(Calendar.DATE,hour)
207
  }
208

  
209
  def nextID = UID.next
210

  
211
  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel
212

  
213
  def send(args:Tuple2[String,String]*) : Boolean =
214
    send(args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
215

  
216
  def send(map:Map[String,String]) : Boolean = {
217
    nextTime match {
218
      case Some(millis) =>
219
        updateMap(map)
220
        val event = makeEvent(millis,_map)
221
        val (exchangeName,routingKey) = event match {
222
          case rc:StdResourceEvent => rc.resource match {
223
            case "vmtime" =>
224
              ("cyclades","cyclades.resource.vmtime")
225
            case "diskspace" =>
226
              ("pithos","pithos.resource.diskspace")
227
            case _ =>
228
              throw new Exception("send cast failed")
229
          }
230
          case im:StdIMEvent =>
231
            ("astakos","astakos.user")
232
          case _ =>
233
            throw new Exception("send cast failed")
234
        }
235
        val json = event.toJsonString
236
        AquariumInstance.aquarium(Aquarium.EnvKeys.rabbitMQProducer).
237
        sendMessage(exchangeName,routingKey,json)
238
        if(dbg)Console.err.println("Sent message:\n%s\n".format(json))
239
        _messagesSent += 1
240
        true
241
      case None =>
242
        _done = true
243
        false
244
    }
245
  }
246

  
247
}
248

  
249
class DiskMessage extends Message {
250
  /*
251
   *  map:
252
   *      "action" -> "update" , "delete" , "purge"
253
   *      "uid"    ->
254
   *      "path"   ->
255
   *      "value"  ->
256
   */
257
  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
258
      val action = map("action")
259
      val uid    = map("uid")
260
      val path   = map("path")
261
      val value  = map("value").toLong
262
      val id = "rc.%d.object.%s".format(nextID,action)
263
      val occurredMillis = millis
264
      val receivedMillis = millis
265
      val userID = uid //"user%s@grnet.gr".format(uid)
266
      val clientID = "pithos"
267
      val resource ="diskspace"
268
      val instanceID = "1"
269
      val eventVersion = "1.0"
270
      val details = Map("action" -> "object %s".format(action),
271
                        "total"  -> "0.0",
272
                        "user"   -> userID,
273
                        "path"   -> path)
274
      new StdResourceEvent(id,occurredMillis,receivedMillis,userID,
275
                           clientID,resource,instanceID,value,
276
                           eventVersion,details)
277
  }
278
}
279

  
280
class VMMessage extends Message {
281
  /*
282
   *   map:
283
   *      uid        -> unique id for user
284
   *      instanceID -> "cyclades.vm.kJSOLek"
285
   *      vmName     -> "My Lab VM"
286
   *      status     ->  "on", "off" , "destroy"
287
   */
288
  var _status = "off"
289
  def nextStatus = {
290
    if(_status=="off") _status = "on" else _status = "off"
291
    _status
292
  }
293
  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
294
    val uid    = map("uid")
295
    val value  = nextStatus /* map("status") match {
296
       case "on" => 1.0
297
       case "off" => 0.0
298
       case "destroy" => 2.0
299
       case x => throw new Exception("VMMessage bad status: %s".format(x))
300
      }*/
301
    val id = "rc.%d.vmtime".format(nextID)
302
    val occurredMillis = millis
303
    val receivedMillis = millis
304
    val userID = uid // "user%s@grnet.gr".format(uid)
305
    val clientID = "cyclades"
306
    val resource ="vmtime"
307
    val instanceID = map("instanceID")
308
    val eventVersion = "1.0"
309
    val details = Map("VM Name" -> map("vmName"))
310
    new StdResourceEvent(id,occurredMillis,receivedMillis,userID,clientID,
311
                         resource,instanceID,value.toDouble,eventVersion,details)
312
  }
313
 }
314

  
315
class CreationMessage extends Message {
316
  /*
317
   *  map contains:
318
   *   uid -> user id
319
   */
320
  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
321
    val uid    = map("uid")     //
322
    val id = "im.%d.create.user".format(nextID)
323
    val occurredMillis = millis
324
    val receivedMillis = millis
325
    val userID =  uid //"user%d@grnet.gr".format(mid)
326
    val clientID = "astakos"
327
    val isActive = false
328
    val role = "default"
329
    val eventVersion = "1.0"
330
    val eventType = "create"
331
    new StdIMEvent(id,occurredMillis,receivedMillis,userID,
332
                  clientID,isActive,role,eventVersion,eventType,
333
                  Map())
334
  }
335
}
336

  
337
class AddCreditsMessage extends Message {
338
  /*
339
   *  map contains:
340
   *    amount -> "2000"
341
   *    uid    -> loverdos1
342
   */
343
  def makeEvent(millis:Long,map:Map[String,String]) : ExternalEventModel = {
344
    val uid    = map("uid")     //
345
    val amount = map("amount")
346
    val id = "im.%d.add.credits".format(nextID)
347
    val occurredMillis = millis
348
    val receivedMillis = millis
349
    val userID = uid //"user%d@grnet.gr".format(uid)
350
    val clientID = "astakos"
351
    val isActive = false
352
    val role = "default"
353
    val eventVersion = "1.0"
354
    val eventType = "addcredits"
355
    new StdIMEvent(id,occurredMillis,receivedMillis,userID,
356
                   clientID,isActive,role,eventVersion,eventType,
357
                   Map("credits" -> amount.toString))
358
  }
359
}
360

  
361
object Message {
362
  def apply(typ:String,args:Tuple2[String,String]*) : Message =
363
    apply(typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
364

  
365
  def apply(typ:String,map:Map[String,String]) : Message = {
366
    val msg =  typ match {
367
      case "vm" => new VMMessage
368
      case "disk" =>   new DiskMessage
369
      case "create" => new CreationMessage
370
      case "credits" => new AddCreditsMessage
371
      case _ => throw new Exception("unknown type")
372
    }
373
    msg.updateMap(map)
374
    msg
375
  }
376
}
377

  
378

  
379
class User(serverAndPort:String,month:Int) {
380
  val uid = "user%d@grnet.gr".format(UID.next)
381
  val _creationMessage  : Message = Message("create","uid"->uid,"month"->month.toString,"spec"->"")
382
  var _resources : List[Message] = Nil
383

  
384

  
385

  
386
  def add(no:Int,typ:String,args:Tuple2[String,String]*) : User =
387
    add(no,typ,args.foldLeft(Map[String,String]())({(map,arg)=> map + arg}))
388

  
389
  def add(no:Int,typ:String,map:Map[String,String]) : User  =
390
    add(no,typ,{_ => map})
391

  
392
  def add(no:Int,typ:String,map:Int=>Map[String,String]) : User  = {
393
    for {i <- 1 to no} {
394
      val map0 : Map[String,String] = map(i) + ("uid"->uid) + ("month"->month.toString)
395
      _resources = Message(typ,map0) :: _resources
396
    }
397
    this
398
  }
399

  
400
  def addVMs(no:Int,status:String,cronSpec:String) : User =
401
    add(no,"vm",{i =>
402
         Map("instanceID"->"cyclades.vm.%d".format(i),
403
         "vmName"  -> "Virtual Machine #%d".format(i),
404
         "status"  -> status,
405
         "spec"    -> cronSpec)})
406

  
407
  def addFiles(no:Int,action:String,value:Int,minVal:Int,maxVal:Int,spec:String) : User =
408
    add(no,"disk",{i =>
409
       Map("action" -> action,
410
           "path"->"/Papers/file_%d.PDF".format(i),
411
           "value"->UID.random(minVal,maxVal).toString,
412
           "spec" -> spec
413
          )
414
    })
415

  
416
  def addCredits(amount:Int,spec:String) : User = {
417
    add(1,"credits","month"->month.toString,"uid"->uid,"spec"->spec,"amount"->amount.toString)
418
  }
419

  
420
  def run(minFile:Int,maxFile:Int,minAmount:Int,maxAmount:Int,maxJSONRetry:Int=10) : String =  {
421
    _creationMessage.send("month"->month.toString,"uid"->uid,"spec"->"0 0 * %d ?".format(month)) // send once!
422
    var iter = _resources.toList
423
    var done = false
424
    while(!iter.isEmpty){
425
      iter = _resources.filterNot(_.done)
426
      for{i<-iter}
427
        i.send("value"->UID.random(minFile,maxFile).toString,
428
               "amount"->UID.random(minAmount,maxAmount).toString //,
429
               //"status" -> UID.random(List("off","on"))
430
               )
431
    }
432
    getJSON(maxJSONRetry)
433
  }
434

  
435
  def getJSON(max:Int=10) : String = {
436
    def get () : String = {
437
      val fromMillis = _creationMessage._range.from.getTime
438
      val toMillis   = _creationMessage._range.to.getTime
439
      val url = " http://%s/user/%s/bill/%d/%d".format(serverAndPort,uid,fromMillis,toMillis)
440
      try{
441
        val in = new BufferedReader(
442
          new InputStreamReader(
443
            new URL(url).openConnection().
444
              getInputStream()))
445
        var inputLine = ""
446
        var ret = ""
447
        while ({inputLine = in.readLine();inputLine} != null)
448
          ret += (if(ret.isEmpty) "" else "\n")+ inputLine
449
        in.close()
450
        ret
451
      } catch {
452
        case e:Exception =>
453
          ""
454
      }
455
    }
456
    var resp = ""
457
    var count = 0
458
    while(resp.isEmpty && count < max){
459
      if(count > 0) Console.err.println("Retrying for bill request.")
460
      resp = get()
461
      if(resp.isEmpty) Thread.sleep(1000)
462
      //sleep(1000L)
463
      count += 1
464
    }
465
    resp
466
  }
467
}
468

  
469
object UserTest extends Loggable {
470

  
471
 val aquarium  = AquariumInstance.aquarium
472

  
473
 def main(args: Array[String]) = {
474
    val user = new User("localhost:8888",9)
475
    val (minFileCredits,maxFileCredits) = (2000,5000)
476
    val (minUserCredits,maxUserCredits) = (10000,10000)
477
    //Cron spec  minutes hours day-of-month Month Day-of-Week (we do not specify seconds)
478

  
479
   val json =AquariumInstance.run {
480
          user.
481
                    addCredits(10000,"00 00 ? 9 Sat").
482
                    addFiles(1,"update",2000,1000,3000,"00 18 ? 9 Tue").
483
                    //addVMs(1,"on","00 18 ? 9 Mon").
484
                    //addVMs(5,"on","00 18 ? 9 Tue")
485
                    run(minFileCredits,maxFileCredits,minUserCredits,maxUserCredits)
486
   }
487
   Thread.sleep(2000)
488
   Console.err.println("Response:\n" + json)
489
 }
490

  
491
}
492

  
493

  
494

  
54 495
object BillTest extends Loggable {
55 496

  
56 497
  type JSON = String
......
217 658
    }
218 659
  }
219 660

  
661

  
220 662
  private[this] def testCase1() : JSON  = {
221 663
    /* GET BILL FROM TO*/
222 664
    val billFromDate = "00/00/00/01/08/2012"

Also available in: Unified diff