Merge branch 'master' into 1852_billing_period_calc
authorChristos KK Loverdos <loverdos@gmail.com>
Tue, 31 Jan 2012 14:39:03 +0000 (16:39 +0200)
committerChristos KK Loverdos <loverdos@gmail.com>
Tue, 31 Jan 2012 14:39:03 +0000 (16:39 +0200)
Conflicts:
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/user/UserStateComputations.scala
src/main/scala/gr/grnet/aquarium/user/actor/UserActor.scala

src/main/scala/gr/grnet/aquarium/Configurator.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/Policy.scala
src/main/scala/gr/grnet/aquarium/logic/events/WalletEntry.scala
src/main/scala/gr/grnet/aquarium/store/PolicyStore.scala
src/main/scala/gr/grnet/aquarium/store/UserEventStore.scala
src/main/scala/gr/grnet/aquarium/store/UserStateStore.scala
src/main/scala/gr/grnet/aquarium/store/memory/MemUserStateStore.scala
src/main/scala/gr/grnet/aquarium/user/UserDataSnapshot.scala
src/main/scala/gr/grnet/aquarium/util/date/DateCalculator.scala
src/test/resources/aquarium.properties [new file with mode: 0644]
src/test/scala/gr/grnet/aquarium/user/UserActorTest.scala

