RabbitMQProducer handles Nacks. UserActor accepts user balance additions from Astakos...
author Prodromos Gerakios <pgerakios@grnet.gr>
Tue, 7 Aug 2012 13:55:23 +0000 (16:55 +0300)
committer Prodromos Gerakios <pgerakios@grnet.gr>
Tue, 7 Aug 2012 13:55:23 +0000 (16:55 +0300)
src/main/resources/policy.json
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala
src/main/scala/gr/grnet/aquarium/event/model/im/IMEventModel.scala
src/main/scala/gr/grnet/aquarium/event/model/im/StdIMEvent.scala
src/main/scala/gr/grnet/aquarium/logic/accounting/dsl/Timeslot.scala
src/main/scala/gr/grnet/aquarium/service/FinagleRESTService.scala
src/main/scala/gr/grnet/aquarium/service/RESTPaths.scala
src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala
src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala

index 179e153..bbf452b 100644 (file)
@@ -8,6 +8,11 @@
 
   "resourceTypes":[
     {
+      "name":"addcredits",
+      "unit":"credits",
+      "chargingBehavior":"gr.grnet.aquarium.charging.OnceChargingBehavior"
+    },
+    {
       "name":"bandwidth",
       "unit":"MB/Hr",
       "chargingBehavior":"gr.grnet.aquarium.charging.DiscreteChargingBehavior"
             }
           ]
         }
+        "addcredits":{
+          "priceOverrides":[
+            {
+              "unitPrice":1
+            }
+          ]
+        }
       }
     }
   }
-}
\ No newline at end of file
+}
index 6f974e6..0c259be 100644 (file)
@@ -56,7 +56,7 @@ import gr.grnet.aquarium.charging.state.UserStateBootstrap
 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
 import gr.grnet.aquarium.charging.reason.{InitialUserActorSetup, RealtimeChargingReason}
 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement}
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
+import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
 import message.GetUserBalanceRequest
 import message.GetUserBalanceResponse
 import message.GetUserBalanceResponseData
@@ -298,9 +298,15 @@ class UserActor extends ReflectiveRoleableActor {
         //this._latestIMEventID = imEvent.id
         return
       }
+      if(imEvent.isAddCredits)  {
+        if(!hadUserCreationIMEvent && haveUserCreationIMEvent)
+        loadWorkingUserStateAndUpdateAgreementHistory()
+        onHandleAddCreditsEvent(imEvent)
 
+      } else {
       updateAgreementHistoryFrom(imEvent)
       updateLatestIMEventIDFrom(imEvent)
+      }
     }
 
     // Must also update user state if we know when in history the life of a user begins
@@ -312,6 +318,29 @@ class UserActor extends ReflectiveRoleableActor {
     logSeparator()
   }
 
+  /**
+   * Converts an Astakos "add credits" IM event into a regular resource event
+   * and hands it to onProcessResourceEvent.
+   *
+   * The amount is read from the event details under IMEventModel.DetailsNames.credits,
+   * parsed with toInt (non-integer text throws NumberFormatException) and negated.
+   * NOTE(review): presumably negated so that charging the event raises the balance - confirm.
+   *
+   * @param imEvent the IM event whose details carry the credits amount
+   */
+  def onHandleAddCreditsEvent(imEvent : IMEventModel) = {
+    val creditsText = imEvent.details(IMEventModel.DetailsNames.credits)
+    val negatedCredits = -creditsText.toInt.toDouble
+    // NOTE(review): eventType fills two consecutive constructor slots; check StdResourceEvent.
+    val rcEvent = new StdResourceEvent(
+      imEvent.id, imEvent.occurredMillis, imEvent.receivedMillis,
+      imEvent.userID, imEvent.clientID,
+      imEvent.eventType, imEvent.eventType,
+      negatedCredits, imEvent.eventVersion, imEvent.details
+    )
+    onProcessResourceEvent(new ProcessResourceEvent(rcEvent))
+  }
+
   def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
     val rcEvent = event.rcEvent
 
@@ -401,9 +430,10 @@ class UserActor extends ReflectiveRoleableActor {
     else {
       computeBatch()
     }
-    if(oldTotalCredits * this._workingUserState.totalCredits < 0)
+    var newTotal = this._workingUserState.totalCredits
+    if(oldTotalCredits * newTotal < 0)
       aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
-                                           this._workingUserState.totalCredits>=0)
+                                           newTotal>=0)
     DEBUG("Updated %s", this._workingUserState)
     logSeparator()
   }
index abc4d62..7f911c8 100644 (file)
@@ -1,21 +1,17 @@
 package gr.grnet.aquarium.connector.rabbitmq
 
-import conf.RabbitMQConsumerConf
-import conf.RabbitMQConsumerConf
 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
 import gr.grnet.aquarium._
 import com.rabbitmq.client._
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.util.{Lock, Tags}
+import gr.grnet.aquarium.util.Lock
 import gr.grnet.aquarium.store.memory.MemStoreProvider
 import java.io.File
 import com.ckkloverdos.resource.FileStreamResource
 import scala.Some
