import com.ckkloverdos.resource.FileStreamResource
import gr.grnet.aquarium.converter.{CompactJsonTextFormat, StdConverters}
import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
-import gr.grnet.aquarium.message.avro.MessageHelpers
+import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageHelpers}
import gr.grnet.aquarium.message.avro.gen.{ChargeslotMsg, WalletEntryMsg, UserStateMsg}
import gr.grnet.aquarium.store.memory.MemStoreProvider
import gr.grnet.aquarium.util.json.JsonSupport
import java.util.concurrent.atomic.AtomicLong
import scala.collection.immutable.TreeMap
import scala.collection.mutable.ListBuffer
+import gr.grnet.aquarium.policy.ResourceType
/*
val unitPrice:String,
val startTime:String,
val endTime:String,
- val ellapsedTime:String,
+ val elapsedTime:String,
+ val units:String,
val credits:String)
extends JsonSupport {}
// Bill line for a single resource: its name, the summed credits, elapsed time
// and units (all carried as Strings), plus the per-event breakdown in `details`.
// `unitName` and `resourceType` are no longer constructor parameters; they are
// mutable members that start with placeholder values and are assigned by the
// code that builds the entry. Because they are not constructor parameters,
// `copy` does not carry them over and they must be re-assigned on the copy.
case class ResourceEntry(val resourceName : String,
- val resourceType : String,
- val unitName : String,
+ //val resourceType : String,
+ //val unitName : String,
val totalCredits : String,
+ val totalElapsedTime : String,
+ val totalUnits : String,
val details : List[EventEntry])
-extends JsonSupport {}
+extends JsonSupport {
+ var unitName = "EMPTY_UNIT_NAME"
+ var resourceType = "EMPTY_RESOURCE_TYPE"
+}
// Bill line for a whole service: totals (credits, elapsed time, units, all as
// Strings) over the ResourceEntry values listed in `details`, together with the
// unit name used for that service. An entry with empty `details` and zero
// totals is used for services that have no charges in the billed period.
+case class ServiceEntry(val serviceName: String,
+ val totalCredits : String,
+ val totalElapsedTime : String,
+ val totalUnits:String,
+ val unitName:String,
+ val details: List[ResourceEntry]
+ )
+extends JsonSupport {}
// Common JSON-capable base type of bill results; it is the declared return
// type of fromWorkingUserState and the parent of the concrete bill entry class.
abstract class AbstractBillEntry
extends JsonSupport {}
val deductedCredits:String,
val startTime:String,
val endTime:String,
- val bill:List[ResourceEntry]
+ val bill:List[ServiceEntry]
)
extends AbstractBillEntry {}
Timeslot(dstart,dend)
} */
/**
 * Converts one charge slot into a ChargeEntry and also returns the raw numbers
 * the caller needs for aggregation.
 *
 * @param c the charge slot to convert
 * @return a triple of (the ChargeEntry with all values rendered as Strings,
 *         the elapsed time `stopMillis - startMillis` as a Long,
 *         the number of units `creditsToSubtract / unitPrice` as a Double)
 */
