/**
* The venerable /etc resource context
*/
- val SlashEtcResourceContext = new FileStreamResourceContext("/etc")
+ val SlashEtcResourceContext = new FileStreamResourceContext("/etc/aquarium")
/**
* Class loader resource context.
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.logic.events.PolicyEntry
import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
+import java.util.concurrent.atomic.AtomicReference
/**
* Searches for and loads the applicable accounting policy
* @author Georgios Gousios <gousiosg@gmail.com>
*/
object Policy extends DSL with Loggable {
-
- private var policies = {reloadPolicies}
-
- lazy val policy = loadPolicyFromFile(policyFile)
+ /* Pointer to the latest policy */
+ private lazy val policies = new AtomicReference[Map[Timeslot, DSLPolicy]](reloadPolicies)
+
+ /* Pointer to the latest policy */
+ private val currentPolicy = new AtomicReference[DSLPolicy](policies.get.last._2)
+
+ /**
+ * Get the latest defined policy.
+ */
+ def policy = currentPolicy.get
+
+ /**
+ * Get the policy that is valid at the specified time instance.
+ */
def policy(at: Date): Maybe[DSLPolicy] = Maybe {
- policies.find {
- a => a._1.from.before(at) &&
- a._1.to.after(at)
+ policies.get.find {
+ a => (a._1.from.before(at) && a._1.to.after(at)) ||
+ (a._1.from.before(at) && a._1.to == -1)
} match {
case Some(x) => x._2
case None =>
}
}
+ /**
+ * Get the policies that are valid between the specified time instances
+ */
def policies(from: Date, to: Date): List[DSLPolicy] = {
- policies.filter {
+ policies.get.filter {
a => a._1.from.before(from) &&
a._1.to.after(to)
}.valuesIterator.toList
}
-
+
+ /**
+ * Get the policies that are valid throughout the specified
+ * [[gr.grnet.aquarium.logic.accounting.dsl.Timeslot]]
+ */
def policies(t: Timeslot): List[DSLPolicy] = policies(t.from, t.to)
+ /**
+ * Load and parse a policy from file.
+ */
def loadPolicyFromFile(pol: File): DSLPolicy = {
val stream = pol.exists() match {
parse(stream)
}
+ /**
+ * Trigger a policy update cycle.
+ */
+ def updatePolicies = synchronized {
+ //XXX: The following update should happen as one transaction
+ val tmpPol = reloadPolicies
+ currentPolicy.set(tmpPol.last._2)
+ policies.set(tmpPol)
+ }
+
+ /**
+ * Search for and open a stream to a policy.
+ */
private def policyFile: File =
MasterConfigurator.props.get(Keys.aquarium_policy) match {
case Just(x) => new File(x)
case _ => new File("/etc/aquarium/policy.yaml")
}
- private def reloadPolicies(): Map[Timeslot, DSLPolicy] = synchronized {
+ /**
+ * Check whether the policy definition file (in whichever path) is
+ * newer than the latest stored policy, reload and set it as current.
+ * This method has side-effects to this object's state.
+ */
+ private def reloadPolicies: Map[Timeslot, DSLPolicy] = {
//1. Load policies from db
val policies = MasterConfigurator.policyEventStore.loadPolicies(0)
package gr.grnet.aquarium.logic.events
import gr.grnet.aquarium.util.json.JsonHelpers
+import java.util.Date
/**
* A WalletEntry is a derived entity. Its data represent money/credits and are calculated based on
}
def copyWithReceivedMillis(millis: Long) = copy(receivedMillis = millis)
+
+ def occurredDate = new Date(occurredMillis)
+ def receivedDate = new Date(receivedMillis)
}
object WalletEntry {
/**
* Updates the policy record whose id
*/
- def updatePolicy(policy: PolicyEntry)
+ def updatePolicy(policy: PolicyEntry): Unit
}
\ No newline at end of file
package gr.grnet.aquarium.store
import com.ckkloverdos.maybe.Maybe
-import gr.grnet.aquarium.logic.events.{ResourceEvent, UserEvent}
+import gr.grnet.aquarium.logic.events.{UserEvent}
/**
- * * An abstraction for Aquarium user event stores.
+ * Store for external user events
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
trait UserEventStore {
+ /**
+ * Store an event
+ */
def storeUserEvent(event: UserEvent): Maybe[RecordID]
-
+ /**
+ * Find a user event by event ID
+ */
def findUserEventById(id: String): Maybe[UserEvent]
-
+ /**
+ * Find all user events by user ID
+ */
def findUserEventsByUserId(userId: String): List[UserEvent]
}
\ No newline at end of file
import com.ckkloverdos.maybe.Maybe
/**
- * A store for user state.
+ * A store for user state snapshots.
*
* This is used to hold snapshots of [[gr.grnet.aquarium.user.UserState]]
*
*/
trait UserStateStore {
+
+ /**
+ * Store a user state
+ */
def storeUserState(userState: UserState): Maybe[RecordID]
+
+ /**
+ * Find a state by user ID
+ */
def findUserStateByUserId(userId: String): Maybe[UserState]
+
+ /**
+ * Delete a state for a user
+ */
def deleteUserState(userId: String): Unit
}
\ No newline at end of file
import gr.grnet.aquarium.user.UserState
import gr.grnet.aquarium.Configurable
import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.store.{RecordID, UserStateStore}
import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
+import gr.grnet.aquarium.store._
+import scala.collection.JavaConversions._
+import java.util.Date
+import collection.mutable.ConcurrentMap
+import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, UserEvent, PolicyEntry}
+import java.util.concurrent.ConcurrentHashMap
+import gr.grnet.aquarium.util.date.DateCalculator
/**
- * A user store backed by main memory.
- *
- * The IDs returned are the original user IDs.
+ * An implementation of various stores that persists data in memory
*
* @author Christos KK Loverdos <loverdos@gmail.com>
+ * @author Georgios Gousios <gousiosg@gmail.com>
*/
-class MemUserStateStore extends UserStateStore with Configurable {
- private[this] val userStateByUserId = new java.util.concurrent.ConcurrentHashMap[String, Just[UserState]]()
-
+class MemUserStateStore extends UserStateStore
+ with Configurable with PolicyStore
+ with ResourceEventStore with UserEventStore
+ with WalletEntryStore {
+
+ private[this] val userStateByUserId = new ConcurrentHashMap[String, Just[UserState]]()
+ private val policyById: ConcurrentMap[String, PolicyEntry] = new ConcurrentHashMap[String, PolicyEntry]()
+ private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
+ private val userEventById: ConcurrentMap[String, UserEvent] = new ConcurrentHashMap[String, UserEvent]()
+ private[this] val resourceEventsById: ConcurrentMap[String, ResourceEvent] = new ConcurrentHashMap[String, ResourceEvent]()
+
def configure(props: Props) = {
}
if (userStateByUserId.containsKey(userId))
userStateByUserId.remove(userId)
}
+
+ //- WalletEntryStore
+ def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
+ walletEntriesById.put(entry.id, entry)
+ Just(RecordID(entry.id))
+ }
+
+ def findWalletEntryById(id: String): Maybe[WalletEntry] = {
+ Maybe(walletEntriesById.apply(id))
+ }
+
+ def findUserWalletEntries(userId: String): List[WalletEntry] = {
+ walletEntriesById.valuesIterator.filter(_.userId == userId).toList
+ }
+
+ def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
+ walletEntriesById.valuesIterator.filter { we ⇒
+ val receivedDate = we.receivedDate
+
+ we.userId == userId &&
+ ( (from before receivedDate) || (from == receivedDate) ) &&
+ ( (to after receivedDate) || (to == receivedDate) )
+ true
+ }.toList
+ }
+
+ def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
+
+ def findPreviousEntry(userId: String,
+ resource: String,
+ instanceId: String,
+ finalized: Option[Boolean]): List[WalletEntry] = Nil
+
+ def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry] = {
+ walletEntriesById.valuesIterator.filter { we ⇒
+ val occurredDate = we.occurredDate
+
+ we.userId == userId &&
+ ( (from before occurredDate) || (from == occurredDate) )
+ }.toList
+ }
+ //- WalletEntryStore
+
+ //+ ResourceEventStore
+ def storeResourceEvent(event: ResourceEvent) = {
+ resourceEventsById(event.id) = event
+ Just(RecordID(event.id))
+ }
+
+ def findResourceEventById(id: String) = {
+ Maybe(resourceEventsById(id))
+ }
+
+ def findResourceEventsByUserId(userId: String)
+ (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
+ val byUserId = resourceEventsById.valuesIterator.filter(_.userId == userId).toArray
+ val sorted = sortWith match {
+ case Some(sorter) ⇒
+ byUserId.sortWith(sorter)
+ case None ⇒
+ byUserId
+ }
+
+ sorted.toList
+ }
+
+ def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
+ resourceEventsById.valuesIterator.filter { ev ⇒
+ ev.userId == userId &&
+ (ev.occurredMillis > timestamp)
+ }.toList
+ }
+
+ def findResourceEventHistory(userId: String,
+ resName: String,
+ instid: Option[String],
+ upTo: Long): List[ResourceEvent] = {
+ Nil
+ }
+
+ def findResourceEventsForReceivedPeriod(userId: String,
+ startTimeMillis: Long,
+ stopTimeMillis: Long): List[ResourceEvent] = {
+ resourceEventsById.valuesIterator.filter { ev ⇒
+ ev.userId == userId &&
+ ev.receivedMillis >= startTimeMillis &&
+ ev.receivedMillis <= stopTimeMillis
+ }.toList
+ }
+
+ def countOutOfSyncEventsForBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[Long] = Maybe {
+ val billingMonthDate = new DateCalculator(yearOfBillingMonth, billingMonth)
+ val billingDateStart = billingMonthDate
+ val billingDateEnd = billingDateStart.endOfThisMonth
+ resourceEventsById.valuesIterator.filter { case ev ⇒
+ // out of sync events are those that were received in the billing month but occurred in previous months
+ val receivedMillis = ev.receivedMillis
+ val occurredMillis = ev.occurredMillis
+
+ billingDateStart.afterEqMillis(receivedMillis) && // the events that...
+ billingDateEnd.beforeEqMillis (receivedMillis) && // ...were received withing the billing month
+ ( //
+ billingDateStart.afterMillis(occurredMillis) // but occurred before the billing period
+ )
+ }.size.toLong
+ }
+ //- ResourceEventStore
+
+ def storeUserEvent(event: UserEvent) = {userEventById += (event.id -> event); Just(RecordID(event.id))}
+
+ def findUserEventById(id: String) = Maybe{userEventById.getOrElse(id, null)}
+
+ def findUserEventsByUserId(userId: String) = userEventById.values.filter{v => v.userId == userId}.toList
+
+ def loadPolicies(after: Long) = policyById.values.foldLeft(List[PolicyEntry]()){
+ (acc, v) => if(v.validFrom > after) v :: acc else acc
+ }
+
+ def storePolicy(policy: PolicyEntry) = {policyById += (policy.id -> policy); Just(RecordID(policy.id))}
+
+ def updatePolicy(policy: PolicyEntry) = storePolicy(policy)
}
\ No newline at end of file
ensureNoGaps(data.sortWith((a,b) => if (b.validFrom > a.validFrom) true else false))
def ensureNoGaps(agreements: List[Agreement]): Unit = agreements match {
- case ha :: (t @ (hb :: tail)) => assert(ha.validTo - hb.validFrom == 1); ensureNoGaps(t)
- case h :: Nil => assert(h.validTo == -1)
+ case ha :: (t @ (hb :: tail)) =>
+ assert(ha.validTo - hb.validFrom == 1);
+ ensureNoGaps(t)
+ case h :: Nil =>
+ assert(h.validTo == -1)
case Nil => ()
}
def this(year: Int, monthOfYear: Int, dayOfMonth: Int) =
this(new MutableDateTime(year, monthOfYear, dayOfMonth, 0, 0, 0, 0))
+ def this(year: Int, monthOfYear: Int) =
+ this(year, monthOfYear, 1)
+
def nextMonths(n: Int): this.type = {
dateTime.addMonths(n)
def toDate: Date = {
dateTime.toDate
}
+
+ def beforeMillis(millis: Long): Boolean = {
+ toMillis < millis
+ }
+
+ def afterMillis(millis: Long): Boolean = {
+ toMillis > millis
+ }
+
+ def beforeEqMillis(millis: Long): Boolean = {
+ toMillis <= millis
+ }
+
+ def afterEqMillis(millis: Long): Boolean = {
+ toMillis >= millis
+ }
override def toString = {
dateTime.toString
--- /dev/null
+version = 0.0.2-SNAPSHOT
+
+# Location of the Aquarium accounting policy config file. If commented
+# out, Aquarium will look for the file policy.yaml first at the program
+# starting directory and then fall back to the classpath.
+aquarium.policy=policy.yaml
+
+### Queue related settings
+
+# Comma separated list of amqp servers to use. The servers must be in an
+# active-active mode.
+amqp.servers=localhost
+
+# Comma separated list of amqp servers to use. The servers must be in an
+# active-active mode.
+amqp.port=5672
+
+# User name for connecting with the AMQP server
+amqp.username=aquarium
+
+# Passwd for connecting with the AMQP server
+amqp.passwd=aquarium
+
+# Virtual host on the AMQP server
+amqp.vhost=/
+
+# REST service listening port
+rest.port=8080
+
+### Message store related settings
+
+# Provider for persistence services.
+# Currently one of: mongo
+persistence.provider=mongodb
+
+# Hostname for the persistence service
+persistence.host=localhost
+
+# Port for connecting to the persistence service
+persistence.port=27017
+
+# Username for connecting to the persistence service
+persistence.username=aquarium
+
+# Password for connecting to the persistence service
+persistence.password=aquarium
+
+### Performance options
+
+# Maximum number of open connections to MongoDB. Has no effect if
+# another driver is in use
+mongo.connection.pool.size=20
+
+#######
+# DO NOT TOUCH the following options, unless you know what you are doing
+#######
+
+# Actor subsystem
+actor.provider.class=gr.grnet.aquarium.actor.SimpleLocalActorProvider
+
+# Class that initializes the REST service
+rest.service.class=gr.grnet.aquarium.rest.actor.RESTActorService
+
+# Store subsystem
+store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
+
+# Override the user store (if present, it will not be given by the store provider above)
+user.state.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the event store (if present, it will not be given by the store provider above)
+resource.event.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the WalletEntry store (if present, it will not be given by the store provider above)
+wallet.entry.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the user event store (if present, it will not be given by the store provider above)
+user.event.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the user event store (if present, it will not be given by the store provider above)
+policy.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# The lower mark for the UserActors' LRU, managed by UserActorManager.
+user.actor.LRU.lower.mark=800
+
+# The upper mark for the UserActors' LRU, managed by UserActorManager.
+user.actors.LRU.upper.mark=1000
+
+# A time period in milliseconds for which we can tolerate stale data regarding user state.
+user.state.timestamp.threshold=10000
+
+# Comma separated list of exchanges known to aquarium
+amqp.exchanges=aquarium
+
+# The name of the DB schema to use
+persistence.db=aquarium
+
+# This is an absolute constant for the lifetime of an Aquarium installation.
+# 1 means that every second counts
+time.unit.in.seconds = 1
\ No newline at end of file
0L,
ActiveSuspendedSnapshot(true, now),
CreditSnapshot(0, now),
- AgreementSnapshot("default", now),
+ AgreementSnapshot(Agreement("default", now, now) :: Nil, now),
RolesSnapshot(Nil, now),
PaymentOrdersSnapshot(Nil, now),
OwnedGroupsSnapshot(Nil, now),