-import collection.immutable.{TreeMap, SortedSet, TreeSet}
-import java.util.Collections
+import collection.immutable.{TreeMap, TreeSet}
 
 
 /*
@@ -166,7 +162,7 @@ object RabbitMQProducer  {
     build()
     aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
     Console.err.println("Message sent")
-    aquarium.stop()
+    //aquarium.start()
     ()
   }
 }
\ No newline at end of file
index 20ac755..afcf43f 100644 (file)
@@ -64,6 +64,8 @@ trait IMEventModel extends ExternalEventModel {
 
   def isModifyUser = eventType.equalsIgnoreCase(IMEventModel.EventTypeNames.modify)
 
+  def isAddCredits = eventType.equalsIgnoreCase(IMEventModel.EventTypeNames.addcredits) // case-insensitive match of eventType against "addcredits"
+
   def userCreationMillisOption = if(isCreateUser) Some(this.occurredMillis) else None
 
   override def toDebugString = {
@@ -92,7 +94,14 @@ object IMEventModel {
   trait EventTypeNamesT {
     final val create = "create"
     final val modify = "modify"
+    final val addcredits = "addcredits"
   }
 
   object EventTypeNames extends EventTypeNamesT
+
+  trait DetailsNamesT { // names of keys looked up in an IM event's details
+    final val credits = "credits" // key under which an add-credits event carries its amount
+  }
+  object DetailsNames extends DetailsNamesT // singleton access point, e.g. IMEventModel.DetailsNames.credits
+
 }
index 1948d54..754c3ab 100644 (file)
@@ -165,7 +165,7 @@ final case class Timeslot(from: Date, to: Date) extends Ordered[Timeslot] {
    * contained in the timeslot are trimmed to this timeslot's
    * start and end time.
    */
-  def align(l: List[Timeslot]): List[Timeslot] = {
+  private[this] def align0(l: List[Timeslot]): List[Timeslot] = {
     if (l.isEmpty) return List()
 
     val result : Option[Timeslot] =
@@ -181,6 +181,10 @@ final case class Timeslot(from: Date, to: Date) extends Ordered[Timeslot] {
     }
   }
 
+  /** Aligns `l` through align0 and passes the result through Timeslot.mergeOverlaps. */
+  def align(l: List[Timeslot]): List[Timeslot] =
+    Timeslot.mergeOverlaps(align0(l))
+
   /* align a time slot in "bound_size" boundaries so that
    * start0 <= start and end0 >= end */
   def align(bound_size : Long) : Timeslot = {
index 0273a6a..e6e4f4f 100644 (file)
@@ -236,12 +236,21 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
       logger.debug("%s %s %s".format(method, request.getProtocolVersion, uri))
 
       type URIPF = PartialFunction[String, TFuture[THttpResponse]]
-
+      // Plain-text ping reply: "PONG" or "DOWN", then the current millis raw and ISO-formatted.
+      // NOTE(review): "DOWN" is still sent through stringResponseOK - clients presumably
+      // have to inspect the body rather than the HTTP status; confirm this is intended.
+      def pong(ok:Boolean) = {
+        val millis = TimeHelpers.nowMillis()
+        val status = if(ok) "PONG" else "DOWN"
+        val isoTime = ISODateTimeFormat.dateTime().print(millis)
+        stringResponseOK("%s\n%s\n%s".format(status, millis, isoTime), TEXT_PLAIN)
+      }
       val PingHandler: URIPF = {
-        case RESTPaths.PingPath() ⇒
-          val now = TimeHelpers.nowMillis()
-          val nowFormatted = ISODateTimeFormat.dateTime().print(now)
-          stringResponseOK("PONG\n%s\n%s".format(now, nowFormatted), TEXT_PLAIN)
+        case RESTPaths.AquariumPingPath() ⇒
+          pong(true)
+        case RESTPaths.RabbitMQPingPath() ⇒
+          pong(aquarium(Aquarium.EnvKeys.rabbitMQService).areConsumersLive)
+        case RESTPaths.IMStorePingPath() ⇒
+          pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isIMAlive)
+        case RESTPaths.RCStorePingPath() ⇒
+          pong(aquarium(Aquarium.EnvKeys.storeWatcherService).isRCAlive)
       }
 
       val UserActorCacheHandler: URIPF = {
index 6fe8202..867ef8a 100644 (file)
@@ -43,7 +43,10 @@ import gr.grnet.aquarium.ResourceLocator
  * @author Christos KK Loverdos <loverdos@gmail.com>.
  */
 object RESTPaths {
-  final val PingPath = "/ping".r
+  final val AquariumPingPath = "/ping/aquarium".r
+  final val RabbitMQPingPath = "/ping/rabbitmq".r
+  final val IMStorePingPath = "/ping/imstore".r
+  final val RCStorePingPath = "/ping/rcstore".r
 
   final val AdminPrefix = "/admin"
 
index 256ee21..9bd9fd9 100644 (file)
@@ -187,6 +187,13 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu
     }
   }
 
+  /** True when every consumer in _consumers reports isAlive(); also true when there are none. */
+  def areConsumersLive() : Boolean = {
+    // forall stops at the first dead consumer without a non-local `return` from a closure
+    this._consumers.forall(consumer ⇒ consumer.isAlive())
+  }
+
+
   def stop() = {
     safeStop()
   }
index a5bc21f..1d23e32 100644 (file)
@@ -58,6 +58,9 @@ final class StoreWatcherService extends Lifecycle with Configurable with Aquariu
 
   def propertyPrefix = Some(StoreProvider.Prefix)
 
+  def isRCAlive() : Boolean = _rcIsAlive.get // current value of the _rcIsAlive flag
+  def isIMAlive() : Boolean = _imIsAlive.get // current value of the _imIsAlive flag
+
   /**
    * Configure this instance with the provided properties.
    *