Fixed RabbitMQProducer. Added BillTest.scala that simulates IM and RC events.
authorProdromos Gerakios <pgerakios@grnet.gr>
Tue, 28 Aug 2012 09:01:04 +0000 (12:01 +0300)
committerProdromos Gerakios <pgerakios@grnet.gr>
Tue, 28 Aug 2012 09:01:04 +0000 (12:01 +0300)
bill.sh
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala
src/test/scala/gr/grnet/aquarium/BillTest.scala [new file with mode: 0644]

diff --git a/bill.sh b/bill.sh
index f9471b3..ef88c04 100755 (executable)
--- a/bill.sh
+++ b/bill.sh
@@ -74,13 +74,13 @@ RESOURCE2="{
 #rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos < __addcredits.json
 #rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.1.loverdos.json
 
- echo $USERCREATE | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos
+ echo "$USERCREATE" | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos
 # sleep 1
- echo $ADDCREDITS | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos 
+ echo "$ADDCREDITS" | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos 
 # sleep 1
- echo $RESOURCE1  | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos
+ echo "$RESOURCE1"  | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos
 # sleep 1
- echo $RESOURCE2  | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos
+ echo "$RESOURCE2"  | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos
 
 #read
 MAINCLASS=gr.grnet.aquarium.charging.bill.BillEntry
@@ -89,6 +89,6 @@ PID=$!
 echo "PID: $PID"
 sleep 2
 curl -s "$BILLREQUEST"
-kill -9 $!
+kill -9 $PID 
 echo -e "\nKilling aquarium. Success (0=ok)  => $?" 
 #sleep 1
index b8f5156..6a5f6b6 100644 (file)
@@ -400,7 +400,11 @@ class UserActor extends ReflectiveRoleableActor {
       updateLatestResourceEventIDFrom(rcEvent)
     }
 
-    val oldTotalCredits = this._workingUserState.totalCredits
+    val oldTotalCredits =
+      if(this._workingUserState!=null)
+        this._workingUserState.totalCredits
+      else
+        0.0D
     // FIXME check these
     if(nowYear != eventYear || nowMonth != eventMonth) {
       DEBUG(
index b591287..2c70a66 100644 (file)
@@ -56,12 +56,13 @@ import collection.mutable
 
 private class RabbitMQProducerActor extends Actor {
   def receive = {
-    case sendMessage:Function1[_,_] =>
+    case sendMessage: Function0[_] =>
       //Console.err.println("Executing msg ... " + sendMessage.hashCode)
       sendMessage.asInstanceOf[()=>Unit]()
-    case x  : AnyRef     =>
-      //Console.err.println("Dammit  ..." + x.getClass.getSimpleName)
-      ()
+    case x : AnyRef =>
+      //Console.err.println("Dammit  ..." + x.getClass.getSuperclass.getName)
+      throw new Exception("Unexpected value in RabbitMQProducerActor with type: " +
+                          x.getClass.getSuperclass.getName)
   }
 }
 
@@ -107,7 +108,7 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
         _actorRef =  aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
       }
       if(_actorRef != null){
-       //Console.err.println("RabbitMQProducer Timer --> messages ...")
+      // Console.err.println("RabbitMQProducer Timer --> messages ...")
        var msgs : mutable.Queue[()=>Unit] = null
        lock.withLock {
           if(isChannelOpen) {
@@ -119,7 +120,7 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
          //if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
          for {msg <- msgs} {
           // Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
-           _actorRef ! msg
+           _actorRef ! (msg:()=>Unit)
          }
        }
       } else {
@@ -163,7 +164,6 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
         ret
        }
 
-
       def handleAck(seqNo:Long,multiple:Boolean) = {
         //Console.err.println("Received ack for  " + seqNo)
         cutSubset(seqNo,multiple)
@@ -171,7 +171,7 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
 
       def handleNack(seqNo:Long,multiple:Boolean) = {
         //Console.err.println("Received Nack for msg for " + seqNo)
-        for {msg <- cutSubset(seqNo,multiple)} _actorRef ! msg
+        for {(_,msg) <- cutSubset(seqNo,multiple)} _actorRef ! (msg:()=>Unit)
       }
     })
   }
@@ -193,7 +193,7 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
     def msg () : Unit =
       lock.withLock {
         try {
-             if(isChannelOpen) {
+            if(isChannelOpen) {
               var seq : Long = _channel.getNextPublishSeqNo()
               _unconfirmedSet += seq
               _unconfirmedMessages += ((seq,msg))
@@ -212,7 +212,7 @@ class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Logg
         }
       }
     if(_actorRef != null)
-      _actorRef ! msg
+      _actorRef ! (msg:()=>Unit)
     else
       lock.withLock(_unsentMessages += msg)
  }
index 423fa62..52aecc3 100644 (file)
@@ -184,7 +184,8 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
   }
 
   def createNamedActor[T <: Actor:ClassManifest](name:String) : ActorRef=
