Towards a perfect RabbitMQProducer -> actor not receiving msgs
authorProdromos Gerakios <pgerakios@grnet.gr>
Tue, 21 Aug 2012 13:26:11 +0000 (16:26 +0300)
committerProdromos Gerakios <pgerakios@grnet.gr>
Tue, 21 Aug 2012 13:26:11 +0000 (16:26 +0300)
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala
src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala

index f778467..f43a71b 100644 (file)
@@ -6,12 +6,17 @@ import gr.grnet.aquarium._
 import com.rabbitmq.client._
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.util.Lock
+import util.{Loggable, Lock}
 import gr.grnet.aquarium.store.memory.MemStoreProvider
 import java.io.File
 import com.ckkloverdos.resource.FileStreamResource
 import scala.Some
 import collection.immutable.{TreeMap, TreeSet}
+import java.util.concurrent.atomic.AtomicLong
+import akka.actor.{Actor, ActorRef}
+import com.google.common.eventbus.Subscribe
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import collection.mutable
 
 
 /*
@@ -54,32 +59,82 @@ import collection.immutable.{TreeMap, TreeSet}
  * @author Prodromos Gerakios <pgerakios@grnet.gr>
  */
 
-class RabbitMQProducer extends Configurable {
+class RabbitMQProducer extends AquariumAwareSkeleton with Configurable with Loggable  {
   private[this] var _conf: RabbitMQConsumerConf = _
   private[this] var _factory: ConnectionFactory = _
   private[this] var _connection: Connection = _
   private[this] var _channel: Channel = _
   private[this] var _servers : Array[Address] = _
   private[this] final val lock = new Lock()
-  private[this] var _exchangeName : String = _
-  private[this] var _routingKey :String = _
 
   def propertyPrefix: Option[String] = Some(RabbitMQKeys.PropertiesPrefix)
   //  Some(RabbitMQConfKeys.imevents_credit)
 
 
+  @volatile private[this] var _unsentMessages = mutable.Queue[()=>Unit]()
   @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
-  @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
+  @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,()=>Unit]()
+
+
+  @volatile private[this] var _actorRef : ActorRef = _
+  private[this] var _resendPeriodMillis = 1000L
+
+
+  @Subscribe
+  override def awareOfAquarium(event: AquariumCreatedEvent) = {
+    super.awareOfAquarium(event)
+    assert(aquarium!=null && aquarium.akkaService != null)
+    resendMessages        // start our daemon
+  }
+
+  private[this] def resendMessages() : Unit = {
+    aquarium.timerService.scheduleOnce(
+    "RabbitMQProducer.resendMessages",
+    {
+      //Console.err.println("RabbitMQProducer Timer ...")
+      if(_actorRef==null) {
+        _actorRef =  aquarium.akkaService.createNamedActor[RabbitMQProducerActor]("RabbitMQProducerActor")
+      }
+      if(_actorRef != null){
+       //Console.err.println("RabbitMQProducer Timer --> messages ...")
+       var msgs : mutable.Queue[()=>Unit] = null
+       lock.withLock {
+          if(isChannelOpen) {
+            msgs  = _unsentMessages
+            _unsentMessages = mutable.Queue[()=>Unit]()
+          }
+       }
+       if(msgs!=null){
+         if(msgs.length>0) Console.err.println("RabbitMQProducer Timer --> messages ..." + msgs.length)
+         for {msg <- msgs} {
+           Console.err.println("RabbitMQProducer Timer sending message .." + msg.hashCode)
+           _actorRef ! msg
+         }
+       }
+      } else {
+        Console.err.println("Akka ActorSystem is null. Waiting ...")
+      }
+      resendMessages()
+    },
+    this._resendPeriodMillis,
+    true
+    )
+    ()
+  }
+
+  private class RabbitMQProducerActor extends Actor {
+    def receive = {
+      case sendMessage:(() => Unit) =>
+        Console.err.println("Executing msg ... " + sendMessage.hashCode)
+        sendMessage
+      case x  : AnyRef     =>
+        Console.err.println("Dammit  ..." + x.getClass.getSimpleName)
+        ()
+    }
+  }
 
   def configure(props: Props): Unit = {
-    val propName = RabbitMQConfKeys.imevents_credit
-    def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
-    val prop = props.get(propName).getOr(exn())
-    if (prop.isEmpty) exn()
     val connectionConf = RabbitMQKeys.makeConnectionConf(props)
-    val Array(exchangeName, routingKey) = prop.split(":")
-    _exchangeName = exchangeName
-    _routingKey = routingKey
     _factory = new ConnectionFactory
     _factory.setConnectionTimeout(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
     _factory.setUsername(connectionConf(RabbitMQConKeys.username))
@@ -92,13 +147,14 @@ class RabbitMQProducer extends Configurable {
     _channel.confirmSelect
     _channel.addConfirmListener(new ConfirmListener {
 
-      private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
+      private [this] def cutSubset(seqNo:Long,multiple:Boolean) : TreeMap[Long,()=>Unit] =
+        lock.withLock {
          val set = if (multiple)
                     _unconfirmedSet.range(0,seqNo+1)
                    else
                     _unconfirmedSet.range(seqNo,seqNo)
          _unconfirmedSet = _unconfirmedSet -- set
-         val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=>
+         val ret : TreeMap[Long,()=>Unit] = set.foldLeft(TreeMap[Long,()=>Unit]())({(map,seq)=>
            _unconfirmedMessages.get(seq) match{
              case None => map
              case Some(s) => map + ((seq,s))
@@ -109,47 +165,55 @@ class RabbitMQProducer extends Configurable {
 
 
       def handleAck(seqNo:Long,multiple:Boolean) = {
-        withChannel {
-          //Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) )
-          subset(seqNo,multiple)
-        }
+        Console.err.println("Received ack for  " + seqNo)
+        cutSubset(seqNo,multiple)
       }
 
       def handleNack(seqNo:Long,multiple:Boolean) = {
-        withChannel {
-          //Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) )
-          for { (_,msg) <- subset(seqNo,multiple) }
-            sendMessage(msg)
-        }
+        Console.err.println("Received Nack for msg for " + seqNo)
+        for {msg <- cutSubset(seqNo,multiple)} _actorRef ! msg
       }
     })
   }
 
-  private[this] def withChannel[A]( next : => A) = {
-    try {
-      lock.withLock {
-      if (_connection == null ||_connection.isOpen == false )
-         _connection =_factory.newConnection(_servers)
-      if (_channel == null ||_channel.isOpen == false )
-        _channel = _connection.createChannel
-        assert(_connection.isOpen && _channel.isOpen)
-        next
-     }
-    } catch {
-        case e: Exception =>
-          e.printStackTrace
+  private[this] def isChannelOpen: Boolean =
+    lock.withLock{
+    if (_connection == null ||_connection.isOpen == false )
+      _connection =_factory.newConnection(_servers)
+    if (_channel == null ||_channel.isOpen == false )
+      _channel = _connection.createChannel
+    _connection.isOpen && _channel.isOpen
     }
+
+  def fun(s:String)  : () => Unit = {
+     () => {}
   }
 
-  def sendMessage(payload:String) =
-    withChannel {
-      var seq : Long = _channel.getNextPublishSeqNo()
-      _unconfirmedSet += seq
-      _unconfirmedMessages += ((seq,payload))
-      _channel.basicPublish(_exchangeName,_routingKey,
-                            MessageProperties.PERSISTENT_TEXT_PLAIN,
-                            payload.getBytes)
-    }
+  def sendMessage(exchangeName:String,routingKey:String,payload:String) = {
+    def msg () : Unit =
+      lock.withLock {
+        try {
+             if(isChannelOpen) {
+              var seq : Long = _channel.getNextPublishSeqNo()
+              _unconfirmedSet += seq
+              _unconfirmedMessages += ((seq,msg))
+              _channel.basicPublish(exchangeName,routingKey,
+                MessageProperties.PERSISTENT_TEXT_PLAIN,
+                payload.getBytes)
+               Console.err.println("####Sent message " + payload + " with seqno=" + seq)
+            } else {
+              _unsentMessages += msg
+             }
+        } catch {
+            case e: Exception =>
+              _unsentMessages += msg
+        }
+      }
+    if(_actorRef != null)
+      _actorRef ! msg
+    else
+      lock.withLock(_unsentMessages += msg)
+ }
 }
 
 object RabbitMQProducer  {
@@ -160,9 +224,18 @@ object RabbitMQProducer  {
     update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
     update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
     build()
-    aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
+
+    aquarium.start()
+
+    //RabbitMQProducer.wait(1000)
+    val propName = RabbitMQConfKeys.imevents_credit
+    def exn () =
+      throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
+    val prop = _props.get(propName).getOr(exn())
+    if (prop.isEmpty) exn()
+    val Array(exchangeName, routingKey) = prop.split(":")
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(exchangeName,routingKey,"Test string !!!!")
     Console.err.println("Message sent")
-    //aquarium.start()
     ()
   }
 }
\ No newline at end of file
index f9219cc..423fa62 100644 (file)
@@ -35,7 +35,7 @@
 
 package gr.grnet.aquarium.service
 
-import akka.actor.{Props, ActorRef, ActorSystem}
+import akka.actor.{Actor, Props, ActorRef, ActorSystem}
 import gr.grnet.aquarium.util.{Loggable, Lifecycle, shortClassNameOf}
 import gr.grnet.aquarium.ResourceLocator.SysEnvs
 import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
@@ -183,6 +183,9 @@ final class AkkaService extends AquariumAwareSkeleton with Configurable with Lif
     gracefullyStopUserActor(userID, actorRef)
   }
 
+  def createNamedActor[T <: Actor:ClassManifest](name:String) : ActorRef=
+     this.actorSystem.actorOf(Props[T],name)
+
   def getOrCreateUserActor(userID: String): ActorRef = {
     if(this.isShuttingDown.get()) {
       throw new AquariumException(
index 9bd9fd9..fc646f6 100644 (file)
@@ -37,7 +37,7 @@ package gr.grnet.aquarium.service
 
 import com.ckkloverdos.props.Props
 import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable}
+import gr.grnet.aquarium.{AquariumInternalError, Aquarium, AquariumAwareSkeleton, Configurable}
 import gr.grnet.aquarium.converter.StdConverters
 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
@@ -69,6 +69,10 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu
 
   def converters = aquarium.converters
 
+  private[this] var _creditExchangeName : String = ""
+  private[this] var _creditRoutingKey   : String = ""
+
+
   /**
    * Configure this instance with the provided properties.
    *
@@ -167,6 +171,15 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu
     lg("Got %s consumers".format(this._consumers.size))
 
     this._consumers.foreach(logger.debug("Configured {}", _))
+
+    val propName = RabbitMQConfKeys.imevents_credit
+    def exn () =
+      throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
+    val prop = _props.get(propName).getOr(exn())
+    if (prop.isEmpty) exn()
+    val Array(exchangeName, routingKey) = prop.split(":")
+    _creditExchangeName = exchangeName
+    _creditRoutingKey = routingKey
   }
 
   def start() = {
@@ -208,7 +221,8 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu
 
   @Subscribe
   def handleUserBalance(event:BalanceEvent): Unit = {
-    aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(event.toJsonString)
+    aquarium(Aquarium.EnvKeys.rabbitMQProducer).
+    sendMessage(this._creditExchangeName,this._creditRoutingKey,event.toJsonString)
   }
 
   @Subscribe