RabbitMQ producer sends user balance >= 0 to astakos
author Prodromos Gerakios <pgerakios@grnet.gr>
Fri, 27 Jul 2012 10:12:27 +0000 (13:12 +0300)
committer Prodromos Gerakios <pgerakios@grnet.gr>
Fri, 27 Jul 2012 10:12:27 +0000 (13:12 +0300)
src/main/scala/gr/grnet/aquarium/Aquarium.scala
src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala

index e61a7ab..d078e76 100644 (file)
@@ -38,6 +38,7 @@ package gr.grnet.aquarium
 import com.ckkloverdos.env.Env
 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
 import com.ckkloverdos.props.Props
+import connector.rabbitmq.RabbitMQProducer
 import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
 import java.io.File
 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
@@ -471,6 +472,9 @@ object Aquarium {
     final val rabbitMQService: TypedKey[RabbitMQService] =
       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
 
+    final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
+      new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
+
     final val storeWatcherService: TypedKey[StoreWatcherService] =
       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
 
index ede9a58..42305b3 100644 (file)
@@ -357,6 +357,8 @@ final class AquariumBuilder(
 
     checkNoPropsOverride(EnvKeys.rabbitMQService) { newInstanceFromKey(_) }
 
+    checkNoPropsOverride(EnvKeys.rabbitMQProducer) { newInstanceFromKey(_) }
+
     checkNoPropsOverride(EnvKeys.storeWatcherService) { newInstanceFromKey(_) }
 
     checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒
index 55e13e4..f933964 100644 (file)
@@ -45,7 +45,7 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.event.model.im.IMEventModel
 import gr.grnet.aquarium.actor.message.{GetUserWalletResponseData, GetUserWalletResponse, GetUserWalletRequest, GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
 import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
-import gr.grnet.aquarium.AquariumInternalError
+import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
 import gr.grnet.aquarium.computation.BillingMonthInfo
 import gr.grnet.aquarium.charging.state.UserStateBootstrap
 import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
@@ -380,7 +380,10 @@ class UserActor extends ReflectiveRoleableActor {
     else {
       computeBatch()
     }
-
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+    sendMessage("{userid: \"%s\", state: %s}".
+                  format(this._userID,
+                  this._workingUserState.totalCredits >= 0.0))
     DEBUG("Updated %s", this._workingUserState)
     logSeparator()
   }
index 8e95f8f..11cff75 100644 (file)
@@ -1,13 +1,14 @@
 package gr.grnet.aquarium.connector.rabbitmq
 
+import conf.RabbitMQConsumerConf
 import conf.{RabbitMQKeys, RabbitMQConsumerConf}
 import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
-import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, Aquarium}
-import com.rabbitmq.client.{MessageProperties, Channel, Connection, ConnectionFactory}
+import gr.grnet.aquarium.{Configurable, ResourceLocator, AquariumBuilder, Aquarium}
+import com.rabbitmq.client._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.util.Tags
+import gr.grnet.aquarium.util.{Lock, Tags}
 import gr.grnet.aquarium.store.memory.MemStoreProvider
 import java.io.File
 import com.ckkloverdos.resource.FileStreamResource
@@ -48,14 +49,24 @@ import com.ckkloverdos.resource.FileStreamResource
  * or implied, of GRNET S.A.
  */
 
-class RabbitMQProducer(val aquarium: Aquarium) {
-  lazy val conf: RabbitMQConsumerConf = {
-    var props = aquarium(Aquarium.EnvKeys.originalProps)
+class RabbitMQProducer extends Configurable {
+  private[this] var _conf: RabbitMQConsumerConf = _
+  private[this] var _factory: ConnectionFactory = _
+  private[this] var _connection: Connection = _
+  private[this] var _channel: Channel = _
+  private[this] var _servers : Array[Address] = _
+  private[this] final val lock = new Lock()
+
+  def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
+  //  Some(RabbitMQConfKeys.imevents_credit)
+
+
+  def configure(props: Props): Unit = {
     var prop = props.get(RabbitMQConfKeys.imevents_credit).getOr("")
-    Console.println("Prop: " + prop)
+    // Console.println("Prop: " + prop)
     val Array(exchange, routing) = prop.split(":")
     //Console.println("ex: " + exchange + " routing: " + routing)
-    val conf = RabbitMQConsumerConf(
+     _conf = RabbitMQConsumerConf(
       tag = Tags.IMEventTag,
       exchangeName = exchange,
       routingKey = routing,
@@ -65,39 +76,25 @@ class RabbitMQProducer(val aquarium: Aquarium) {
       channelConf = RabbitMQKeys.DefaultChannelConf,
       queueConf = RabbitMQKeys.DefaultQueueConf
     )
-    conf
-  }
-  private[this] var _factory: ConnectionFactory = {
-    val factory = new ConnectionFactory
-    factory.setConnectionTimeout(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
-    factory.setUsername(conf.connectionConf(RabbitMQConKeys.username))
-    factory.setPassword(conf.connectionConf(RabbitMQConKeys.password))
-    factory.setVirtualHost(conf.connectionConf(RabbitMQConKeys.vhost))
-    factory.setRequestedHeartbeat(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
-    factory
-  }
-
-  private[this] var _connection: Connection = _
-  private[this] var _channel: Channel = _
-  //private[this] val _state = new AtomicReference[State](Shutdown)
-  private[this] val _pingIsScheduled = new AtomicBoolean(false)
-
-  private[this] lazy val servers = {
-    val s = conf.connectionConf(RabbitMQConKeys.servers)
-    for { s1 <- s }  Console.err.println("Servers: " + s1.toString)
-    s
+    _factory = new ConnectionFactory
+    _factory.setConnectionTimeout(_conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+    _factory.setUsername(_conf.connectionConf(RabbitMQConKeys.username))
+    _factory.setPassword(_conf.connectionConf(RabbitMQConKeys.password))
+    _factory.setVirtualHost(_conf.connectionConf(RabbitMQConKeys.vhost))
+    _factory.setRequestedHeartbeat(_conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+    _servers = _conf.connectionConf(RabbitMQConKeys.servers)
   }
 
   private[this] def withChannel[A]( next : => A) = {
     try {
-      var connection : Connection =  null
-      var channel : Channel = null
+      lock.withLock {
       if (_connection == null ||_connection.isOpen == false )
-         _connection =_factory.newConnection(servers)
+         _connection =_factory.newConnection(_servers)
       if (_channel == null ||_channel.isOpen == false )
         _channel = _connection.createChannel
-      assert(_connection.isOpen && _channel.isOpen)
-      next
+        assert(_connection.isOpen && _channel.isOpen)
+        next
+     }
     } catch {
         case e: Exception =>
           e.printStackTrace
@@ -106,7 +103,7 @@ class RabbitMQProducer(val aquarium: Aquarium) {
 
   def sendMessage(payload:String) =
     withChannel {
-      _channel.basicPublish(conf.exchangeName, conf.routingKey,
+      _channel.basicPublish(_conf.exchangeName, _conf.routingKey,
         MessageProperties.PERSISTENT_TEXT_PLAIN,
         payload.getBytes)
     }
@@ -122,7 +119,10 @@ object RabbitMQProducer {
 
 
   def main(args: Array[String]) = {
-    new RabbitMQProducer(aquarium).sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}")
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+    sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}")
+    Console.err.println("Message sent")
+    aquarium.stop()
     ()
   }
 }
\ No newline at end of file