import gr.grnet.aquarium.event.model.im.IMEventModel
import gr.grnet.aquarium.actor.message.{GetUserWalletResponseData, GetUserWalletResponse, GetUserWalletRequest, GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
-import gr.grnet.aquarium.AquariumInternalError
+import gr.grnet.aquarium.{Aquarium, AquariumInternalError}
import gr.grnet.aquarium.computation.BillingMonthInfo
import gr.grnet.aquarium.charging.state.UserStateBootstrap
import gr.grnet.aquarium.charging.state.{WorkingAgreementHistory, WorkingUserState, UserStateModel}
else {
computeBatch()
}
-
+ aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+ sendMessage("{userid: \"%s\", state: %s}".
+ format(this._userID,
+ this._workingUserState.totalCredits >= 0.0))
DEBUG("Updated %s", this._workingUserState)
logSeparator()
}
package gr.grnet.aquarium.connector.rabbitmq
+import conf.RabbitMQConsumerConf
import conf.{RabbitMQKeys, RabbitMQConsumerConf}
import conf.RabbitMQKeys.{RabbitMQConfKeys, RabbitMQConKeys}
-import gr.grnet.aquarium.{ResourceLocator, AquariumBuilder, Aquarium}
-import com.rabbitmq.client.{MessageProperties, Channel, Connection, ConnectionFactory}
+import gr.grnet.aquarium.{Configurable, ResourceLocator, AquariumBuilder, Aquarium}
+import com.rabbitmq.client._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import com.ckkloverdos.props.Props
import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.util.Tags
+import gr.grnet.aquarium.util.{Lock, Tags}
import gr.grnet.aquarium.store.memory.MemStoreProvider
import java.io.File
import com.ckkloverdos.resource.FileStreamResource
* or implied, of GRNET S.A.
*/
-class RabbitMQProducer(val aquarium: Aquarium) {
- lazy val conf: RabbitMQConsumerConf = {
- var props = aquarium(Aquarium.EnvKeys.originalProps)
+class RabbitMQProducer extends Configurable {
+ private[this] var _conf: RabbitMQConsumerConf = _
+ private[this] var _factory: ConnectionFactory = _
+ private[this] var _connection: Connection = _
+ private[this] var _channel: Channel = _
+ private[this] var _servers : Array[Address] = _
+ private[this] final val lock = new Lock()
+
+ def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
+ // Some(RabbitMQConfKeys.imevents_credit)
+
+
+ def configure(props: Props): Unit = {
var prop = props.get(RabbitMQConfKeys.imevents_credit).getOr("")
- Console.println("Prop: " + prop)
+ // Console.println("Prop: " + prop)
val Array(exchange, routing) = prop.split(":")
//Console.println("ex: " + exchange + " routing: " + routing)
- val conf = RabbitMQConsumerConf(
+ _conf = RabbitMQConsumerConf(
tag = Tags.IMEventTag,
exchangeName = exchange,
routingKey = routing,
channelConf = RabbitMQKeys.DefaultChannelConf,
queueConf = RabbitMQKeys.DefaultQueueConf
)
- conf
- }
- private[this] var _factory: ConnectionFactory = {
- val factory = new ConnectionFactory
- factory.setConnectionTimeout(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
- factory.setUsername(conf.connectionConf(RabbitMQConKeys.username))
- factory.setPassword(conf.connectionConf(RabbitMQConKeys.password))
- factory.setVirtualHost(conf.connectionConf(RabbitMQConKeys.vhost))
- factory.setRequestedHeartbeat(conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
- factory
- }
-
- private[this] var _connection: Connection = _
- private[this] var _channel: Channel = _
- //private[this] val _state = new AtomicReference[State](Shutdown)
- private[this] val _pingIsScheduled = new AtomicBoolean(false)
-
- private[this] lazy val servers = {
- val s = conf.connectionConf(RabbitMQConKeys.servers)
- for { s1 <- s } Console.err.println("Servers: " + s1.toString)
- s
+ _factory = new ConnectionFactory
+ _factory.setConnectionTimeout(_conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+ _factory.setUsername(_conf.connectionConf(RabbitMQConKeys.username))
+ _factory.setPassword(_conf.connectionConf(RabbitMQConKeys.password))
+ _factory.setVirtualHost(_conf.connectionConf(RabbitMQConKeys.vhost))
+ _factory.setRequestedHeartbeat(_conf.connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
+ _servers = _conf.connectionConf(RabbitMQConKeys.servers)
}
private[this] def withChannel[A]( next : => A) = {
try {
- var connection : Connection = null
- var channel : Channel = null
+ lock.withLock {
if (_connection == null ||_connection.isOpen == false )
- _connection =_factory.newConnection(servers)
+ _connection =_factory.newConnection(_servers)
if (_channel == null ||_channel.isOpen == false )
_channel = _connection.createChannel
- assert(_connection.isOpen && _channel.isOpen)
- next
+ assert(_connection.isOpen && _channel.isOpen)
+ next
+ }
} catch {
case e: Exception =>
e.printStackTrace
def sendMessage(payload:String) =
withChannel {
- _channel.basicPublish(conf.exchangeName, conf.routingKey,
+ _channel.basicPublish(_conf.exchangeName, _conf.routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
payload.getBytes)
}
def main(args: Array[String]) = {
- new RabbitMQProducer(aquarium).sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}")
+ aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+ sendMessage("{userid: \"pgerakios@grnet.gr\", state:true}")
+ Console.err.println("Message sent")
+ aquarium.stop()
()
}
}
\ No newline at end of file