import com.rabbitmq.client._
import com.ckkloverdos.props.Props
import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.util.Lock
+import util.{Loggable, Lock}
import gr.grnet.aquarium.store.memory.MemStoreProvider
import java.io.File
import com.ckkloverdos.resource.FileStreamResource
import scala.Some
import collection.immutable.{TreeMap, TreeSet}
+import java.util.concurrent.atomic.AtomicLong
+import akka.actor.{Actor, ActorRef}
+import com.google.common.eventbus.Subscribe
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import collection.mutable
/*
* @author Prodromos Gerakios <pgerakios@grnet.gr>
*/
-class RabbitMQProducer extends Configurable {
+class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Loggable {
private[this] var _conf: RabbitMQConsumerConf = _
private[this] var _factory: ConnectionFactory = _
private[this] var _connection: Connection = _
private[this] var _channel: Channel = _
private[this] var _servers : Array[Address] = _
private[this] final val lock = new Lock()
- private[this] var _exchangeName : String = _
- private[this] var _routingKey :String = _
def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
// Some(RabbitMQConfKeys.imevents_credit)
+ @volatile private[this] var _unsentMessages = mutable.Queue[()=>Unit]()
@volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
- @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
+ @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,()=>Unit]()
+
+
+ @volatile private[this] var _actorRef : ActorRef = _
+ private[this] var _resendPeriodMillis = 1000L
+
+
+  /**
+   * Handles the `AquariumCreatedEvent` delivered through the `@Subscribe` hook.
+   * Lets the skeleton record the event first, then asserts that both the
+   * aquarium instance and its akka service are present before arming the
+   * periodic resend task.
+   */
+  @Subscribe
+  override def awareOfAquarium(event: AquariumCreatedEvent) = {
+    super.awareOfAquarium(event)
+    assert(aquarium != null)
+    assert(aquarium.akkaService != null)
+    resendMessages() // start our daemon
+  }
+
+  /**
+   * Schedules one run of the resend task on the aquarium timer service and
+   * re-arms itself at the end of every run, forming a periodic loop with a
+   * delay of `_resendPeriodMillis`.
+   *
+   * Each run lazily creates the producer actor, and, when the channel is
+   * usable, swaps out the queue of unsent publish thunks under the lock and
+   * forwards every drained thunk to the actor.
+   */
+  private[this] def resendMessages() : Unit = {
+    aquarium.timerService.scheduleOnce(
+      "RabbitMQProducer.resendMessages",
+      {
+        try {
+          //Console.err.println("RabbitMQProducer Timer ...")
+          if(_actorRef == null) {
+            _actorRef = aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
+          }
+          if(_actorRef != null) {
+            //Console.err.println("RabbitMQProducer Timer --> messages ...")
+            // Drain under the lock; leave the queue untouched while the channel is down.
+            val msgs : mutable.Queue[()=>Unit] = lock.withLock {
+              if(isChannelOpen) {
+                val pending = _unsentMessages
+                _unsentMessages = mutable.Queue[()=>Unit]()
+                pending
+              } else {
+                mutable.Queue[()=>Unit]()
+              }
+            }
+            if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
+            for {msg <- msgs} {
+              Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
+              _actorRef ! msg
+            }
+          } else {
+            Console.err.println("Akka ActorSystem is null. Waiting ...")
+          }
+        } catch {
+          // isChannelOpen attempts a reconnect and may throw; one failed tick
+          // must not end the resend loop.
+          case e: Exception =>
+            Console.err.println("RabbitMQProducer Timer failed: " + e)
+        } finally {
+          // Always re-arm, whether or not this tick succeeded.
+          resendMessages()
+        }
+      },
+      this._resendPeriodMillis,
+      true
+    )
+    ()
+  }
+
+  /**
+   * Actor that executes the publish thunks it receives as messages.
+   * Any other message is only reported on stderr.
+   */
+  private class RabbitMQProducerActor extends Actor {
+    def receive = {
+      // Function types are erased at runtime, so match on Function0 itself
+      // instead of the unchecked `() => Unit` pattern.
+      case sendMessage: Function0[_] =>
+        Console.err.println("Executing msg ... " + sendMessage.hashCode)
+        // Apply the thunk; a bare reference to a function value does nothing.
+        sendMessage()
+        ()
+      case x : AnyRef =>
+        Console.err.println("Dammit ..." + x.getClass.getSimpleName)
+        ()
+    }
+  }
def configure(props: Props): Unit = {
- val propName = RabbitMQConfKeys.imevents_credit
- def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
- val prop = props.get(propName).getOr(exn())
- if (prop.isEmpty) exn()
val connectionConf = RabbitMQKeys.makeConnectionConf(props)
- val Array(exchangeName, routingKey) = prop.split(":")
- _exchangeName = exchangeName
- _routingKey = routingKey
_factory = new ConnectionFactory
_factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
_factory.setUsername(connectionConf(RabbitMQConKeys.username))
_channel.confirmSelect
_channel.addConfirmListener(new ConfirmListener {
- private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
+ private [this] def cutSubset(seqNo:Long,multiple:Boolean) : TreeMap[Long,()=>Unit] =
+ lock.withLock {
val set = if (multiple)
_unconfirmedSet.range(0,seqNo+1)
else
_unconfirmedSet.range(seqNo,seqNo)
_unconfirmedSet = _unconfirmedSet -- set
- val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
+ val ret : TreeMap[Long,()=>Unit] = set.foldLeft(TreeMap[Long,()=>Unit]())({(map,seq)=>
_unconfirmedMessages.get(seq) match{
case None => map
case Some(s) => map + ((seq,s))
def handleAck(seqNo:Long,multiple:Boolean) = {
- withChannel {
- //Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
- subset(seqNo,multiple)
- }
+ Console.err.println("Received ack for " + seqNo)
+ cutSubset(seqNo,multiple)
}
def handleNack(seqNo:Long,multiple:Boolean) = {
- withChannel {
- //Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
- for { (_,msg) <- subset(seqNo,multiple) }
- sendMessage(msg)
- }
+ Console.err.println("Received Nack for msg for " + seqNo)
+ // cutSubset yields (sequence number, publish thunk) pairs; only the thunk
+ // is a message the producer actor can execute.
+ for {(_, msg) <- cutSubset(seqNo,multiple)} {
+   val actor = _actorRef
+   // Same fallback as sendMessage: queue the thunk while no actor exists yet.
+   if(actor != null) actor ! msg
+   else lock.withLock(_unsentMessages += msg)
+ }
}
})
}
- private[this] def withChannel[A]( next : => A) = {
- try {
- lock.withLock {
- if (_connection == null ||_connection.isOpen == false )
- _connection =_factory.newConnection(_servers)
- if (_channel == null ||_channel.isOpen == false )
- _channel = _connection.createChannel
- assert(_connection.isOpen && _channel.isOpen)
- next
- }
- } catch {
- case e: Exception =>
- e.printStackTrace
+  /**
+   * Under the lock, (re)creates the connection and the channel when either is
+   * missing or closed, then reports whether both are open afterwards.
+   * Despite the name this is not a pure check: it may open a new connection.
+   */
+  private[this] def isChannelOpen: Boolean =
+    lock.withLock{
+      val connectionDown = _connection == null || !_connection.isOpen
+      if (connectionDown)
+        _connection = _factory.newConnection(_servers)
+      val channelDown = _channel == null || !_channel.isOpen
+      if (channelDown)
+        _channel = _connection.createChannel
+      _connection.isOpen && _channel.isOpen
}
+
+  /** Ignores `s` and returns a thunk that does nothing when applied. */
+  def fun(s:String) : () => Unit = {
+    () => ()
}
- def sendMessage(payload:String) =
- withChannel {
- var seq : Long = _channel.getNextPublishSeqNo()
- _unconfirmedSet += seq
- _unconfirmedMessages += ((seq,payload))
- _channel.basicPublish(_exchangeName,_routingKey,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- payload.getBytes)
- }
+  /**
+   * Publishes `payload` as a persistent text message to `exchangeName` with
+   * `routingKey`.
+   *
+   * The publish attempt runs under the lock. When the channel is usable, the
+   * next publish sequence number is recorded in the unconfirmed set/map
+   * before publishing; otherwise, or when publishing throws, the attempt is
+   * queued in `_unsentMessages`.
+   */
+  def sendMessage(exchangeName:String,routingKey:String,payload:String) = {
+    def msg () : Unit =
+      lock.withLock {
+        try {
+          if(isChannelOpen) {
+            val seq : Long = _channel.getNextPublishSeqNo()
+            _unconfirmedSet += seq
+            _unconfirmedMessages += ((seq,msg))
+            // Encode explicitly: the platform default charset varies per host.
+            _channel.basicPublish(exchangeName,routingKey,
+              MessageProperties.PERSISTENT_TEXT_PLAIN,
+              payload.getBytes("UTF-8"))
+            Console.err.println("####Sent message " + payload + " with seqno=" + seq)
+          } else {
+            _unsentMessages += msg
+          }
+        } catch {
+          case e: Exception =>
+            // Report the failure instead of dropping it silently, then queue for retry.
+            Console.err.println("RabbitMQProducer failed to publish, queued for retry: " + e)
+            _unsentMessages += msg
+        }
+      }
+    // NOTE(review): `msg` is a method with an empty parameter list used where
+    // `Any` is expected; verify whether this hands a thunk to the actor or
+    // applies `msg()` on the calling thread and sends its Unit result.
+    if(_actorRef != null)
+      _actorRef ! msg
+    else
+      lock.withLock(_unsentMessages += msg)
+  }
}
object RabbitMQProducer {
update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
build()
- aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
+
+ aquarium.start()
+
+ //RabbitMQProducer.wait(1000)
+ val propName = RabbitMQConfKeys.imevents_credit
+ def exn () =
+ throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
+ val prop = _props.get(propName).getOr(exn())
+ if (prop.isEmpty) exn()
+ val Array(exchangeName, routingKey) = prop.split(":")
+ aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(exchangeName,routingKey,"Test string !!!!")
Console.err.println("Message sent")
- //aquarium.start()
()
}
}
\ No newline at end of file
import com.ckkloverdos.props.Props
import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable}
+import gr.grnet.aquarium.{AquariumInternalError, Aquarium, AquariumAwareSkeleton, Configurable}
import gr.grnet.aquarium.converter.StdConverters
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
def converters = aquarium.converters
+ private[this] var _creditExchangeName : String = ""
+ private[this] var _creditRoutingKey : String = ""
+
+
/**
* Configure this instance with the provided properties.
*
lg("Got %s consumers".format(this._consumers.size))
this._consumers.foreach(logger.debug("Configured {}", _))
+
+ val propName = RabbitMQConfKeys.imevents_credit
+ def exn () =
+ throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
+ val prop = _props.get(propName).getOr(exn())
+ if (prop.isEmpty) exn()
+ val Array(exchangeName, routingKey) = prop.split(":")
+ _creditExchangeName = exchangeName
+ _creditRoutingKey = routingKey
}
def start() = {
@Subscribe
def handleUserBalance(event:BalanceEvent): Unit = {
- aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(event.toJsonString)
+    // Forward the balance event as JSON to the configured credit exchange/routing key.
+    val producer = aquarium(Aquarium.EnvKeys.rabbitMQProducer)
+    producer.sendMessage(this._creditExchangeName, this._creditRoutingKey, event.toJsonString)
}
@Subscribe