Minor fixes in UserActor, BillEntry and RabbitMQProducer
authorProdromos Gerakios <pgerakios@grnet.gr>
Fri, 17 Aug 2012 06:15:53 +0000 (09:15 +0300)
committerProdromos Gerakios <pgerakios@grnet.gr>
Fri, 17 Aug 2012 06:15:53 +0000 (09:15 +0300)
bill.sh
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/charging/bill/BillEntry.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala

diff --git a/bill.sh b/bill.sh
index 18a9248..79eb27a 100755 (executable)
--- a/bill.sh
+++ b/bill.sh
@@ -1,21 +1,70 @@
 #!/bin/bash
 rm -f logs/aquarium.log  aquarium.home_IS_UNDEFINED/logs/aquarium.log
 mongo aquarium --eval "db.resevents.remove(); db.imevents.remove() ; db.policies.remove() ; db.userstates.remove()"
-rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos < __im.1.create.loverdos.json
-rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos < __addcredits.json
-rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.1.loverdos.json
+
+BILLREQUEST="http://localhost:8888/user/loverdos@grnet.gr/bill/1341090000000/1343768399999"
+
+USERCREATE="{
+    \"id\": \"im.1.create.loverdos\",
+    \"clientID\": \"astakos\", 
+    \"details\": {
+        \"__aquarium_comment_1\": \"user is created\"
+    },
+    \"eventType\": \"create\", 
+    \"eventVersion\": \"1\", 
+    \"isActive\": false, 
+    \"occurredMillis\":  1342731600000, 
+    \"receivedMillis\": 1342731600000, 
+    \"role\": \"default\", 
+    \"userID\": \"loverdos@grnet.gr\"
+}"
+
+ADDCREDITS="{
+ \"id\": \"im1400\",
+ \"clientID\": \"astakos\",
+ \"details\": {
+           \"credits\": \"5000\"
+  },
+ \"eventType\": \"addcredits\",
+ \"eventVersion\": \"1\",
+ \"isActive\": false,
+ \"occurredMillis\": 1344345437000,
+ \"receivedMillis\": 1344345437000,
+ \"role\": \"default\",
+ \"userID\": \"loverdos@grnet.gr\"
+}"
+
+RESOURCE1="{
+  \"id\": \"rc.1.loverdos\",
+  \"occurredMillis\": 1342735200000,
+  \"receivedMillis\": 1342735200000,
+  \"userID\": \"loverdos@grnet.gr\",
+  \"clientID\":   \"pithos\",
+  \"resource\":   \"diskspace\",
+  \"instanceID\": \"1\",
+  \"value\":  1000.0,
+  \"eventVersion\":   \"1.0\",
+  \"details\": {
+    \"action\":   \"object update\",
+    \"total\":    \"1000.0\",
+    \"user\":     \"loverdos@grnet.gr\",
+    \"path\":     \"/Papers/GOTO_HARMFUL.PDF\"
+  }
+}"
+
+#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos < __im.1.create.loverdos.json
+#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos < __addcredits.json
+#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.1.loverdos.json
+
+ echo $USERCREATE | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos
+ echo $ADDCREDITS | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=astakos.user exchange=astakos 
+ echo $RESOURCE1  | rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos
 MAINCLASS=gr.grnet.aquarium.charging.bill.BillEntry
 /home/pgerakios/jdk1.6.0_33/bin/java -Dfile.encoding=UTF-8 -classpath /home/pgerakios/jdk1.6.0_33/jre/lib/rt.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/resources.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/jce.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/plugin.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/deploy.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/jsse.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/management-agent.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/javaws.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/charsets.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/ext/sunpkcs11.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/ext/localedata.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/ext/dnsns.jar:/home/pgerakios/jdk1.6.0_33/jre/lib/ext/sunjce_provider.jar:/home/pgerakios/aquarium/target/classes:/home/pgerakios/.m2/repository/org/scala-lang/scala-library/2.9.1/scala-library-2.9.1.jar:/home/pgerakios/.m2/repository/org/slf4j/slf4j-api/1.6.1/slf4j-api-1.6.1.jar:/home/pgerakios/.m2/repository/ch/qos/logback/logback-classic/0.9.29/logback-classic-0.9.29.jar:/home/pgerakios/.m2/repository/ch/qos/logback/logback-core/0.9.29/logback-core-0.9.29.jar:/home/pgerakios/.m2/repository/org/scala-lang/scala-compiler/2.9.1/scala-compiler-2.9.1.jar:/home/pgerakios/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.0.2/jackson-core-2.0.2.jar:/home/pgerakios/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.0.2/jackson-databind-2.0.2.jar:/home/pgerakios/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.0.2/jackson-annotations-2.0.2.jar:/home/pgerakios/.m2/repository/com/fasterxml/jackson/module/jackson-module-scala/2.0.2/jackson-module-scala-2.0.2.jar:/home/pgerakios/.m2/repository/org/scalastuff/scalabeans/0.2/scalabeans-0.2.jar:/home/pgerakios/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/home/pgerakios/.m2/repository/com/google/guava/guava/12.0/guava-12.0.jar:/home/pgerakios/.m2/repository/net/liftweb/lift-json_2.9.1/2.4/lift-json_2.9.1-2.4.jar:/home/pgerakios/.m2/repository/org/scala-lang/scalap/2.9.1/scalap-2.9.1.jar:/home/pgerakios/.m2/repository/net/liftweb/lift-json-ext_2.9.1/2.4/lift-json-ext_2.9.1-2.4.jar:/home/pgerakios/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/home/pgerakios/.m2/repository/net/liftweb/lift-common_2.9.1/2.4/lift-common_2.9.1-2.4.jar:/home/pgerakios/.m2/repository/org/yaml/snakeyaml/1.9/snakeyaml-1.9.jar:/home/pgerakios/.m2/repository/com/kenai/crontab-parser/crontab-parser/1.0.1/crontab-parser-1.0.1.jar:/home/pgerakios/.m2/repository/com/rabbitmq/amqp-client/2.8.4/amqp-client-2.8.4.jar:/home/pgerakios/.m2/repository/com/ckkloverdos/jbootstrap/3.0.0/jbootstrap-3.0.0.jar:/home/pgerakios/.m2/repository/com/ckkloverdos/streamresource/0.5.1/streamresource-0.5.1.jar:/home/pgerakios/.m2/repository/com/ckkloverdos/maybe/0.5.0/maybe-0.5.0.jar:/home/pgerakios/.m2/repository/com/ckkloverdos/sysprop/0.5.1/sysprop-0.5.1.jar:/home/pgerakios/.m2/repository/com/ckkloverdos/converter/0.5.0/converter-0.5.0.jar:/home/pgerakios/.m2/repository/com/ckkloverdos/typedkey/0.5.0/typedkey-0.5.0.jar:/home/pgerakios/.m2/repository/com/thoughtworks/xstream/xstream/1.4.1/xstream-1.4.1.jar:/home/pgerakios/.m2/repository/xmlpull/xmlpull/1.1.3.1/xmlpull-1.1.3.1.jar:/home/pgerakios/.m2/repository/xpp3/xpp3_min/1.1.4c/xpp3_min-1.1.4c.jar:/home/pgerakios/.m2/repository/org/mongodb/mongo-java-driver/2.7.2/mongo-java-driver-2.7.2.jar:/home/pgerakios/.m2/repository/com/typesafe/akka/akka-actor/2.0.2/akka-actor-2.0.2.jar:/home/pgerakios/.m2/repository/com/typesafe/akka/akka-remote/2.0.2/akka-remote-2.0.2.jar:/home/pgerakios/.m2/repository/io/netty/netty/3.3.0.Final/netty-3.3.0.Final.jar:/home/pgerakios/.m2/repository/com/google/protobuf/protobuf-java/2.4.1/protobuf-java-2.4.1.jar:/home/pgerakios/.m2/repository/net/debasishg/sjson_2.9.1/0.15/sjson_2.9.1-0.15.jar:/home/pgerakios/.m2/repository/net/databinder/dispatch-json_2.9.1/0.8.5/dispatch-json_2.9.1-0.8.5.jar:/home/pgerakios/.m2/repository/org/apache/httpcomponents/httpclient/4.1/httpclient-4.1.jar:/home/pgerakios/.m2/repository/org/apache/httpcomponents/httpcore/4.1/httpcore-4.1.jar:/home/pgerakios/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:/home/pgerakios/.m2/repository/org/objenesis/objenesis/1.2/objenesis-1.2.jar:/home/pgerakios/.m2/repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/home/pgerakios/.m2/repository/voldemort/store/compress/h2-lzf/1.0/h2-lzf-1.0.jar:/home/pgerakios/.m2/repository/com/typesafe/akka/akka-slf4j/2.0.2/akka-slf4j-2.0.2.jar:/home/pgerakios/.m2/repository/com/twitter/finagle-core_2.9.1/4.0.2/finagle-core_2.9.1-4.0.2.jar:/home/pgerakios/.m2/repository/com/twitter/util-core_2.9.1/4.0.1/util-core_2.9.1-4.0.1.jar:/home/pgerakios/.m2/repository/com/twitter/util-collection_2.9.1/4.0.1/util-collection_2.9.1-4.0.1.jar:/home/pgerakios/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/home/pgerakios/.m2/repository/com/twitter/util-hashing_2.9.1/4.0.1/util-hashing_2.9.1-4.0.1.jar:/home/pgerakios/.m2/repository/com/twitter/finagle-http_2.9.1/4.0.2/finagle-http_2.9.1-4.0.2.jar:/home/pgerakios/.m2/repository/com/twitter/util-codec_2.9.1/4.0.1/util-codec_2.9.1-4.0.1.jar:/home/pgerakios/.m2/repository/com/twitter/util-logging_2.9.1/4.0.1/util-logging_2.9.1-4.0.1.jar:/home/pgerakios/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/home/pgerakios/.m2/repository/com/google/code/findbugs/jsr305/1.3.9/jsr305-1.3.9.jar:/home/pgerakios/.m2/repository/joda-time/joda-time/2.0/joda-time-2.0.jar:/home/pgerakios/.m2/repository/org/joda/joda-convert/1.1/joda-convert-1.1.jar:/home/pgerakios/.m2/repository/org/quartz-scheduler/quartz/2.1.5/quartz-2.1.5.jar:/home/pgerakios/.m2/repository/c3p0/c3p0/0.9.1.1/c3p0-0.9.1.1.jar:/home/pgerakios/.m2/repository/org/quartz-scheduler/quartz-oracle/2.1.5/quartz-oracle-2.1.5.jar:/home/pgerakios/.m2/repository/org/quartz-scheduler/quartz-weblogic/2.1.5/quartz-weblogic-2.1.5.jar:/home/pgerakios/.m2/repository/org/quartz-scheduler/quartz-jboss/2.1.5/quartz-jboss-2.1.5.jar:/home/pgerakios/idea-IC-117.418/lib/idea_rt.jar $MAINCLASS & 
 PID=$!
 echo "PID: $PID"
 sleep 2