index 9fa1319..74d4849 100644 (file)
@@ -234,7 +234,7 @@ object Configurator {
   /**
    * The venerable /etc resource context
    */
-  val SlashEtcResourceContext = new FileStreamResourceContext("/etc")
+  val SlashEtcResourceContext = new FileStreamResourceContext("/etc/aquarium")
 
   /**
    * Class loader resource context.
index ac5322e..bc3fdba 100644 (file)
@@ -43,6 +43,7 @@ import com.ckkloverdos.maybe.{Maybe, Just}
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.logic.events.PolicyEntry
 import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * Searches for and loads the applicable accounting policy
@@ -50,15 +51,25 @@ import gr.grnet.aquarium.util.{CryptoUtils, Loggable}
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
 object Policy extends DSL with Loggable {
-  
-  private var policies = {reloadPolicies}
-  
-  lazy val policy = loadPolicyFromFile(policyFile)
 
+  /* Pointer to the latest policy */
+  private lazy val policies = new AtomicReference[Map[Timeslot, DSLPolicy]](reloadPolicies)
+
+  /* Pointer to the latest policy */
+  private val currentPolicy = new AtomicReference[DSLPolicy](policies.get.last._2)
+
+  /**
+   * Get the latest defined policy.
+   */
+  def policy = currentPolicy.get
+
+  /**
+   * Get the policy that is valid at the specified time instance.
+   */
   def policy(at: Date): Maybe[DSLPolicy] = Maybe {
-    policies.find {
-      a => a._1.from.before(at) &&
-           a._1.to.after(at)
+    policies.get.find {
+      a => (a._1.from.before(at) && a._1.to.after(at)) ||
+           (a._1.from.before(at) && a._1.to == -1)
     } match {
       case Some(x) => x._2
       case None =>
@@ -66,15 +77,25 @@ object Policy extends DSL with Loggable {
     }
   }
 
+  /**
+   * Get the policies that are valid between the specified time instances
+   */
   def policies(from: Date, to: Date): List[DSLPolicy] = {
-    policies.filter {
+    policies.get.filter {
       a => a._1.from.before(from) &&
            a._1.to.after(to)
     }.valuesIterator.toList
   }
-  
+
+  /**
+   * Get the policies that are valid throughout the specified
+   * [[gr.grnet.aquarium.logic.accounting.dsl.Timeslot]]
+   */
   def policies(t: Timeslot): List[DSLPolicy] = policies(t.from, t.to)
 
+  /**
+   * Load and parse a policy from file.
+   */
   def loadPolicyFromFile(pol: File): DSLPolicy = {
 
     val stream = pol.exists() match {
@@ -96,13 +117,31 @@ object Policy extends DSL with Loggable {
     parse(stream)
   }
 
+  /**
+   * Trigger a policy update cycle.
+   */
+  def updatePolicies = synchronized {
+    //XXX: The following update should happen as one transaction
+    val tmpPol = reloadPolicies
+    currentPolicy.set(tmpPol.last._2)
+    policies.set(tmpPol)
+  }
+
+  /**
+   * Search for and open a stream to a policy.
+   */
   private def policyFile: File =
     MasterConfigurator.props.get(Keys.aquarium_policy) match {
       case Just(x) => new File(x)
       case _ => new File("/etc/aquarium/policy.yaml")
     }
 
-  private def reloadPolicies(): Map[Timeslot, DSLPolicy] = synchronized {
+  /**
+   * Check whether the policy definition file (in whichever path) is
+   * newer than the latest stored policy, reload and set it as current.
+   * This method has side-effects to this object's state.
+   */
+  private def reloadPolicies: Map[Timeslot, DSLPolicy] = {
     //1. Load policies from db
     val policies = MasterConfigurator.policyEventStore.loadPolicies(0)
 
index e75f637..be0a4a1 100644 (file)
@@ -36,6 +36,7 @@
 package gr.grnet.aquarium.logic.events
 
 import gr.grnet.aquarium.util.json.JsonHelpers
+import java.util.Date
 
 /**
  * A WalletEntry is a derived entity. Its data represent money/credits and are calculated based on
@@ -70,6 +71,9 @@ case class WalletEntry(
   }
 
   def copyWithReceivedMillis(millis: Long) = copy(receivedMillis = millis)
+
+  def occurredDate = new Date(occurredMillis)
+  def receivedDate = new Date(receivedMillis)
 }
 
 object WalletEntry {
index 8e4b157..3b13156 100644 (file)
@@ -59,5 +59,5 @@ trait PolicyStore {
   /**
    * Updates the policy record whose id
    */
-  def updatePolicy(policy: PolicyEntry)
+  def updatePolicy(policy: PolicyEntry): Unit
 }
\ No newline at end of file
index 6a31a3c..7a9bc1a 100644 (file)
@@ -1,20 +1,27 @@
 package gr.grnet.aquarium.store
 
 import com.ckkloverdos.maybe.Maybe
-import gr.grnet.aquarium.logic.events.{ResourceEvent, UserEvent}
+import gr.grnet.aquarium.logic.events.{UserEvent}
 
 /**
- * * An abstraction for Aquarium user event stores.
+ * Store for external user events
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
 trait UserEventStore {
 
+  /**
+   * Store an event
+   */
   def storeUserEvent(event: UserEvent): Maybe[RecordID]
 
-
+  /**
+   * Find a user event by event ID
+   */
   def findUserEventById(id: String): Maybe[UserEvent]
 
-
+  /**
+   * Find all user events by user ID
+   */
   def findUserEventsByUserId(userId: String): List[UserEvent]
 }
\ No newline at end of file
index a9cbe14..10b6150 100644 (file)
@@ -39,7 +39,7 @@ import gr.grnet.aquarium.user.UserState
 import com.ckkloverdos.maybe.Maybe
 
 /**
- * A store for user state.
+ * A store for user state snapshots.
  *
  * This is used to hold snapshots of [[gr.grnet.aquarium.user.UserState]]
  *
@@ -47,7 +47,19 @@ import com.ckkloverdos.maybe.Maybe
  */
 
 trait UserStateStore {
+
+  /**
+   * Store a user state
+   */
   def storeUserState(userState: UserState): Maybe[RecordID]
+
+  /**
+   * Find a state by user ID
+   */
   def findUserStateByUserId(userId: String): Maybe[UserState]
+
+  /**
+   * Delete a state for a user
+   */
   def deleteUserState(userId: String): Unit
 }
\ No newline at end of file
index 778b3d6..92137f4 100644 (file)
@@ -38,20 +38,33 @@ package gr.grnet.aquarium.store.memory
 import gr.grnet.aquarium.user.UserState
 import gr.grnet.aquarium.Configurable
 import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.store.{RecordID, UserStateStore}
 import com.ckkloverdos.maybe.{NoVal, Just, Maybe}
+import gr.grnet.aquarium.store._
+import scala.collection.JavaConversions._
+import java.util.Date
+import collection.mutable.ConcurrentMap
+import gr.grnet.aquarium.logic.events.{WalletEntry, ResourceEvent, UserEvent, PolicyEntry}
+import java.util.concurrent.ConcurrentHashMap
+import gr.grnet.aquarium.util.date.DateCalculator
 
 /**
- * A user store backed by main memory.
- *
- * The IDs returned are the original user IDs.
+ * An implementation of various stores that persists data in memory
  * 
  * @author Christos KK Loverdos <loverdos@gmail.com>
+ * @author Georgios Gousios <gousiosg@gmail.com>
  */
 
-class MemUserStateStore extends UserStateStore with Configurable {
-  private[this] val userStateByUserId = new java.util.concurrent.ConcurrentHashMap[String, Just[UserState]]()
-  
+class MemUserStateStore extends UserStateStore
+  with Configurable with PolicyStore
+  with ResourceEventStore with UserEventStore
+  with WalletEntryStore {
+
+  private[this] val userStateByUserId = new ConcurrentHashMap[String, Just[UserState]]()
+  private val policyById: ConcurrentMap[String, PolicyEntry] = new ConcurrentHashMap[String, PolicyEntry]()
+  private[this] val walletEntriesById: ConcurrentMap[String, WalletEntry] = new ConcurrentHashMap[String, WalletEntry]()
+  private val userEventById: ConcurrentMap[String, UserEvent] = new ConcurrentHashMap[String, UserEvent]()
+  private[this] val resourceEventsById: ConcurrentMap[String, ResourceEvent] = new ConcurrentHashMap[String, ResourceEvent]()
+
   def configure(props: Props) = {
   }
 
@@ -73,4 +86,125 @@ class MemUserStateStore extends UserStateStore with Configurable {
     if (userStateByUserId.containsKey(userId))
       userStateByUserId.remove(userId)
   }
+
+  //- WalletEntryStore
+  def storeWalletEntry(entry: WalletEntry): Maybe[RecordID] = {
+    walletEntriesById.put(entry.id, entry)
+    Just(RecordID(entry.id))
+  }
+
+  def findWalletEntryById(id: String): Maybe[WalletEntry] = {
+    Maybe(walletEntriesById.apply(id))
+  }
+
+  def findUserWalletEntries(userId: String): List[WalletEntry] = {
+    walletEntriesById.valuesIterator.filter(_.userId == userId).toList
+  }
+
+  def findUserWalletEntriesFromTo(userId: String, from: Date, to: Date): List[WalletEntry] = {
+    walletEntriesById.valuesIterator.filter { we ⇒
+      val receivedDate = we.receivedDate
+
+      we.userId == userId &&
+      ( (from before receivedDate) || (from == receivedDate) ) &&
+      ( (to   after  receivedDate) || (to   == receivedDate) )
+      true
+    }.toList
+  }
+
+  def findLatestUserWalletEntries(userId: String): Maybe[List[WalletEntry]] = NoVal
+
+  def findPreviousEntry(userId: String,
+                        resource: String,
+                        instanceId: String,
+                        finalized: Option[Boolean]): List[WalletEntry] = Nil
+
+  def findWalletEntriesAfter(userId: String, from: Date): List[WalletEntry] = {
+    walletEntriesById.valuesIterator.filter { we ⇒
+      val occurredDate = we.occurredDate
+
+      we.userId == userId &&
+            ( (from before occurredDate) || (from == occurredDate) )
+    }.toList
+  }
+  //- WalletEntryStore
+
+  //+ ResourceEventStore
+  def storeResourceEvent(event: ResourceEvent) = {
+    resourceEventsById(event.id) = event
+    Just(RecordID(event.id))
+  }
+
+  def findResourceEventById(id: String) = {
+    Maybe(resourceEventsById(id))
+  }
+
+  def findResourceEventsByUserId(userId: String)
+                                (sortWith: Option[(ResourceEvent, ResourceEvent) => Boolean]): List[ResourceEvent] = {
+    val byUserId = resourceEventsById.valuesIterator.filter(_.userId == userId).toArray
+    val sorted = sortWith match {
+      case Some(sorter) ⇒
+        byUserId.sortWith(sorter)
+      case None ⇒
+        byUserId
+    }
+
+    sorted.toList
+  }
+
+  def findResourceEventsByUserIdAfterTimestamp(userId: String, timestamp: Long): List[ResourceEvent] = {
+    resourceEventsById.valuesIterator.filter { ev ⇒
+      ev.userId == userId &&
+      (ev.occurredMillis > timestamp)
+    }.toList
+  }
+
+  def findResourceEventHistory(userId: String,
+                               resName: String,
+                               instid: Option[String],
+                               upTo: Long): List[ResourceEvent] = {
+    Nil
+  }
+
+  def findResourceEventsForReceivedPeriod(userId: String,
+                                          startTimeMillis: Long,
+                                          stopTimeMillis: Long): List[ResourceEvent] = {
+    resourceEventsById.valuesIterator.filter { ev ⇒
+      ev.userId == userId &&
+      ev.receivedMillis >= startTimeMillis &&
+      ev.receivedMillis <= stopTimeMillis
+    }.toList
+  }
+
+  def countOutOfSyncEventsForBillingMonth(userId: String, yearOfBillingMonth: Int, billingMonth: Int): Maybe[Long] = Maybe {
+    val billingMonthDate = new DateCalculator(yearOfBillingMonth, billingMonth)
+    val billingDateStart = billingMonthDate
+    val billingDateEnd = billingDateStart.endOfThisMonth
+    resourceEventsById.valuesIterator.filter { case ev ⇒
+      // out of sync events are those that were received in the billing month but occurred in previous months
+      val receivedMillis = ev.receivedMillis
+      val occurredMillis = ev.occurredMillis
+
+      billingDateStart.afterEqMillis(receivedMillis) && // the events that...
+      billingDateEnd.beforeEqMillis (receivedMillis) && // ...were received withing the billing month
+      (                                                 //
+        billingDateStart.afterMillis(occurredMillis)    // but occurred before the billing period
+        )
+    }.size.toLong
+  }
+  //- ResourceEventStore
+
+  def storeUserEvent(event: UserEvent) = {userEventById += (event.id -> event); Just(RecordID(event.id))}
+
+  def findUserEventById(id: String) = Maybe{userEventById.getOrElse(id, null)}
+
+  def findUserEventsByUserId(userId: String) = userEventById.values.filter{v => v.userId == userId}.toList
+
+  def loadPolicies(after: Long) = policyById.values.foldLeft(List[PolicyEntry]()){
+    (acc, v) => if(v.validFrom > after) v :: acc else acc
+  }
+
+  def storePolicy(policy: PolicyEntry) = {policyById += (policy.id -> policy); Just(RecordID(policy.id))}
+
+  def updatePolicy(policy: PolicyEntry) = storePolicy(policy)
 }
\ No newline at end of file
index 16dd036..598910c 100644 (file)
@@ -90,8 +90,11 @@ case class AgreementSnapshot(data: List[Agreement], snapshotTime: Long)
   ensureNoGaps(data.sortWith((a,b) => if (b.validFrom > a.validFrom) true else false))
 
   def ensureNoGaps(agreements: List[Agreement]): Unit = agreements match {
-    case ha :: (t @ (hb :: tail)) => assert(ha.validTo - hb.validFrom == 1); ensureNoGaps(t)
-    case h :: Nil => assert(h.validTo == -1)
+    case ha :: (t @ (hb :: tail)) =>
+      assert(ha.validTo - hb.validFrom == 1);
+      ensureNoGaps(t)
+    case h :: Nil =>
+      assert(h.validTo == -1)
     case Nil => ()
   }
 
index d560295..cc61670 100644 (file)
@@ -20,6 +20,9 @@ class DateCalculator private(private[this] var dateTime: MutableDateTime) {
   def this(year: Int, monthOfYear: Int, dayOfMonth: Int) =
     this(new MutableDateTime(year, monthOfYear, dayOfMonth, 0, 0, 0, 0))
 
+  def this(year: Int, monthOfYear: Int) =
+    this(year, monthOfYear, 1)
+
   def nextMonths(n: Int): this.type = {
     dateTime.addMonths(n)
 
@@ -156,6 +159,22 @@ class DateCalculator private(private[this] var dateTime: MutableDateTime) {
   def toDate: Date = {
     dateTime.toDate
   }
+  
+  def beforeMillis(millis: Long): Boolean = {
+    toMillis < millis
+  }
+
+  def afterMillis(millis: Long): Boolean = {
+    toMillis > millis
+  }
+
+  def beforeEqMillis(millis: Long): Boolean = {
+    toMillis <= millis
+  }
+
+  def afterEqMillis(millis: Long): Boolean = {
+    toMillis >= millis
+  }
 
   override def toString = {
     dateTime.toString
diff --git a/src/test/resources/aquarium.properties b/src/test/resources/aquarium.properties
new file mode 100644 (file)
index 0000000..fd27664
--- /dev/null
@@ -0,0 +1,99 @@
+version = 0.0.2-SNAPSHOT
+
+# Location of the Aquarium accounting policy config file. If commented
+# out, Aquarium will look for the file policy.yaml first at the program
+# starting directory and then fall back to the classpath.
+aquarium.policy=policy.yaml
+
+### Queue related settings
+
+# Comma separated list of amqp servers to use. The servers must be in an
+# active-active mode.
+amqp.servers=localhost
+
+# Comma separated list of amqp servers to use. The servers must be in an
+# active-active mode.
+amqp.port=5672
+
+# User name for connecting with the AMQP server
+amqp.username=aquarium
+
+# Passwd for connecting with the AMQP server
+amqp.passwd=aquarium
+
+# Virtual host on the AMQP server
+amqp.vhost=/
+
+# REST service listening port
+rest.port=8080
+
+### Message store related settings
+
+# Provider for persistence services.
+# Currently one of: mongo
+persistence.provider=mongodb
+
+# Hostname for the persistence service
+persistence.host=localhost
+
+# Port for connecting to the persistence service
+persistence.port=27017
+
+# Username for connecting to the persistence service
+persistence.username=aquarium
+
+# Password for connecting to the persistence service
+persistence.password=aquarium
+
+### Performance options
+
+# Maximum number of open connections to MongoDB. Has no effect if
+# another driver is in use
+mongo.connection.pool.size=20
+
+#######
+# DO NOT TOUCH the following options, unless you know what you are doing
+#######
+
+# Actor subsystem
+actor.provider.class=gr.grnet.aquarium.actor.SimpleLocalActorProvider
+
+# Class that initializes the REST service
+rest.service.class=gr.grnet.aquarium.rest.actor.RESTActorService
+
+# Store subsystem
+store.provider.class=gr.grnet.aquarium.store.mongodb.MongoDBStoreProvider
+
+# Override the user store (if present, it will not be given by the store provider above)
+user.state.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the event store (if present, it will not be given by the store provider above)
+resource.event.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the WalletEntry store (if present, it will not be given by the store provider above)
+wallet.entry.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the user event store (if present, it will not be given by the store provider above)
+user.event.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# Override the user event store (if present, it will not be given by the store provider above)
+policy.store.class=gr.grnet.aquarium.store.memory.MemUserStateStore
+
+# The lower mark for the UserActors' LRU, managed by UserActorManager.
+user.actor.LRU.lower.mark=800
+
+# The upper mark for the UserActors' LRU, managed by UserActorManager.
+user.actors.LRU.upper.mark=1000
+
+# A time period in milliseconds for which we can tolerate stale data regarding user state.
+user.state.timestamp.threshold=10000
+
+# Comma separated list of exchanges known to aquarium
+amqp.exchanges=aquarium
+
+# The name of the DB schema to use
+persistence.db=aquarium
+
+# This is an absolute constant for the lifetime of an Aquarium installation.
+# 1 means that every second counts
+time.unit.in.seconds = 1
\ No newline at end of file
index 2a9e716..a2707ab 100644 (file)
@@ -21,7 +21,7 @@ class UserActorTest {
       0L,
       ActiveSuspendedSnapshot(true, now),
       CreditSnapshot(0, now),
-      AgreementSnapshot("default", now),
+      AgreementSnapshot(Agreement("default", now, now) :: Nil, now),
       RolesSnapshot(Nil, now),
       PaymentOrdersSnapshot(Nil, now),
       OwnedGroupsSnapshot(Nil, now),