-     this.actorSystem.actorOf(Props[T],name)
+     if(this._actorSystem==null) null
+     else this.actorSystem.actorOf(Props[T],name)
 
   def getOrCreateUserActor(userID: String): ActorRef = {
     if(this.isShuttingDown.get()) {
diff --git a/src/test/scala/gr/grnet/aquarium/BillTest.scala b/src/test/scala/gr/grnet/aquarium/BillTest.scala
new file mode 100644 (file)
index 0000000..ae21303
--- /dev/null
@@ -0,0 +1,280 @@
+package gr.grnet.aquarium
+
+import com.ckkloverdos.resource.FileStreamResource
+import converter.StdConverters
+import event.model.im.StdIMEvent
+import event.model.resource.StdResourceEvent
+import java.io.{InputStreamReader, BufferedReader, File}
+import com.ckkloverdos.props.Props
+import store.memory.MemStoreProvider
+import java.util.concurrent.atomic.AtomicLong
+import java.text.SimpleDateFormat
+import java.net.{URLConnection, URL}
+import util.Loggable
+
+/*
+* Copyright 2011-2012 GRNET S.A. All rights reserved.
+*
+* Redistribution and use in source and binary forms, with or
+* without modification, are permitted provided that the following
+* conditions are met:
+*
+*   1. Redistributions of source code must retain the above
+*      copyright notice, this list of conditions and the following
+*      disclaimer.
+*
+*   2. Redistributions in binary form must reproduce the above
+*      copyright notice, this list of conditions and the following
+*      disclaimer in the documentation and/or other materials
+*      provided with the distribution.
+*
+* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+* POSSIBILITY OF SUCH DAMAGE.
+*
+* The views and conclusions contained in the software and
+* documentation are those of the authors and should not be
+* interpreted as representing official policies, either expressed
+* or implied, of GRNET S.A.
+*/
+
+
+/*
+* @author Prodromos Gerakios <pgerakios@grnet.gr>
+*/
+object BillTest extends Loggable {
+
+  type JSON = String
+  type UID  = Long
+  type DATE = String
+
+  private[this] val counter = new AtomicLong(0L)
+  private[this] def nextID() = counter.getAndIncrement
+
+  private [this] val format = new SimpleDateFormat("HH/mm/s/dd/MM/yyyy");
+
+  val propsfile = new FileStreamResource(new File("aquarium.properties"))
+
+  var props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
+
+  val (astakosExchangeName,astakosRoutingKey) = ("astakos","astakos.user")
+
+  val (pithosExchangeName,pithosRoutingKey) = ("pithos","pithos.resource.diskspace")
+
+  val aquarium = {
+      exec("mongo aquarium --eval db.resevents.remove();db.imevents.remove();db.policies.remove();db.userstates.remove()",
+           Console.err.println(_))
+      new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
+      update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+      update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
+      build()
+  }
+
+
+  private[this] def exec(cmd : String,func : String=>Unit) : Unit = {
+    val commands = cmd.split(" ")
+    val proc = new ProcessBuilder(commands: _*).redirectErrorStream(true).start();
+    val ins = new java.io.BufferedReader(new java.io.InputStreamReader(proc.getInputStream))
+    val sb = new StringBuilder
+
+    //spin off a thread to read process output.
+    val outputReaderThread = new Thread(new Runnable(){
+      def run : Unit = {
+        var ln : String = null
+        while({ln = ins.readLine; ln != null})
+          func(ln)
+      }
+    })
+    outputReaderThread.start()
+
+    //suspense this main thread until sub process is done.
+    proc.waitFor
+
+    //wait until output is fully read/completed.
+    outputReaderThread.join()
+
+    ins.close()
+  }
+
+
+  private [this] def createUser(date:DATE) : (JSON,UID) = {
+    val mid = nextID
+    val id = "im.%d.create.user".format(mid)
+    val millis = format.parse(date).getTime
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = "user%d@grnet.gr".format(mid)
+    val clientID = "astakos"
+    val isActive = false
+    val role = "default"
+    val eventVersion = "1.0"
+    val eventType = "create"
+    (new StdIMEvent(id,occurredMillis,receivedMillis,userID,
+                   clientID,isActive,role,eventVersion,eventType,
+                   Map()).toJsonString,mid)
+  }
+
+  private [this] def addCredits(date:DATE,uid:UID,amount:Long) : JSON = {
+    val id = "im.%d.add.credits".format(nextID)
+    val millis = format.parse(date).getTime
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = "user%d@grnet.gr".format(uid)
+    val clientID = "astakos"
+    val isActive = false
+    val role = "default"
+    val eventVersion = "1.0"
+    val eventType = "addcredits"
+    new StdIMEvent(id,occurredMillis,receivedMillis,userID,
+                   clientID,isActive,role,eventVersion,eventType,
+                   Map()).toJsonString
+  }
+
+  private [this] def makePithos(date:DATE,uid:UID,path:String,
+                                value:Double,action:String) : JSON = {
+    val id = "rc.%d.object.%s".format(nextID,action)
+    val millis = format.parse(date).getTime
+    val occurredMillis = millis
+    val receivedMillis = millis
+    val userID = "user%d@grnet.gr".format(uid)
+    val clientID = "pithos"
+    val resource ="diskspace"
+    val instanceID = "1"
+    val eventVersion = "1.0"
+    val details = Map("action" -> "object %s".format(action),
+                      "total"  -> "0.0",
+                      "user"   -> userID,
+                      "path"   -> path)
+    new StdResourceEvent(id,occurredMillis,receivedMillis,userID,clientID,
+                         resource,instanceID,value,eventVersion,details).toJsonString
+  }
+
+  private[this] def sendCreate(date:DATE) : UID = {
+    val (json,uid) = createUser(date)
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+    sendMessage(astakosExchangeName,astakosRoutingKey,json)
+    Console.err.println("Sent message:\n%s\n".format(json))
+    uid
+  }
+
+  private[this] def sendAddCredits(date:DATE,uid:UID,amount:Long) = {
+    val json = addCredits(date,uid,amount)
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+    sendMessage(astakosExchangeName,astakosRoutingKey,
+                json)
+    Console.err.println("Sent message:\n%s\n".format(json))
+  }
+
+  private[this] def sendPithos(date:DATE,uid:UID,path:String,
+                               value:Double,action:String) = {
+    val json = makePithos(date,uid,path,value,action)
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+    sendMessage(pithosExchangeName,pithosRoutingKey,
+                json)
+    Console.err.println("Sent message:\n%s\n".format(json))
+  }
+
+  private[this] def jsonOf(url:String) : JSON = {
+     val in = new BufferedReader(
+                         new InputStreamReader(
+                         new URL(url).openConnection().
+                         getInputStream()))
+      var inputLine = ""
+      var ret = ""
+      while ({inputLine = in.readLine();inputLine} != null)
+        ret += (if(ret.isEmpty) "" else "\n")+ inputLine
+      in.close()
+      ret
+  }
+
+  private[this] def getBill(uid:Long,from:String,to:String) : JSON = {
+    val fromMillis = format.parse(from).getTime
+    val toMillis   = format.parse(to).getTime
+    val billURL = " http://localhost:8888/user/user%d@grnet.gr/bill/%d/%d".format(uid,fromMillis,toMillis)
+    try{
+      jsonOf(billURL)
+    } catch {
+      case e:Exception =>
+        ""
+    }
+  }
+
+  private[this] def sleep(l:Long) = {
+  try {
+      Thread.sleep(l)
+    } catch {
+      case ex:InterruptedException =>
+        Thread.currentThread().interrupt()
+    }
+  }
+
+  private[this] def testCase1() : JSON  = {
+    /* GET BILL FROM TO*/
+    val billFromDate = "00/00/00/01/08/2012"
+    val billToDate= "23/59/59/31/08/2012"
+    /* USER Creation */
+    val creationDate = "15/00/00/03/08/2012"
+    /* ADD CREDITS */
+    val addCreditsDate = "18/15/00/05/08/2012"
+    val creditsToAdd = 5000
+    /* Pithos STUFF */
+    val pithosPath = "/Papers/GOTO_HARMFUL.PDF"
+
+    val pithosDate1 = "08/30/00/05/08/2012"
+    val pithosAction1 = "update"
+    val pithosValue1 = 2000
+
+
+    val pithosDate2 = "21/05/00/15/08/2012"
+    val pithosAction2 = "update"
+    val pithosValue2 = 4000
+
+    val id =
+      sendCreate(creationDate)
+      sendAddCredits(addCreditsDate,id,creditsToAdd)
+      sendPithos(pithosDate1,id,pithosPath,pithosValue1,pithosAction1)
+      sendPithos(pithosDate2,id,pithosPath,pithosValue2,pithosAction2)
+
+    Console.err.println("Waiting for stuff to be processed")
+    Thread.sleep(2000)
+
+    var resp = ""
+    var count = 0
+    while(resp.isEmpty && count < 5){
+      if(count > 0) Console.err.println("Retrying for bill request.")
+      resp = getBill(id,billFromDate,billToDate)
+      if(resp.isEmpty) Thread.sleep(1000)
+      //sleep(1000L)
+      count += 1
+    }
+    Console.err.println("Sending URL done")
+    resp
+  }
+
+  def runTestCase(f: => JSON) = {
+    var json = ""
+    aquarium.start
+    try{
+      json = f
+    }  catch{
+      case e:Exception =>
+        e.printStackTrace
+    }
+    aquarium.stop
+    Console.err.println("Response : " + json )
+  }
+
+  def main(args: Array[String]) = {
+    //Console.err.println("JSON: " +  (new BillEntry).toJsonString)
+    runTestCase(testCase1)
+  }
+}