-curl -s "http://localhost:8888/user/loverdos@grnet.gr/bill/1341090000000/1343768399999/"
+curl -s "$BILLREQUEST"
 kill -9 $!
 echo -e "\nKilling aquarium. Success (0=ok)  => $?" 
 #sleep 1
-#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.2.loverdos.json
-#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.3.loverdos.json
-#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.4.loverdos.json
-#rabbitmqadmin -H dev82.dev.grnet.gr -P 55672 -u rabbit -p r@bb1t publish  routing_key=pithos.resource.diskspace exchange=pithos < __rc.5.loverdos.json
-
-
index 0c259be..65d14ff 100644 (file)
@@ -321,7 +321,7 @@ class UserActor extends ReflectiveRoleableActor {
   /* Convert astakos message for adding credits
     to a regular RESOURCE message */
   def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
-    val credits = -imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
+    val credits = imEvent.details(IMEventModel.DetailsNames.credits).toInt.toDouble
     val event = new StdResourceEvent(
       imEvent.id,
       imEvent.occurredMillis,
@@ -442,7 +442,7 @@ class UserActor extends ReflectiveRoleableActor {
     try{
       val timeslot = event.timeslot
       val state= if(haveWorkingUserState) Some(this._workingUserState) else None
-      val billEntry = BillEntry.fromWorkingUserState(timeslot,state)
+      val billEntry = BillEntry.fromWorkingUserState(timeslot,this._userID,state)
       val billData = GetUserBillResponseData(this._userID,billEntry)
       sender ! GetUserBillResponse(Right(billData))
     } catch {
index 3a33e71..5de5905 100644 (file)
@@ -11,6 +11,13 @@ import gr.grnet.aquarium.store.memory.MemStoreProvider
 import gr.grnet.aquarium.converter.StdConverters._
 import scala.Some
 import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{Date, Calendar, GregorianCalendar}
+import gr.grnet.aquarium.charging.wallet.WalletEntry
+import collection.parallel.mutable
+import collection.mutable.ListBuffer
+import gr.grnet.aquarium.Aquarium.EnvKeys
+import gr.grnet.aquarium.charging.Chargeslot
 
 /*
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
@@ -52,13 +59,12 @@ import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
 * @author Prodromos Gerakios <pgerakios@grnet.gr>
 */
 
-class EventEntry(id:String,
-                 eventType:String,
-                 unitPrice:String,
-                 startTime:String,
-                 endTime:String,
-                 ellapsedTime:String,
-                 credits:String) extends JsonSupport {
+class EventEntry(val id:String,
+                 val unitPrice:String,
+                 val startTime:String,
+                 val endTime:String,
+                 val ellapsedTime:String,
+                 val credits:String) extends JsonSupport {
 
 }
 
@@ -66,6 +72,7 @@ class ResourceEntry(val resourceName : String,
                     val resourceType : String,
                     val unitName : String,
                     val totalCredits : String,
+                    val eventType:String,
                     val details : List[EventEntry]) extends JsonSupport {
 
 }
@@ -83,18 +90,114 @@ class BillEntry(val id:String,
 }
 
 object BillEntry {
-  def fromWorkingUserState(t:Timeslot,w:Option[WorkingUserState]) : BillEntry = {
-    //TODO: get entries at timeslot "t"
-    val eventEntry = new EventEntry("1234","onOff","0.1","323232323","3232223456","10000","5.00")
-    val resourceEntry = new ResourceEntry("VM_1","vmtime","0.01","5.0",List(eventEntry))
-    new BillEntry("323232","loverdos@grnet.gr","ok","100.00","5.00","23023020302","23232323",
-                  List(resourceEntry))
+
+  private[this] val counter = new AtomicLong(0L)
+  private[this] def nextUIDObject() = counter.getAndIncrement
+
+  /*private[this] def walletTimeslot(i:WalletEntry) : Timeslot = {
+    val cal = new GregorianCalendar
+    cal.set(i.billingYear,i.billingMonth,1)
+    val dstart = cal.getTime
+    val lastDate = cal.getActualMaximum(Calendar.DATE)
+    cal.set(Calendar.DATE, lastDate)
+    val dend = cal.getTime
+   Timeslot(dstart,dend)
+  } */
+
+  private[this] def toEventEntry(c:Chargeslot) : EventEntry = {
+    val unitPrice = c.unitPrice.toString
+    val startTime = c.startMillis.toString
+    val endTime   = c.stopMillis.toString
+    val difTime   = (c.stopMillis - c.startMillis).toString
+    val credits   = c.creditsToSubtract.toString
+    new EventEntry(counter.getAndIncrement.toString,unitPrice,
+                    startTime,endTime,difTime,credits)
+
+  }
+
+
+  private[this] def toResourceEntry(w:WalletEntry) : ResourceEntry = {
+    assert(w.sumOfCreditsToSubtract==0.0 || w.chargslotCount > 0)
+    val rcName =  w.resource.toString
+    val rcType =  w.resourceType.name
+    val rcUnitName = w.resourceType.unit
+    val eventEntry = new ListBuffer[EventEntry]
+    val credits = w.sumOfCreditsToSubtract
+    val eventType = //TODO: This is hardcoded; find a better solution
+        w.currentResourceEvent.clientID match {
+          case "pithos" =>
+            val action = w.currentResourceEvent.details("action")
+            val path = w.currentResourceEvent.details("path")
+            "%s@%s".format(action,path)
+          case "cyclades" =>
+            w.currentResourceEvent.value.toInt match {
+              case 0 => // OFF
+                  "offOn"
+              case 1 =>  // ON
+                 "onOff"
+              case 2 =>
+                 "destroy"
+              case _ =>
+                 "BUG"
+            }
+          case "astakos" =>
+            "once"
+        }
+
+    for { c <- w.chargeslots }{
+      if(c.creditsToSubtract != 0.0) {
+        //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
+        eventEntry += toEventEntry(c)
+        //credits += c.creditsToSubtract
+      }
+    }
+    //Console.err.println("TOTAL resource event credits: " + credits)
+    new ResourceEntry(rcName,rcType,rcUnitName,credits.toString,eventType.toString,eventEntry.toList)
+  }
+
+  private[this] def resourceEntriesAt(t:Timeslot,w:WorkingUserState) : (List[ResourceEntry],Double) = {
+    val ret = new ListBuffer[ResourceEntry]
+    var sum = 0.0
+    //Console.err.println("Wallet entries: " + w.walletEntries)
+    for { i <- w.walletEntries} {
+      if(t.contains(i.referenceTimeslot) && i.sumOfCreditsToSubtract != 0.0){
+        //Console.err.println("i.sumOfCreditsToSubtract : " + i.sumOfCreditsToSubtract)
+        sum += i.sumOfCreditsToSubtract
+        ret += toResourceEntry(i)
+      } else {
+        //Console.err.println("WALLET ENTERY : " + i + "\n" +
+          //           t + "  does not contain " +  i.referenceTimeslot + "  !!!!")
+      }
+    }
+    (ret.toList,sum)
+  }
+
+  def fromWorkingUserState(t:Timeslot,userID:String,w:Option[WorkingUserState]) : BillEntry = {
+    val ret = w match {
+      case None =>
+          new BillEntry(counter.getAndIncrement.toString,
+                        userID,"processing",
+                        "0.0",
+                        "0.0",
+                        t.from.getTime.toString,t.to.getTime.toString,
+                        Nil)
+      case Some(w) =>
+        val (rcEntries,rcEntriesCredits) = resourceEntriesAt(t,w)
+        new BillEntry(counter.getAndIncrement.toString,
+                      userID,"ok",
+                      w.totalCredits.toString,
+                      rcEntriesCredits.toString,
+                      t.from.getTime.toString,t.to.getTime.toString,
+                      rcEntries)
+    }
+    //Console.err.println("JSON: " +  ret.toJsonString)
+    ret
   }
 
   //
   def main(args: Array[String]) = {
     //Console.err.println("JSON: " +  (new BillEntry).toJsonString)
-    val propsfile = new FileStreamResource(new File("a1.properties"))
+    val propsfile = new FileStreamResource(new File("aquarium.properties"))
     var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
     val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
       update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
index 7f911c8..f778467 100644 (file)
@@ -110,14 +110,14 @@ class RabbitMQProducer extends Configurable {
 
       def handleAck(seqNo:Long,multiple:Boolean) = {
         withChannel {
-          Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
+          //Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
           subset(seqNo,multiple)
         }
       }
 
       def handleNack(seqNo:Long,multiple:Boolean) = {
         withChannel {
-          Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
+          //Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
           for { (_,msg) <- subset(seqNo,multiple) }
             sendMessage(msg)
         }