-package gr.grnet.aquarium.charging.bill
-
-import gr.grnet.aquarium.charging.state.WorkingUserState
-import gr.grnet.aquarium.util.json.JsonSupport
-import com.ckkloverdos.resource.FileStreamResource
-import java.io.File
-import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.converter.{CompactJsonTextFormat, PrettyJsonTextFormat, StdConverters}
-import gr.grnet.aquarium.{Aquarium, ResourceLocator, AquariumBuilder}
-import gr.grnet.aquarium.store.memory.MemStoreProvider
-import gr.grnet.aquarium.converter.StdConverters._
-import scala._
-import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Date, Calendar, GregorianCalendar}
-import gr.grnet.aquarium.charging.wallet.WalletEntry
-import scala.collection.parallel.mutable
-import scala.collection.mutable.ListBuffer
-import gr.grnet.aquarium.Aquarium.EnvKeys
-import gr.grnet.aquarium.charging.Chargeslot
-import scala.collection.immutable.TreeMap
-import scala.Some
-import gr.grnet.aquarium.charging.Chargeslot
-
-
/*
* Copyright 2011-2012 GRNET S.A. All rights reserved.
*
* or implied, of GRNET S.A.
*/
+package gr.grnet.aquarium.charging.bill
+
+import com.ckkloverdos.props.Props
+import com.ckkloverdos.resource.FileStreamResource
+import gr.grnet.aquarium.converter.StdConverters
+import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import gr.grnet.aquarium.message.avro.{AvroHelpers, MessageHelpers}
+import gr.grnet.aquarium.message.avro.gen._
+import gr.grnet.aquarium.util.json.JsonSupport
+import gr.grnet.aquarium.{Real, Aquarium, ResourceLocator, AquariumBuilder}
+import java.io.File
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.immutable.TreeMap
+import scala.collection.mutable.ListBuffer
+import scala.Some
+import gr.grnet.aquarium.policy.ResourceType
+import java.util
+
/*
* @author Prodromos Gerakios <pgerakios@grnet.gr>
*/
-
+/*
case class ChargeEntry(val id:String,
val unitPrice:String,
val startTime:String,
val endTime:String,
- val ellapsedTime:String,
+ val elapsedTime:String,
+ val units:String,
val credits:String)
extends JsonSupport {}
val resourceType : String,
val unitName : String,
val totalCredits : String,
+ val totalElapsedTime : String,
+ val totalUnits : String,
val details : List[EventEntry])
-extends JsonSupport {}
+extends JsonSupport {
+ //var unitName = "EMPTY_UNIT_NAME"
+ //var resourceType = "EMPTY_RESOURCE_TYPE"
+}
+case class ServiceEntry(val serviceName: String,
+ val totalCredits : String,
+ val totalElapsedTime : String,
+ val totalUnits:String,
+ val unitName:String,
+ val details: List[ResourceEntry]
+ )
+extends JsonSupport {}
-abstract class AbstractBillEntry
- extends JsonSupport {}
class BillEntry(val id:String,
val userID : String,
val deductedCredits:String,
val startTime:String,
val endTime:String,
- val bill:List[ResourceEntry]
+ val bill:List[ServiceEntry]
)
- extends AbstractBillEntry {}
-
+ extends JsonSupport {}
+*/
-object AbstractBillEntry {
+object BillEntryMsg {
private[this] val counter = new AtomicLong(0L) // source of the sequential ids given to charge and bill entries
private[this] def nextUIDObject() = counter.getAndIncrement // returns the current counter value, then advances it
val dend = cal.getTime
Timeslot(dstart,dend)
} */
+  /**
+   * Builds a `ChargeEntryMsg` from already-formatted string fields.
+   *
+   * Each argument is copied into the message field of the same name.
+   */
+  private[this] def newChargeEntry(
+      id: String,
+      unitPrice: String,
+      startTime: String,
+      endTime: String,
+      elapsedTime: String,
+      units: String,
+      credits: String
+  ): ChargeEntryMsg = {
+    val chargeEntry = new ChargeEntryMsg
+    chargeEntry.setId(id)
+    chargeEntry.setUnitPrice(unitPrice)
+    chargeEntry.setStartTime(startTime)
+    chargeEntry.setEndTime(endTime)
+    chargeEntry.setElapsedTime(elapsedTime)
+    chargeEntry.setUnits(units)
+    chargeEntry.setCredits(credits)
+    chargeEntry
+  }
+
+  /**
+   * Builds an `EventEntryMsg` with the given event type and list of charge entries.
+   * The `details` list is stored as passed in (it is not copied here).
+   */
+  private[this] def newEventEntry(
+      eventType: String,
+      details: java.util.List[ChargeEntryMsg]
+  ): EventEntryMsg = {
+    val eventEntry = new EventEntryMsg
+    eventEntry.setEventType(eventType)
+    eventEntry.setDetails(details)
+    eventEntry
+  }
+
+  /**
+   * Builds a `ResourceEntryMsg` from already-formatted string fields and the list of
+   * event entries that make up the resource's bill details.
+   */
+  private[this] def newResourceEntry(
+      resourceName: String,
+      resourceType: String,
+      unitName: String,
+      totalCredits: String,
+      totalElapsedTime: String,
+      totalUnits: String,
+      details: java.util.List[EventEntryMsg]
+  ): ResourceEntryMsg = {
+    val resourceEntry = new ResourceEntryMsg
+    resourceEntry.setResourceName(resourceName)
+    resourceEntry.setResourceType(resourceType)
+    resourceEntry.setUnitName(unitName)
+    resourceEntry.setTotalCredits(totalCredits)
+    resourceEntry.setTotalElapsedTime(totalElapsedTime)
+    resourceEntry.setTotalUnits(totalUnits)
+    resourceEntry.setDetails(details)
+    resourceEntry
+  }
+
+  /**
+   * Builds a `ServiceEntryMsg` from already-formatted totals and the list of resource
+   * entries grouped under the service.
+   */
+  private[this] def newServiceEntry(
+      serviceName: String,
+      totalCredits: String,
+      totalElapsedTime: String,
+      totalUnits: String,
+      unitName: String,
+      details: java.util.List[ResourceEntryMsg]
+  ): ServiceEntryMsg = {
+    val serviceEntry = new ServiceEntryMsg
+    serviceEntry.setServiceName(serviceName)
+    serviceEntry.setTotalCredits(totalCredits)
+    serviceEntry.setTotalElapsedTime(totalElapsedTime)
+    serviceEntry.setTotalUnits(totalUnits)
+    serviceEntry.setUnitName(unitName)
+    serviceEntry.setDetails(details)
+    serviceEntry
+  }
+
- private[this] def toChargeEntry(c:Chargeslot) : ChargeEntry = {
- val unitPrice = c.unitPrice.toString
- val startTime = c.startMillis.toString
- val endTime = c.stopMillis.toString
- val difTime = (c.stopMillis - c.startMillis).toString
- val credits = c.creditsToSubtract.toString
- new ChargeEntry(counter.getAndIncrement.toString,unitPrice,
- startTime,endTime,difTime,credits)
+  /**
+   * Builds the top-level `BillEntryMsg` for a user.
+   *
+   * All scalar fields arrive as already-formatted strings; `bill` is the list of
+   * per-service entries and is stored as passed in.
+   */
+  private[this] def newBillEntry(
+      id: String,
+      userID: String,
+      status: String,
+      remainingCredits: String,
+      deductedCredits: String,
+      startTime: String,
+      endTime: String,
+      bill: java.util.List[ServiceEntryMsg]
+  ): BillEntryMsg = {
+    val billEntry = new BillEntryMsg
+    billEntry.setId(id)
+    billEntry.setUserID(userID)
+    billEntry.setStatus(status)
+    billEntry.setRemainingCredits(remainingCredits)
+    billEntry.setDeductedCredits(deductedCredits)
+    billEntry.setStartTime(startTime)
+    billEntry.setEndTime(endTime)
+    billEntry.setBill(bill)
+    billEntry
}
- private[this] def toEventEntry(eventType:String,c:Chargeslot) : EventEntry =
- new EventEntry(eventType,List(toChargeEntry(c)))
+  /** Copies the given arguments, in order, into a new `java.util.ArrayList`. */
+  private[this] def javaList[A](l: A*): java.util.List[A] = {
+    val result = new java.util.ArrayList[A]()
+    for (element <- l) result.add(element)
+    result
+  }
+
+
+  /**
+   * Converts a charge slot into a `ChargeEntryMsg`, drawing a fresh id from `counter`.
+   *
+   * @return the charge entry, the slot duration (stop millis minus start millis) and
+   *         the units, computed as credits-to-subtract divided by unit price
+   */
+  private[this] def toChargeEntry(c: ChargeslotMsg): (ChargeEntryMsg, Long, Real) = {
+    val elapsedMillis = (c.getStopMillis - c.getStartMillis).toLong
+    // NOTE(review): nothing guards against a zero unit price here — confirm how
+    // division on Real behaves in that case.
+    val chargedUnits = (Real(c.getCreditsToSubtract) / Real(c.getUnitPrice))
+    val chargeEntry = newChargeEntry(
+      counter.getAndIncrement.toString,
+      c.getUnitPrice.toString,
+      c.getStartMillis.toString,
+      c.getStopMillis.toString,
+      elapsedMillis.toString,
+      chargedUnits.toString,
+      c.getCreditsToSubtract.toString
+    )
+    (chargeEntry, elapsedMillis, chargedUnits)
+  }
+
+
+
+  /**
+   * Wraps the charge entry built from `c` in an event entry of the given type whose
+   * details list holds just that one charge; the elapsed time and units returned by
+   * `toChargeEntry` are passed through unchanged.
+   */
+  private[this] def toEventEntry(eventType: String, c: ChargeslotMsg): (EventEntryMsg, Long, Real) = {
+    val (chargeEntry, elapsedMillis, chargedUnits) = toChargeEntry(c)
+    val eventEntry = newEventEntry(eventType, javaList(chargeEntry))
+    (eventEntry, elapsedMillis, chargedUnits)
+  }
- private[this] def toResourceEntry(w:WalletEntry) : ResourceEntry = {
- assert(w.sumOfCreditsToSubtract==0.0 || w.chargslotCount > 0)
- val rcName = w.currentResourceEvent.clientID match {
- case "pithos" =>
- w.currentResourceEvent.details("path")
+ private[this] def toResourceEntry(w:WalletEntryMsg) : ResourceEntryMsg = { // builds the bill's resource entry for one wallet entry, with one event entry per charge slot kept by the loop below
+ assert(w.getSumOfCreditsToSubtract==0.0 || MessageHelpers.chargeslotCountOf(w) > 0) // NOTE(review): the raw field is compared with 0.0 here, while resourceEntriesAt converts the same field with .toDouble first — confirm the field's type
+ val rcType = w.getResourceType.getName
+ val rcName = rcType match { // "diskspace" entries are named after the event's "path" detail, everything else after the instance ID
+ case "diskspace" =>
+ String.valueOf(MessageHelpers.currentResourceEventOf(w).getDetails.get("path").getAnyValue)
case _ =>
- w.currentResourceEvent.instanceID
+ MessageHelpers.currentResourceEventOf(w).getInstanceID
}
- val rcType = w.resourceType.name
- val rcUnitName = w.resourceType.unit
- val eventEntry = new ListBuffer[EventEntry]
- val credits = w.sumOfCreditsToSubtract
+ val rcUnitName = w.getResourceType.getUnit
+ val credits = w.getSumOfCreditsToSubtract // the wallet entry's own total; it is not recomputed from the charge slots
val eventType = //TODO: This is hardcoded; find a better solution
- w.currentResourceEvent.clientID match {
- case "pithos" =>
- val action = w.currentResourceEvent.details("action")
- val path = w.currentResourceEvent.details("path")
- "%s@%s".format(action,path)
- case "cyclades" =>
- w.currentResourceEvent.value.toInt match {
+ rcType match { // no default case: a resource type other than the ones listed below raises a MatchError
+ case "diskspace" =>
+ val action = MessageHelpers.currentResourceEventOf(w).getDetails.get("action").getAnyValue
+ //val path = MessageHelpers.currentResourceEventOf(w).getDetails.get("path")
+ //"%s@%s".format(action,path)
+ action
+ case "vmtime" =>
+ MessageHelpers.currentResourceEventOf(w).getValue.toInt match {
case 0 => // OFF
"offOn"
case 1 => // ON
case _ =>
"BUG"
}
- case "astakos" =>
+ case "addcredits" =>
"once"
}
-
- for { c <- w.chargeslots }{
- if(c.creditsToSubtract != 0.0) {
- //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
- eventEntry += toEventEntry(eventType.toString,c)
- //credits += c.creditsToSubtract
+ //w.
+ import scala.collection.JavaConverters.asScalaBufferConverter // provides .asScala on the charge slot list
+ ///FIXME: val elapsedTime = w.getChargeslots.asScala.foldLeft()
+ //c.getStopMillis - c.getStartMillis
+ var totalElapsedTime = 0L // running sum of the elapsed time of the slots kept below
+ var totalUnits = Real.Zero // running sum of the units of the slots kept below
+ val eventEntry = new java.util.ArrayList[EventEntryMsg]() // replaces the former ListBuffer[EventEntry]
+ for { c <- w.getChargeslots.asScala }{
+ if(c.getCreditsToSubtract != 0.0) { // NOTE(review): same raw-field comparison with 0.0 as in the assert above — confirm the field's type
+ //Console.err.println("c.creditsToSubtract : " + c.creditsToSubtract)
+ val (e,l,u) = toEventEntry(eventType.toString,c) // e: event entry, l: elapsed time, u: units
+ eventEntry.add(e) // was: eventEntry += e
+ totalElapsedTime += l
+ totalUnits += u
+ }
}
- }
//Console.err.println("TOTAL resource event credits: " + credits)
- new ResourceEntry(rcName,rcType,rcUnitName,credits.toString,eventEntry.toList)
+ /*val re =*/ newResourceEntry(rcName,rcType,rcUnitName,credits.toString,totalElapsedTime.toString,
+ totalUnits.toString,eventEntry) // result of the method: all totals are passed on as strings
+ //re.unitName = rcUnitName
+ //re.resourceType = rcType
+ //re
}
- private[this] def resourceEntriesAt(t:Timeslot,w:WorkingUserState) : (List[ResourceEntry],Double) = {
- val ret = new ListBuffer[ResourceEntry]
+ /**
+ * Collects the resource entries of the wallet entries that fall inside timeslot `t`.
+ *
+ * A wallet entry is kept when `t` contains its reference timeslot and its
+ * sum of credits to subtract is non-zero; every other entry is ignored.
+ *
+ * @param t the billing timeslot
+ * @param w the user state whose wallet entries are inspected
+ * @return the resource entries of the kept wallet entries, in wallet order, and the
+ *         sum of the strictly positive credits among them
+ */
+ private[this] def resourceEntriesAt(t:Timeslot,w:UserStateMsg) : (java.util.List[ResourceEntryMsg],Double) = {
var sum = 0.0
//Console.err.println("Wallet entries: " + w.walletEntries)
- val walletEntries = w.walletEntries
+ import scala.collection.JavaConverters.asScalaBufferConverter
+ val walletEntries = w.getWalletEntries.asScala
+ /*Console.err.println("Wallet entries ")
+ for { i <- walletEntries }
+ Console.err.println("WALLET ENTRY\n%s\nEND WALLET ENTRY".format(i.toJsonString))
+ Console.err.println("End wallet entries")*/
+ val ret = new java.util.ArrayList[ResourceEntryMsg]()//new ListBuffer[ResourceEntry]
for { i <- walletEntries} {
- if(t.contains(i.referenceTimeslot) && i.sumOfCreditsToSubtract != 0.0){
- //Console.err.println("i.sumOfCreditsToSubtract : " + i.sumOfCreditsToSubtract)
- if(i.sumOfCreditsToSubtract > 0.0D) sum += i.sumOfCreditsToSubtract
- ret += toResourceEntry(i)
+ val referenceTimeslot = MessageHelpers.referenceTimeslotOf(i)
+ if(t.contains(referenceTimeslot) && i.getSumOfCreditsToSubtract.toDouble != 0.0){
+ /*Console.err.println("i.sumOfCreditsToSubtract : " + i.sumOfCreditsToSubtract)*/
+ // Only strictly positive amounts count towards the returned sum.
+ if(i.getSumOfCreditsToSubtract.toDouble > 0.0D)
+ sum += i.getSumOfCreditsToSubtract.toDouble
+ ret.add(toResourceEntry(i)) //ret += toResourceEntry(i)
} else {
- //Console.err.println("WALLET ENTERY : " + i + "\n" +
- // t + " does not contain " + i.referenceTimeslot + " !!!!")
+ // Ignored: the entry lies outside the requested timeslot or carries zero credits.
+ // To debug, log AvroHelpers.jsonStringOfSpecificRecord(i) and referenceTimeslot here;
+ // they are not computed eagerly because nothing consumes them.
}
}
- (ret.toList,sum)
+ (ret,sum)
}
- private[this] def aggregateResourceEntries(re:List[ResourceEntry]) : List[ResourceEntry] = {
- def addResourceEntries(a:ResourceEntry,b:ResourceEntry) : ResourceEntry = {
- assert(a.resourceName == b.resourceName)
- val totalCredits = (a.totalCredits.toDouble+b.totalCredits.toDouble).toString
- a.copy(a.resourceName,a.resourceType,a.unitName,totalCredits,a.details ::: b.details)
+ private[this] def aggregateResourceEntries(re:java.util.List[ResourceEntryMsg]) : java.util.List[ServiceEntryMsg] = { // merges entries that share a resource name, then groups the merged entries by resource type into one service entry per type
+ def addResourceEntries(a:ResourceEntryMsg,b:ResourceEntryMsg) : ResourceEntryMsg = { // sums the totals of a and b and concatenates their details; name, type and unit are taken from a
+ assert(a.getResourceName == b.getResourceName)
+ val totalCredits = (a.getTotalCredits.toDouble+b.getTotalCredits.toDouble).toString
+ val totalElapsedTime = (a.getTotalElapsedTime.toLong+b.getTotalElapsedTime.toLong).toString
+ val totalUnits = (a.getTotalUnits.toDouble+b.getTotalUnits.toDouble).toString
+ val msg = new ResourceEntryMsg
+
+ val details = new java.util.ArrayList[EventEntryMsg](a.getDetails) // fresh list, so a's own details list is not mutated
+ details.addAll(b.getDetails)
+ msg.setResourceName(a.getResourceName)
+ msg.setResourceType(a.getResourceType)
+ msg.setUnitName(a.getUnitName)
+ msg.setTotalCredits(totalCredits)
+ msg.setTotalElapsedTime(totalElapsedTime)
+ msg.setTotalUnits(totalUnits)
+ msg.setDetails(details)
+ msg
+ /*/*val ab =*/ a.copy(a.getResourceName,a.getResourceType,a.getUnitName,totalCredits,totalElapsedTime,totalUnits,
+ {a.getDetails.addAll(b.details); a})
+ //ab.unitName = a.unitName
+ //ab.resourceType = a.resourceType
+ //ab*/
}
- re.foldLeft(TreeMap[String,ResourceEntry]()){ (map,r1) =>
- map.get(r1.resourceName) match {
- case None => map + ((r1.resourceName,r1))
- case Some(r0) => (map - r0.resourceName) +
- ((r0.resourceName, addResourceEntries(r0,r1)))
+ import scala.collection.JavaConverters.asScalaBufferConverter
+ val map0 = re.asScala.foldLeft(TreeMap[String,ResourceEntryMsg]()){ (map,r1) => // pass 1: resource name -> merged entry
+ map.get(r1.getResourceName) match {
+ case None => map + ((r1.getResourceName,r1))
+ case Some(r0) => (map - r0.getResourceName) +
+ ((r0.getResourceName, addResourceEntries(r0,r1)))
}
- }.values.toList
+ }
+ val map1 = map0.foldLeft(TreeMap[String,List[ResourceEntryMsg]]()){ case (map,(_,r1)) => // pass 2: resource type -> merged entries of that type
+ map.get(r1.getResourceType) match {
+ case None => map + ((r1.getResourceType,List(r1)))
+ case Some(rl) => (map - r1.getResourceType) + ((r1.getResourceType,r1::rl))
+ }
+ }
+ import scala.collection.JavaConverters.seqAsJavaListConverter
+ map1.foldLeft(List[ServiceEntryMsg]()){ case (ret,(serviceName,resList)) => // pass 3: one service entry per resource type; the type is used as the service name
+ val (totalCredits,totalElapsedTime,totalUnits) =
+ resList.foldLeft((0.0D,0L,0.0D)){ case ((a,b,c),r) => // a: credits, b: elapsed time, c: units
+ (a+r.getTotalCredits.toDouble,
+ b+r.getTotalElapsedTime.toLong,
+ c+r.getTotalUnits.toDouble
+ )}
+ newServiceEntry(serviceName,totalCredits.toString,
+ totalElapsedTime.toString,totalUnits.toString,
+ resList.head.getUnitName,resList.asJava) :: ret // unit name comes from the first entry of the group; pass 2 never stores an empty list
+ }.asJava
}
- def fromWorkingUserState(t:Timeslot,userID:String,w:Option[WorkingUserState]) : AbstractBillEntry = {
+  /**
+   * Completes a list of service entries with a zero-valued placeholder for every
+   * resource type in `re` whose name is not already the service name of an entry
+   * in `se0`.
+   *
+   * @param se0 the service entries computed so far; they come first in the result
+   * @param re  the known resource types, keyed by name
+   * @return the entries of `se0` followed by the placeholders (credits "0.0",
+   *         elapsed time "0", units "0.0", the type's unit, no details)
+   */
+  def addMissingServices(se0: java.util.List[ServiceEntryMsg], re: Map[String, ResourceType]):
+      java.util.List[ServiceEntryMsg] = {
+    import scala.collection.JavaConverters.asScalaBufferConverter
+    import scala.collection.JavaConverters.seqAsJavaListConverter
+    val present = se0.asScala.toList
+    val absent = re -- present.map(_.getServiceName).toSet
+    // Reversed so the placeholders come out in the same order as a prepending fold.
+    val placeholders = absent.toList.reverse.map { case (name, typ) =>
+      newServiceEntry(name, "0.0", "0", "0.0", typ.unit, new java.util.ArrayList[ResourceEntryMsg]())
+    }
+    (present ::: placeholders).asJava
+  }
+
+ /**
+ * Builds the bill of user `userID` for timeslot `t0`.
+ *
+ * The timeslot is first rounded with `roundMilliseconds`. Without a user state the
+ * bill has status "processing", "0.0" remaining and deducted credits, and one
+ * zero-valued service entry per known resource type. With a user state the bill has
+ * status "ok", the state's total credits as remaining credits, the credits returned by
+ * `resourceEntriesAt` as deducted credits, and the aggregated service entries
+ * completed with the missing resource types.
+ *
+ * @param t0            the requested billing timeslot
+ * @param userID        the user the bill is for
+ * @param w             the user state, if one is available
+ * @param resourceTypes the known resource types, keyed by name
+ * @return the bill entry message
+ */
+ def fromWorkingUserState(t0:Timeslot,userID:String,w:Option[UserStateMsg],
+ resourceTypes:Map[String,ResourceType]) : BillEntryMsg = {
+ val t = t0.roundMilliseconds /* we do not care about milliseconds */
+ //Console.err.println("Timeslot: " + t0)
+ //Console.err.println("After rounding timeslot: " + t)
val ret = w match {
case None =>
- new BillEntry(counter.getAndIncrement.toString,
+ val allMissing = addMissingServices(new java.util.ArrayList[ServiceEntryMsg](),resourceTypes)
+ newBillEntry(counter.getAndIncrement.toString,
userID,"processing",
"0.0",
"0.0",
t.from.getTime.toString,t.to.getTime.toString,
- Nil)
+ allMissing)
case Some(w) =>
+ // The user state is not serialized to JSON eagerly: nothing consumed the result.
+ //Console.err.println("Working user state: %s".format(AvroHelpers.jsonStringOfSpecificRecord(w)))
val (rcEntries,rcEntriesCredits) = resourceEntriesAt(t,w)
- val resMap = aggregateResourceEntries(rcEntries)
- new BillEntry(counter.getAndIncrement.toString,
+ val resList0 = aggregateResourceEntries(rcEntries)
+ val resList1 = addMissingServices(resList0,resourceTypes)
+ newBillEntry(counter.getAndIncrement.toString,
userID,"ok",
- w.totalCredits.toString,
+ w.getTotalCredits.toString,
rcEntriesCredits.toString,
- t.from.getTime.toString,t.to.getTime.toString,
- resMap)
+ t.from.getTime.toString,
+ t.to.getTime.toString,
+ resList1)
}
- //Console.err.println("JSON: " + ret.toJsonString)
+ //Console.err.println("JSON: " + AvroHelpers.jsonStringOfSpecificRecord(ret))
ret
}
- val jsonSample = "{\n \"id\":\"2\",\n \"userID\":\"loverdos@grnet.gr\",\n \"status\":\"ok\",\n \"remainingCredits\":\"3130.0000027777783\",\n \"deductedCredits\":\"5739.9999944444435\",\n \"startTime\":\"1341090000000\",\n \"endTime\":\"1343768399999\",\n \"bill\":[{\n \"resourceName\":\"diskspace\",\n \"resourceType\":\"diskspace\",\n \"unitName\":\"MB/Hr\",\n \"totalCredits\":\"2869.9999972222217\",\n \"eventType\":\"object update@/Papers/GOTO_HARMFUL.PDF\",\n\t \"details\":[\n\t {\"totalCredits\":\"2869.9999972222217\",\n\t \"details\":[{\n\t \"id\":\"0\",\n\t \"unitPrice\":\"0.01\",\n\t \"startTime\":\"1342735200000\",\n\t \"endTime\":\"1343768399999\",\n\t \"ellapsedTime\":\"1033199999\",\n\t \"credits\":\"2869.9999972222217\"\n\t \t}]\n\t }\n\t ]\n },{\n \"resourceName\":\"diskspace\",\n \"resourceType\":\"diskspace\",\n \"unitName\":\"MB/Hr\",\n \"totalCredits\":\"2869.9999972222217\",\n \"eventType\":\"object update@/Papers/GOTO_HARMFUL.PDF\",\n \"details\":[\t {\"totalCredits\":\"2869.9999972222217\",\n\t \"details\":[{\n\t \"id\":\"0\",\n\t \"unitPrice\":\"0.01\",\n\t \"startTime\":\"1342735200000\",\n\t \"endTime\":\"1343768399999\",\n\t \"ellapsedTime\":\"1033199999\",\n\t \"credits\":\"2869.9999972222217\"\n\t \t}]\n\t }\n\t]\n }]\n}"
-
- def main0(args: Array[String]) = {
- val b : BillEntry = StdConverters.AllConverters.convertEx[BillEntry](CompactJsonTextFormat(jsonSample))
- val l0 = b.bill
- val l1 = aggregateResourceEntries(l0)
-
- Console.err.println("Initial resources: ")
- for{ i <- l0 } Console.err.println("RESOURCE: " + i.toJsonString)
- Console.err.println("Aggregate resources: ")
- for{ a <- l1 } {
- Console.err.println("RESOURCE: %s\n %s\nEND RESOURCE".format(a.resourceName,a.toJsonString))
- }
-
- val aggr = new BillEntry(b.id,b.userID,b.status,b.remainingCredits,b.deductedCredits,b.startTime,b.endTime,l1)
- Console.err.println("Aggregate:\n" + aggr.toJsonString)
- }
-
//
def main(args: Array[String]) = {
//Console.err.println("JSON: " + (new BillEntry).toJsonString)
val propsfile = new FileStreamResource(new File("aquarium.properties"))
var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
- val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel).
- update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+ val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyMsg).
+ //update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
update(Aquarium.EnvKeys.eventsStoreFolder,Some(new File(".."))).
build()
aquarium.start()