- private[this] def toChargeEntry(c:ChargeslotMsg) : ChargeEntry = {
+ private[this] def toChargeEntry(c:ChargeslotMsg) : (ChargeEntry,Long,Double) = {
val unitPrice = c.getUnitPrice.toString
val startTime = c.getStartMillis.toString
val endTime = c.getStopMillis.toString
- val difTime = (c.getStopMillis - c.getStartMillis).toString
+ val difTime = (c.getStopMillis - c.getStartMillis).toLong
// NOTE(review): there is no guard for a zero unit price in this division -
// confirm that charge slots reaching this point always have a non-zero price.
+ val unitsD = (c.getCreditsToSubtract/c.getUnitPrice)
val credits = c.getCreditsToSubtract.toString
- new ChargeEntry(counter.getAndIncrement.toString,unitPrice,
- startTime,endTime,difTime,credits)
// The entry id is the next value of the shared `counter`.
+ (new ChargeEntry(counter.getAndIncrement.toString,unitPrice,
+ startTime,endTime,difTime.toString,unitsD.toString,credits),difTime,unitsD)
}
/**
 * Wraps a single charge slot in an EventEntry of the given event type.
 *
 * @param eventType label stored in the resulting EventEntry
 * @param c         the charge slot to convert via toChargeEntry
 * @return (the EventEntry holding exactly one ChargeEntry, the slot's elapsed
 *         time, the slot's units) - the last two are passed through unchanged
 *         from toChargeEntry
 */
- private[this] def toEventEntry(eventType:String,c:ChargeslotMsg) : EventEntry =
- new EventEntry(eventType,List(toChargeEntry(c)))
+ private[this] def toEventEntry(eventType:String,c:ChargeslotMsg) : (EventEntry,Long,Double) = {
+ val (c1,l1,d1) = toChargeEntry(c)
+ (new EventEntry(eventType,List(c1)),l1,d1)
+ }
private[this] def toResourceEntry(w:WalletEntryMsg) : ResourceEntry = {
val rcType = w.getResourceType.getName
val rcName = rcType match {
case "diskspace" =>
- String.valueOf(MessageHelpers.currentResourceEventOf(w).getDetails.get("path"))
+ String.valueOf(MessageHelpers.currentResourceEventOf(w).getDetails.get("path").getAnyValue)
case _ =>
MessageHelpers.currentResourceEventOf(w).getInstanceID
}
val eventType = //TODO: This is hardcoded; find a better solution
rcType match {
case "diskspace" =>
- val action = MessageHelpers.currentResourceEventOf(w).getDetails.get("action")
- val path = MessageHelpers.currentResourceEventOf(w).getDetails.get("path")
+ val action = MessageHelpers.currentResourceEventOf(w).getDetails.get("action").getAnyValue
+ //val path = MessageHelpers.currentResourceEventOf(w).getDetails.get("path")
//"%s@%s".format(action,path)
action
case "vmtime" =>
case "addcredits" =>
"once"
}
-
+ //w.
import scala.collection.JavaConverters.asScalaBufferConverter
- for { c <- w.getChargeslots.asScala }{
- if(c.getCreditsToSubtract != 0.0) {
- //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
- eventEntry += toEventEntry(eventType.toString,c)
- //credits += c.creditsToSubtract
+ ///FIXME: val elapsedTime = w.getChargeslots.asScala.foldLeft()
+ //c.getStopMillis - c.getStartMillis
+ var totalElapsedTime = 0L
+ var totalUnits = 0.0D
+ for { c <- w.getChargeslots.asScala }{
+ if(c.getCreditsToSubtract != 0.0) {
+ //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
+ val (e,l,u) = toEventEntry(eventType.toString,c)
+ eventEntry += e
+ totalElapsedTime += l
+ totalUnits += u
+ }
}
- }
//Console.err.println("TOTAL resource event credits: " + credits)
- new ResourceEntry(rcName,rcType,rcUnitName,credits.toString,eventEntry.toList)
+ val re = new ResourceEntry(rcName,/*rcType,rcUnitName,*/credits.toString,totalElapsedTime.toString,
+ totalUnits.toString,eventEntry.toList)
+ re.unitName = rcUnitName
+ re.resourceType = rcType
+ re
}
private[this] def resourceEntriesAt(t:Timeslot,w:UserStateMsg) : (List[ResourceEntry],Double) = {
val referenceTimeslot = MessageHelpers.referenceTimeslotOf(i)
if(t.contains(referenceTimeslot) && i.getSumOfCreditsToSubtract.toDouble != 0.0){
/*Console.err.println("i.sumOfCreditsToSubtract : " + i.sumOfCreditsToSubtract)*/
- if(i.getSumOfCreditsToSubtract.toDouble > 0.0D) sum += i.getSumOfCreditsToSubtract.toDouble
+ if(i.getSumOfCreditsToSubtract.toDouble > 0.0D)
+ sum += i.getSumOfCreditsToSubtract.toDouble
ret += toResourceEntry(i)
} else {
- /*Console.err.println("WALLET ENTERY : " + i.toJsonString + "\n" +
- t + " does not contain " + i.referenceTimeslot + " !!!!")*/
+ val ijson = AvroHelpers.jsonStringOfSpecificRecord(i)
+ val itimeslot = MessageHelpers.referenceTimeslotOf(i)
+ Console.err.println("IGNORING WALLET ENTRY : " + ijson + "\n" +
+ t + " does not contain " + itimeslot + " !!!!")
}
}
(ret.toList,sum)
}
/**
 * Aggregates raw resource entries into per-service bill lines in three steps:
 *  1. entries with the same `resourceName` are merged into one (totals summed,
 *     details concatenated);
 *  2. the merged entries are grouped by `resourceType`;
 *  3. each group becomes one ServiceEntry carrying the group's totals.
 *
 * @param re the resource entries to aggregate
 * @return one ServiceEntry per distinct resource type; the list is built by
 *         prepending while walking the sorted map, so it ends up in reverse
 *         key order
 */
- private[this] def aggregateResourceEntries(re:List[ResourceEntry]) : List[ResourceEntry] = {
+ private[this] def aggregateResourceEntries(re:List[ResourceEntry]) : List[ServiceEntry] = {
// Merges two entries for the same resource; all totals are parsed from and
// rendered back to Strings.
def addResourceEntries(a:ResourceEntry,b:ResourceEntry) : ResourceEntry = {
assert(a.resourceName == b.resourceName)
val totalCredits = (a.totalCredits.toDouble+b.totalCredits.toDouble).toString
- a.copy(a.resourceName,a.resourceType,a.unitName,totalCredits,a.details ::: b.details)
+ val totalElapsedTime = (a.totalElapsedTime.toLong+b.totalElapsedTime.toLong).toString
+ val totalUnits = (a.totalUnits.toDouble+b.totalUnits.toDouble).toString
+ val ab = a.copy(a.resourceName/*,a.resourceType,a.unitName*/,totalCredits,totalElapsedTime,totalUnits,
+ a.details ::: b.details)
// unitName/resourceType are body-level vars, not constructor parameters, so
// copy() resets them to their placeholders; restore them from `a`.
+ ab.unitName = a.unitName
+ ab.resourceType = a.resourceType
+ ab
}
// Step 1: resourceName -> merged entry.
- re.foldLeft(TreeMap[String,ResourceEntry]()){ (map,r1) =>
+ val map0 = re.foldLeft(TreeMap[String,ResourceEntry]()){ (map,r1) =>
map.get(r1.resourceName) match {
case None => map + ((r1.resourceName,r1))
case Some(r0) => (map - r0.resourceName) +
((r0.resourceName, addResourceEntries(r0,r1)))
}
- }.values.toList
+ }
// Step 2: resourceType -> merged entries of that type (newest prepended).
+ val map1 = map0.foldLeft(TreeMap[String,List[ResourceEntry]]()){ case (map,(_,r1)) =>
+ map.get(r1.resourceType) match {
+ case None => map + ((r1.resourceType,List(r1)))
+ case Some(rl) => (map - r1.resourceType) + ((r1.resourceType,r1::rl))
+ }
+ }
// Step 3: one ServiceEntry per type. Every list in map1 has at least one
// element, so resList.head is defined; its unitName stands for the group.
+ map1.foldLeft(List[ServiceEntry]()){ case (ret,(serviceName,resList)) =>
+ val (totalCredits,totalElapsedTime,totalUnits) =
+ resList.foldLeft((0.0D,0L,0.0D)){ case ((a,b,c),r) =>
+ (a+r.totalCredits.toDouble,
+ b+r.totalElapsedTime.toLong,
+ c+r.totalUnits.toDouble
+ )}
+ new ServiceEntry(serviceName,totalCredits.toString,
+ totalElapsedTime.toString,totalUnits.toString,
+ resList.head.unitName,resList) :: ret
+ }
}
- def fromWorkingUserState(t:Timeslot,userID:String,w:Option[UserStateMsg]) : AbstractBillEntry = {
/**
 * Completes a list of service entries so that every known resource type is
 * represented.
 *
 * @param se the service entries already computed
 * @param re the known resource types, keyed by name
 * @return `se` followed by one zero-valued ServiceEntry (credits "0.0",
 *         elapsed time "0", units "0.0", no details, unit taken from the
 *         resource type) for every key of `re` that is not the serviceName
 *         of an entry in `se`
 */
+ def addMissingServices(se:List[ServiceEntry],re:Map[String,ResourceType]) : List[ServiceEntry]=
+ se:::(re -- se.map(_.serviceName).toSet).foldLeft(List[ServiceEntry]()) { case (ret,(name,typ:ResourceType)) =>
+ new ServiceEntry(name,"0.0","0","0.0",typ.unit,List[ResourceEntry]()) :: ret
+ }
+
/**
 * Builds the bill for a user over a billing period.
 *
 * @param t0            the requested billing period; it is rounded with
 *                      `roundMilliseconds` before use
 * @param userID        the user the bill is for
 * @param w             the user's working state, if one is available
 * @param resourceTypes the known resource types, keyed by name; each one that
 *                      has no charges still appears in the bill as a
 *                      zero-valued service entry
 * @return when `w` is empty, a bill with status "processing", zero credits
 *         and only zero-valued service entries; otherwise a bill with status
 *         "ok", the state's total credits, the sum of credits collected by
 *         resourceEntriesAt for the rounded period, and the aggregated
 *         service entries
 */
+ def fromWorkingUserState(t0:Timeslot,userID:String,w:Option[UserStateMsg],
+ resourceTypes:Map[String,ResourceType]) : AbstractBillEntry = {
+ val t = t0.roundMilliseconds /* we do not care about milliseconds */
+ //Console.err.println("Timeslot: " + t0)
+ //Console.err.println("After rounding timeslot: " + t)
val ret = w match {
case None =>
// No state yet: report every known service with zero totals.
+ val allMissing = addMissingServices(Nil,resourceTypes)
new BillEntry(counter.getAndIncrement.toString,
userID,"processing",
"0.0",
"0.0",
t.from.getTime.toString,t.to.getTime.toString,
- Nil)
+ allMissing)
case Some(w) =>
// The full state is written to stderr as JSON on every call.
+ val wjson = AvroHelpers.jsonStringOfSpecificRecord(w)
+ Console.err.println("Working user state: %s".format(wjson))
val (rcEntries,rcEntriesCredits) = resourceEntriesAt(t,w)
- val resMap = aggregateResourceEntries(rcEntries)
- Console.err.println("Working user state: %s".format(w.toString))
+ val resList0 = aggregateResourceEntries(rcEntries)
+ val resList1 = addMissingServices(resList0,resourceTypes)
new BillEntry(counter.getAndIncrement.toString,
userID,"ok",
w.getTotalCredits.toString,
rcEntriesCredits.toString,
- t.from.getTime.toString,t.to.getTime.toString,
- resMap)
+ t.from.getTime.toString,
+ t.to.getTime.toString,
+ resList1)
}
//Console.err.println("JSON: " + ret.toJsonString)
ret
}
- val jsonSample = "{\n \"id\":\"2\",\n \"userID\":\"loverdos@grnet.gr\",\n \"status\":\"ok\",\n \"remainingCredits\":\"3130.0000027777783\",\n \"deductedCredits\":\"5739.9999944444435\",\n \"startTime\":\"1341090000000\",\n \"endTime\":\"1343768399999\",\n \"bill\":[{\n \"resourceName\":\"diskspace\",\n \"resourceType\":\"diskspace\",\n \"unitName\":\"MB/Hr\",\n \"totalCredits\":\"2869.9999972222217\",\n \"eventType\":\"object update@/Papers/GOTO_HARMFUL.PDF\",\n\t \"details\":[\n\t {\"totalCredits\":\"2869.9999972222217\",\n\t \"details\":[{\n\t \"id\":\"0\",\n\t \"unitPrice\":\"0.01\",\n\t \"startTime\":\"1342735200000\",\n\t \"endTime\":\"1343768399999\",\n\t \"ellapsedTime\":\"1033199999\",\n\t \"credits\":\"2869.9999972222217\"\n\t \t}]\n\t }\n\t ]\n },{\n \"resourceName\":\"diskspace\",\n \"resourceType\":\"diskspace\",\n \"unitName\":\"MB/Hr\",\n \"totalCredits\":\"2869.9999972222217\",\n \"eventType\":\"object update@/Papers/GOTO_HARMFUL.PDF\",\n \"details\":[\t {\"totalCredits\":\"2869.9999972222217\",\n\t \"details\":[{\n\t \"id\":\"0\",\n\t \"unitPrice\":\"0.01\",\n\t \"startTime\":\"1342735200000\",\n\t \"endTime\":\"1343768399999\",\n\t \"ellapsedTime\":\"1033199999\",\n\t \"credits\":\"2869.9999972222217\"\n\t \t}]\n\t }\n\t]\n }]\n}"
-
- def main0(args: Array[String]) = {
- val b : BillEntry = StdConverters.AllConverters.convertEx[BillEntry](CompactJsonTextFormat(jsonSample))
- val l0 = b.bill
- val l1 = aggregateResourceEntries(l0)
-
- Console.err.println("Initial resources: ")
- for{ i <- l0 } Console.err.println("RESOURCE: " + i.toJsonString)
- Console.err.println("Aggregate resources: ")
- for{ a <- l1 } {
- Console.err.println("RESOURCE: %s\n %s\nEND RESOURCE".format(a.resourceName,a.toJsonString))
- }
-
- val aggr = new BillEntry(b.id,b.userID,b.status,b.remainingCredits,b.deductedCredits,b.startTime,b.endTime,l1)
- Console.err.println("Aggregate:\n" + aggr.toJsonString)
- }
-
//
def main(args: Array[String]) = {
//Console.err.println("JSON: " + (new BillEntry).toJsonString)
val propsfile = new FileStreamResource(new File("aquarium.properties"))
var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyMsg).
- update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+ //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
build()
aquarium.start()