-package gr.grnet.aquarium.charging.bill
-
-import gr.grnet.aquarium.charging.state.WorkingUserState
-import gr.grnet.aquarium.util.json.JsonSupport
-import com.ckkloverdos.resource.FileStreamResource
-import java.io.File
-import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.converter.{CompactJsonTextFormat, PrettyJsonTextFormat, StdConverters}
-import gr.grnet.aquarium.{Aquarium, ResourceLocator, AquariumBuilder}
-import gr.grnet.aquarium.store.memory.MemStoreProvider
-import gr.grnet.aquarium.converter.StdConverters._
-import scala._
-import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Date, Calendar, GregorianCalendar}
-import gr.grnet.aquarium.charging.wallet.WalletEntry
-import scala.collection.parallel.mutable
-import scala.collection.mutable.ListBuffer
-import gr.grnet.aquarium.Aquarium.EnvKeys
-import gr.grnet.aquarium.charging.Chargeslot
-import scala.collection.immutable.TreeMap
-import scala.Some
-import gr.grnet.aquarium.charging.Chargeslot
-
-
/*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* or implied, of GRNET S.A.
*/
+package gr.grnet.aquarium.charging.bill
+
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.resource.FileStreamResource
+import gr.grnet.aquarium.converter.{CompactJsonTextFormat, StdConverters}
+import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageHelpers}
+import gr.grnet.aquarium.message.avro.gen.{ChargeslotMsg, WalletEntryMsg, UserStateMsg}
+import gr.grnet.aquarium.store.memory.MemStoreProvider
+import gr.grnet.aquarium.util.json.JsonSupport
+import gr.grnet.aquarium.{Aquarium, ResourceLocator, AquariumBuilder}
+import java.io.File
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable.ListBuffer
+import gr.grnet.aquarium.policy.ResourceType
+
/*
* @author Prodromos Gerakios <pgerakios@grnet.gr>
val unitPrice:String,
val startTime:String,
val endTime:String,
- val ellapsedTime:String,
+ val elapsedTime:String,
+ val units:String,
val credits:String)
extends JsonSupport {}
case class ResourceEntry(val resourceName : String,
- val resourceType : String,
- val unitName : String,
+ //val resourceType : String,
+ //val unitName : String,
val totalCredits : String,
+ val totalElapsedTime : String,
+ val totalUnits : String,
val details : List[EventEntry])
-extends JsonSupport {}
+extends JsonSupport {
+ var unitName = "EMPTY_UNIT_NAME"
+ var resourceType = "EMPTY_RESOURCE_TYPE"
+}
+case class ServiceEntry(val serviceName: String,
+ val totalCredits : String,
+ val totalElapsedTime : String,
+ val totalUnits:String,
+ val unitName:String,
+ val details: List[ResourceEntry]
+ )
+extends JsonSupport {}
abstract class AbstractBillEntry
extends JsonSupport {}
val deductedCredits:String,
val startTime:String,
val endTime:String,
- val bill:List[ResourceEntry]
+ val bill:List[ServiceEntry]
)
extends AbstractBillEntry {}
Timeslot(dstart,dend)
} */
- private[this] def toChargeEntry(c:Chargeslot) : ChargeEntry = {
- val unitPrice = c.unitPrice.toString
- val startTime = c.startMillis.toString
- val endTime = c.stopMillis.toString
- val difTime = (c.stopMillis - c.startMillis).toString
- val credits = c.creditsToSubtract.toString
- new ChargeEntry(counter.getAndIncrement.toString,unitPrice,
- startTime,endTime,difTime,credits)
+ private[this] def toChargeEntry(c:ChargeslotMsg) : (ChargeEntry,Long,Double) = {
+ val unitPrice = c.getUnitPrice.toString
+ val startTime = c.getStartMillis.toString
+ val endTime = c.getStopMillis.toString
+ val difTime = (c.getStopMillis - c.getStartMillis).toLong
+ val unitsD = (c.getCreditsToSubtract/c.getUnitPrice)
+ val credits = c.getCreditsToSubtract.toString
+ (new ChargeEntry(counter.getAndIncrement.toString,unitPrice,
+ startTime,endTime,difTime.toString,unitsD.toString,credits),difTime,unitsD)
}
- private[this] def toEventEntry(eventType:String,c:Chargeslot) : EventEntry =
- new EventEntry(eventType,List(toChargeEntry(c)))
+ private[this] def toEventEntry(eventType:String,c:ChargeslotMsg) : (EventEntry,Long,Double) = {
+ val (c1,l1,d1) = toChargeEntry(c)
+ (new EventEntry(eventType,List(c1)),l1,d1)
+ }
- private[this] def toResourceEntry(w:WalletEntry) : ResourceEntry = {
- assert(w.sumOfCreditsToSubtract==0.0 || w.chargslotCount > 0)
- val rcType = w.resourceType.name
+ private[this] def toResourceEntry(w:WalletEntryMsg) : ResourceEntry = {
+ assert(w.getSumOfCreditsToSubtract==0.0 || MessageHelpers.chargeslotCountOf(w) > 0)
+ val rcType = w.getResourceType.getName
val rcName = rcType match {
case "diskspace" =>
- w.currentResourceEvent.details("path")
+ String.valueOf(MessageHelpers.currentResourceEventOf(w).getDetails.get("path").getAnyValue)
case _ =>
- w.currentResourceEvent.instanceID
+ MessageHelpers.currentResourceEventOf(w).getInstanceID
}
- val rcUnitName = w.resourceType.unit
+ val rcUnitName = w.getResourceType.getUnit
val eventEntry = new ListBuffer[EventEntry]
- val credits = w.sumOfCreditsToSubtract
+ val credits = w.getSumOfCreditsToSubtract
val eventType = //TODO: This is hardcoded; find a better solution
rcType match {
case "diskspace" =>
- val action = w.currentResourceEvent.details("action")
- val path = w.currentResourceEvent.details("path")
+ val action = MessageHelpers.currentResourceEventOf(w).getDetails.get("action").getAnyValue
+ //val path = MessageHelpers.currentResourceEventOf(w).getDetails.get("path")
//"%s@%s".format(action,path)
action
case "vmtime" =>
- w.currentResourceEvent.value.toInt match {
+ MessageHelpers.currentResourceEventOf(w).getValue.toInt match {
case 0 => // OFF
"offOn"
case 1 => // ON
case "addcredits" =>
"once"
}
-
- for { c <- w.chargeslots }{
- if(c.creditsToSubtract != 0.0) {
- //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
- eventEntry += toEventEntry(eventType.toString,c)
- //credits += c.creditsToSubtract
+ //w.
+ import scala.collection.JavaConverters.asScalaBufferConverter
+ ///FIXME: val elapsedTime = w.getChargeslots.asScala.foldLeft()
+ //c.getStopMillis - c.getStartMillis
+ var totalElapsedTime = 0L
+ var totalUnits = 0.0D
+ for { c <- w.getChargeslots.asScala }{
+ if(c.getCreditsToSubtract != 0.0) {
+ //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
+ val (e,l,u) = toEventEntry(eventType.toString,c)
+ eventEntry += e
+ totalElapsedTime += l
+ totalUnits += u
+ }
}
- }
//Console.err.println("TOTAL resource event credits: " + credits)
- new ResourceEntry(rcName,rcType,rcUnitName,credits.toString,eventEntry.toList)
+ val re = new ResourceEntry(rcName,/*rcType,rcUnitName,*/credits.toString,totalElapsedTime.toString,
+ totalUnits.toString,eventEntry.toList)
+ re.unitName = rcUnitName
+ re.resourceType = rcType
+ re
}
- private[this] def resourceEntriesAt(t:Timeslot,w:WorkingUserState) : (List[ResourceEntry],Double) = {
+ private[this] def resourceEntriesAt(t:Timeslot,w:UserStateMsg) : (List[ResourceEntry],Double) = {
val ret = new ListBuffer[ResourceEntry]
var sum = 0.0
//Console.err.println("Wallet entries: " + w.walletEntries)
- val walletEntries = w.walletEntries
+ import scala.collection.JavaConverters.asScalaBufferConverter
+ val walletEntries = w.getWalletEntries.asScala
/*Console.err.println("Wallet entries ")
for { i <- walletEntries }
Console.err.println("WALLET ENTRY\n%s\nEND WALLET ENTRY".format(i.toJsonString))
Console.err.println("End wallet entries")*/
for { i <- walletEntries} {
- if(t.contains(i.referenceTimeslot) && i.sumOfCreditsToSubtract != 0.0){
+ val referenceTimeslot = MessageHelpers.referenceTimeslotOf(i)
+ if(t.contains(referenceTimeslot) && i.getSumOfCreditsToSubtract.toDouble != 0.0){
/*Console.err.println("i.sumOfCreditsToSubtract : " + i.sumOfCreditsToSubtract)*/
- if(i.sumOfCreditsToSubtract > 0.0D) sum += i.sumOfCreditsToSubtract
+ if(i.getSumOfCreditsToSubtract.toDouble > 0.0D)
+ sum += i.getSumOfCreditsToSubtract.toDouble
ret += toResourceEntry(i)
} else {
- /*Console.err.println("WALLET ENTERY : " + i.toJsonString + "\n" +
- t + " does not contain " + i.referenceTimeslot + " !!!!")*/
+ val ijson = AvroHelpers.jsonStringOfSpecificRecord(i)
+ val itimeslot = MessageHelpers.referenceTimeslotOf(i)
+ Console.err.println("IGNORING WALLET ENTRY : " + ijson + "\n" +
+ t + " does not contain " + itimeslot + " !!!!")
}
}
(ret.toList,sum)
}
- private[this] def aggregateResourceEntries(re:List[ResourceEntry]) : List[ResourceEntry] = {
+ private[this] def aggregateResourceEntries(re:List[ResourceEntry]) : List[ServiceEntry] = {
def addResourceEntries(a:ResourceEntry,b:ResourceEntry) : ResourceEntry = {
assert(a.resourceName == b.resourceName)
val totalCredits = (a.totalCredits.toDouble+b.totalCredits.toDouble).toString
- a.copy(a.resourceName,a.resourceType,a.unitName,totalCredits,a.details ::: b.details)
+ val totalElapsedTime = (a.totalElapsedTime.toLong+b.totalElapsedTime.toLong).toString
+ val totalUnits = (a.totalUnits.toDouble+b.totalUnits.toDouble).toString
+ val ab = a.copy(a.resourceName/*,a.resourceType,a.unitName*/,totalCredits,totalElapsedTime,totalUnits,
+ a.details ::: b.details)
+ ab.unitName = a.unitName
+ ab.resourceType = a.resourceType
+ ab
}
- re.foldLeft(TreeMap[String,ResourceEntry]()){ (map,r1) =>
+ val map0 = re.foldLeft(TreeMap[String,ResourceEntry]()){ (map,r1) =>
map.get(r1.resourceName) match {
case None => map + ((r1.resourceName,r1))
case Some(r0) => (map - r0.resourceName) +
((r0.resourceName, addResourceEntries(r0,r1)))
}
- }.values.toList
+ }
+ val map1 = map0.foldLeft(TreeMap[String,List[ResourceEntry]]()){ case (map,(_,r1)) =>
+ map.get(r1.resourceType) match {
+ case None => map + ((r1.resourceType,List(r1)))
+ case Some(rl) => (map - r1.resourceType) + ((r1.resourceType,r1::rl))
+ }
+ }
+ map1.foldLeft(List[ServiceEntry]()){ case (ret,(serviceName,resList)) =>
+ val (totalCredits,totalElapsedTime,totalUnits) =
+ resList.foldLeft((0.0D,0L,0.0D)){ case ((a,b,c),r) =>
+ (a+r.totalCredits.toDouble,
+ b+r.totalElapsedTime.toLong,
+ c+r.totalUnits.toDouble
+ )}
+ new ServiceEntry(serviceName,totalCredits.toString,
+ totalElapsedTime.toString,totalUnits.toString,
+ resList.head.unitName,resList) :: ret
+ }
}
- def fromWorkingUserState(t:Timeslot,userID:String,w:Option[WorkingUserState]) : AbstractBillEntry = {
+ def addMissingServices(se:List[ServiceEntry],re:Map[String,ResourceType]) : List[ServiceEntry]=
+ se:::(re -- se.map(_.serviceName).toSet).foldLeft(List[ServiceEntry]()) { case (ret,(name,typ:ResourceType)) =>
+ new ServiceEntry(name,"0.0","0","0.0",typ.unit,List[ResourceEntry]()) :: ret
+ }
+
+ def fromWorkingUserState(t0:Timeslot,userID:String,w:Option[UserStateMsg],
+ resourceTypes:Map[String,ResourceType]) : AbstractBillEntry = {
+ val t = t0.roundMilliseconds /* we do not care about milliseconds */
+ //Console.err.println("Timeslot: " + t0)
+ //Console.err.println("After rounding timeslot: " + t)
val ret = w match {
case None =>
+ val allMissing = addMissingServices(Nil,resourceTypes)
new BillEntry(counter.getAndIncrement.toString,
userID,"processing",
"0.0",
"0.0",
t.from.getTime.toString,t.to.getTime.toString,
- Nil)
+ allMissing)
case Some(w) =>
+ val wjson = AvroHelpers.jsonStringOfSpecificRecord(w)
+ Console.err.println("Working user state: %s".format(wjson))
val (rcEntries,rcEntriesCredits) = resourceEntriesAt(t,w)
- val resMap = aggregateResourceEntries(rcEntries)
- Console.err.println("Working user state: %s".format(w.toString))
+ val resList0 = aggregateResourceEntries(rcEntries)
+ val resList1 = addMissingServices(resList0,resourceTypes)
new BillEntry(counter.getAndIncrement.toString,
userID,"ok",
- w.totalCredits.toString,
+ w.getTotalCredits.toString,
rcEntriesCredits.toString,
- t.from.getTime.toString,t.to.getTime.toString,
- resMap)
+ t.from.getTime.toString,
+ t.to.getTime.toString,
+ resList1)
}
//Console.err.println("JSON: " + ret.toJsonString)
ret
}
- val jsonSample = "{\n \"id\":\"2\",\n \"userID\":\"loverdos@grnet.gr\",\n \"status\":\"ok\",\n \"remainingCredits\":\"3130.0000027777783\",\n \"deductedCredits\":\"5739.9999944444435\",\n \"startTime\":\"1341090000000\",\n \"endTime\":\"1343768399999\",\n \"bill\":[{\n \"resourceName\":\"diskspace\",\n \"resourceType\":\"diskspace\",\n \"unitName\":\"MB/Hr\",\n \"totalCredits\":\"2869.9999972222217\",\n \"eventType\":\"object update@/Papers/GOTO_HARMFUL.PDF\",\n\t \"details\":[\n\t {\"totalCredits\":\"2869.9999972222217\",\n\t \"details\":[{\n\t \"id\":\"0\",\n\t \"unitPrice\":\"0.01\",\n\t \"startTime\":\"1342735200000\",\n\t \"endTime\":\"1343768399999\",\n\t \"ellapsedTime\":\"1033199999\",\n\t \"credits\":\"2869.9999972222217\"\n\t \t}]\n\t }\n\t ]\n },{\n \"resourceName\":\"diskspace\",\n \"resourceType\":\"diskspace\",\n \"unitName\":\"MB/Hr\",\n \"totalCredits\":\"2869.9999972222217\",\n \"eventType\":\"object update@/Papers/GOTO_HARMFUL.PDF\",\n \"details\":[\t {\"totalCredits\":\"2869.9999972222217\",\n\t \"details\":[{\n\t \"id\":\"0\",\n\t \"unitPrice\":\"0.01\",\n\t \"startTime\":\"1342735200000\",\n\t \"endTime\":\"1343768399999\",\n\t \"ellapsedTime\":\"1033199999\",\n\t \"credits\":\"2869.9999972222217\"\n\t \t}]\n\t }\n\t]\n }]\n}"
-
- def main0(args: Array[String]) = {
- val b : BillEntry = StdConverters.AllConverters.convertEx[BillEntry](CompactJsonTextFormat(jsonSample))
- val l0 = b.bill
- val l1 = aggregateResourceEntries(l0)
-
- Console.err.println("Initial resources: ")
- for{ i <- l0 } Console.err.println("RESOURCE: " + i.toJsonString)
- Console.err.println("Aggregate resources: ")
- for{ a <- l1 } {
- Console.err.println("RESOURCE: %s\n %s\nEND RESOURCE".format(a.resourceName,a.toJsonString))
- }
-
- val aggr = new BillEntry(b.id,b.userID,b.status,b.remainingCredits,b.deductedCredits,b.startTime,b.endTime,l1)
- Console.err.println("Aggregate:\n" + aggr.toJsonString)
- }
-
//
def main(args: Array[String]) = {
//Console.err.println("JSON: " + (new BillEntry).toJsonString)
val propsfile = new FileStreamResource(new File("aquarium.properties"))
var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
- val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
- update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+ val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyMsg).
+ //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
build()
aquarium.start()