From: Prodromos Gerakios Date: Fri, 27 Jul 2012 10:12:27 +0000 (+0300) Subject: RabbitMQ producer sends user balance >= 0 to astakos X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/7595d96387a172f8a2830bb4317e6388b3be455c RabbitMQ producer sends user balance >= 0 to astakos --- diff --git a/src/main/scala/gr/grnet/aquarium/Aquarium.scala b/src/main/scala/gr/grnet/aquarium/Aquarium.scala index e61a7ab..d078e76 100644 --- a/src/main/scala/gr/grnet/aquarium/Aquarium.scala +++ b/src/main/scala/gr/grnet/aquarium/Aquarium.scala @@ -38,6 +38,7 @@ package gr.grnet.aquarium import com.ckkloverdos.env.Env import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey} import com.ckkloverdos.props.Props +import connector.rabbitmq.RabbitMQProducer import gr.grnet.aquarium.store.{PolicyStore, StoreProvider} import java.io.File import gr.grnet.aquarium.util.{Loggable, Lifecycle} @@ -471,6 +472,9 @@ object Aquarium { final val rabbitMQService: TypedKey[RabbitMQService] = new AquariumEnvKey[RabbitMQService]("rabbitmq.service") + final val rabbitMQProducer: TypedKey[RabbitMQProducer] = + new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client") + final val storeWatcherService: TypedKey[StoreWatcherService] = new AquariumEnvKey[StoreWatcherService]("store.watcher.service") diff --git a/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala b/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala index ede9a58..42305b3 100644 --- a/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala +++ b/src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala @@ -357,6 +357,8 @@ final class AquariumBuilder( checkNoPropsOverride(EnvKeys.rabbitMQService) { newInstanceFromKey(_) } + checkNoPropsOverride(EnvKeys.rabbitMQProducer) { newInstanceFromKey(_) } + checkNoPropsOverride(EnvKeys.storeWatcherService) { newInstanceFromKey(_) } checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒ diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index 55e13e4..f933964 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -45,7 +45,7 @@ import gr.grnet.aquarium.util.date.TimeHelpers import gr.grnet.aquarium.event.model.im.IMEventModel import gr.grnet.aquarium.actor.message.{GetUserWalletResponseData, GetUserWalletResponse, GetUserWalletRequest, GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest} import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf} -import gr.grnet.aquarium.AquariumInternalError +import gr.grnet.aquarium.{Aquarium, AquariumInternalError} import gr.grnet.aquarium.computation.BillingMonthInfo import gr.grnet.aquarium.charging.state.UserStateBootstrap import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel} @@ -380,7 +380,10 @@ class UserActor extends ReflectiveRoleableActor { else { computeBatch() } - + aquarium(Aquarium.EnvKeys.rabbitMQProducer). + sendMessage("{userid: \"%s\", state: %s}". + format(this._userID, + this._workingUserState.totalCredits >= 0.0)) DEBUG("Updated %s", this._workingUserState) logSeparator() } diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala index 8e95f8f..11cff75 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala @@ -1,13 +1,14 @@ package gr.grnet.aquarium.connector.rabbitmq +import conf.RabbitMQConsumerConf import conf.{RabbitMQKeys, RabbitMQConsumerConf} import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys} -import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, Aquarium} -import com.rabbitmq.client.{MessageProperties, Channel, Connection, ConnectionFactory} +import gr.grnet.aquarium.{Configurable, ResourceLocator, AquariumBuilder, Aquarium} +import com.rabbitmq.client._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import com.ckkloverdos.props.Props import gr.grnet.aquarium.converter.StdConverters -import gr.grnet.aquarium.util.Tags +import gr.grnet.aquarium.util.{Lock, Tags} import gr.grnet.aquarium.store.memory.MemStoreProvider import java.io.File import com.ckkloverdos.resource.FileStreamResource @@ -48,14 +49,24 @@ import com.ckkloverdos.resource.FileStreamResource * or implied, of GRNET S.A. */ -class RabbitMQProducer(val aquarium: Aquarium) { - lazy val conf: RabbitMQConsumerConf = { - var props = aquarium(Aquarium.EnvKeys.originalProps) +class RabbitMQProducer extends Configurable { + private[this] var _conf: RabbitMQConsumerConf = _ + private[this] var _factory: ConnectionFactory = _ + private[this] var _connection: Connection = _ + private[this] var _channel: Channel = _ + private[this] var _servers : Array[Address] = _ + private[this] final val lock = new Lock() + + def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix) + // Some(RabbitMQConfKeys.imevents_credit) + + + def configure(props: Props): Unit = { var prop = props.get(RabbitMQConfKeys.imevents_credit).getOr("") - Console.println("Prop: " + prop) + // Console.println("Prop: " + prop) val Array(exchange, routing) = prop.split(":") //Console.println("ex: " + exchange + " routing: " + routing) - val conf = RabbitMQConsumerConf( + _conf = RabbitMQConsumerConf( tag = Tags.IMEventTag, exchangeName = exchange, routingKey = routing, @@ -65,39 +76,25 @@ class RabbitMQProducer(val aquarium: Aquarium) { channelConf = RabbitMQKeys.DefaultChannelConf, queueConf = RabbitMQKeys.DefaultQueueConf ) - conf - } - private[this] var _factory: ConnectionFactory = { - val factory = new ConnectionFactory - factory.setConnectionTimeout(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt) - factory.setUsername(conf.connectionConf(RabbitMQConKeys.username)) - factory.setPassword(conf.connectionConf(RabbitMQConKeys.password)) - factory.setVirtualHost(conf.connectionConf(RabbitMQConKeys.vhost)) - factory.setRequestedHeartbeat(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt) - factory - } - - private[this] var _connection: Connection = _ - private[this] var _channel: Channel = _ - //private[this] val _state = new AtomicReference[State](Shutdown) - private[this] val _pingIsScheduled = new AtomicBoolean(false) - - private[this] lazy val servers = { - val s = conf.connectionConf(RabbitMQConKeys.servers) - for { s1 <- s } Console.err.println("Servers: " + s1.toString) - s + _factory = new ConnectionFactory + _factory.setConnectionTimeout(_conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt) + _factory.setUsername(_conf.connectionConf(RabbitMQConKeys.username)) + _factory.setPassword(_conf.connectionConf(RabbitMQConKeys.password)) + _factory.setVirtualHost(_conf.connectionConf(RabbitMQConKeys.vhost)) + _factory.setRequestedHeartbeat(_conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt) + _servers = _conf.connectionConf(RabbitMQConKeys.servers) } private[this] def withChannel[A]( next : => A) = { try { - var connection : Connection = null - var channel : Channel = null + lock.withLock { if (_connection == null ||_connection.isOpen == false ) - _connection =_factory.newConnection(servers) + _connection =_factory.newConnection(_servers) if (_channel == null ||_channel.isOpen == false ) _channel = _connection.createChannel - assert(_connection.isOpen && _channel.isOpen) - next + assert(_connection.isOpen && _channel.isOpen) + next + } } catch { case e: Exception => e.printStackTrace @@ -106,7 +103,7 @@ class RabbitMQProducer(val aquarium: Aquarium) { def sendMessage(payload:String) = withChannel { - _channel.basicPublish(conf.exchangeName, conf.routingKey, + _channel.basicPublish(_conf.exchangeName, _conf.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, payload.getBytes) } @@ -122,7 +119,10 @@ object RabbitMQProducer { def main(args: Array[String]) = { - new RabbitMQProducer(aquarium).sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}") + aquarium(Aquarium.EnvKeys.rabbitMQProducer). + sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}") + Console.err.println("Message sent") + aquarium.stop() () } } \ No newline at end of file