Reliable message passing for RabbitMQProducer
author Prodromos Gerakios <pgerakios@grnet.gr>
Mon, 6 Aug 2012 12:25:55 +0000 (15:25 +0300)
committer Prodromos Gerakios <pgerakios@grnet.gr>
Mon, 6 Aug 2012 12:25:55 +0000 (15:25 +0300)
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala
src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala

index a8db62b..6f974e6 100644 (file)
@@ -42,7 +42,8 @@ import gr.grnet.aquarium.actor._
 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
 import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
 import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.event.model.im.{BalanceEvent, IMEventModel}
+import gr.grnet.aquarium.service.event.BalanceEvent
+import gr.grnet.aquarium.event.model.im.IMEventModel
 import message._
 import config.AquariumPropertiesLoaded
 import config.InitializeUserActorState
@@ -401,8 +402,8 @@ class UserActor extends ReflectiveRoleableActor {
       computeBatch()
     }
     if(oldTotalCredits * this._workingUserState.totalCredits < 0)
-      BalanceEvent.send(aquarium,this._workingUserState.userID,
-                        this._workingUserState.totalCredits>=0)
+      aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
+                                           this._workingUserState.totalCredits>=0)
     DEBUG("Updated %s", this._workingUserState)
     logSeparator()
   }
index b77af94..abc4d62 100644 (file)
@@ -14,6 +14,8 @@ import gr.grnet.aquarium.store.memory.MemStoreProvider
 import java.io.File
 import com.ckkloverdos.resource.FileStreamResource
 import scala.Some
+import collection.immutable.{TreeMap, SortedSet, TreeSet}
+import java.util.Collections
 
 
 /*
@@ -70,6 +72,9 @@ class RabbitMQProducer extends Configurable {
   //  Some(RabbitMQConfKeys.imevents_credit)
 
 
+  // Publish sequence numbers that are still tracked as unconfirmed: added in
+  // sendMessage and pruned by the confirm listener registered in configure.
+  @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
+  // Payloads of the tracked messages, keyed by their publish sequence number.
+  @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
+
   def configure(props: Props): Unit = {
     val propName = RabbitMQConfKeys.imevents_credit
     def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
@@ -86,6 +91,42 @@ class RabbitMQProducer extends Configurable {
     _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
     _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
     _servers = connectionConf(RabbitMQConKeys.servers)
+    _connection =_factory.newConnection(_servers)
+    _channel = _connection.createChannel
+    _channel.confirmSelect
+    _channel.addConfirmListener(new ConfirmListener {
+
+      /**
+       * Removes the affected sequence numbers from the unconfirmed bookkeeping
+       * and returns the payloads that were registered under them.
+       *
+       * @param seqNo    sequence number passed to the confirm callback
+       * @param multiple when true, every tracked sequence number up to and
+       *                 including `seqNo` is affected; otherwise only `seqNo`
+       * @return the removed (sequence number -> payload) entries
+       */
+      private[this] def subset(seqNo: Long, multiple: Boolean): TreeMap[Long, String] = {
+        // `range(from, until)` excludes its upper bound, so the single-message
+        // case must also use `seqNo + 1`; `range(seqNo, seqNo)` is always empty.
+        val lowerBound = if (multiple) 0L else seqNo
+        val affected = _unconfirmedSet.range(lowerBound, seqNo + 1)
+        _unconfirmedSet = _unconfirmedSet -- affected
+        // Collect the payloads before they are dropped from the message map.
+        val removed = affected.foldLeft(TreeMap[Long, String]()) { (acc, seq) =>
+          _unconfirmedMessages.get(seq) match {
+            case Some(payload) => acc + ((seq, payload))
+            case None          => acc
+          }
+        }
+        _unconfirmedMessages = _unconfirmedMessages -- affected
+        removed
+      }
+
+
+      // Ack callback: logs the payload tracked under `seqNo`, then removes the
+      // affected entries from the unconfirmed bookkeeping via `subset`.
+      def handleAck(seqNo:Long,multiple:Boolean) = {
+        withChannel {
+          val tracked = _unconfirmedMessages.get(seqNo)
+          Console.err.println("Received ack for msg " + tracked)
+          subset(seqNo,multiple)
+        }
+      }
+
+      // Nack callback: logs the payload tracked under `seqNo`, removes the
+      // affected entries via `subset` and publishes each removed payload again.
+      def handleNack(seqNo:Long,multiple:Boolean) = {
+        withChannel {
+          val tracked = _unconfirmedMessages.get(seqNo)
+          Console.err.println("Received Nack for msg " + tracked)
+          subset(seqNo,multiple).foreach { case (_, msg) => sendMessage(msg) }
+        }
+      }
+    })
   }
 
   private[this] def withChannel[A]( next : => A) = {
@@ -106,8 +147,26 @@ class RabbitMQProducer extends Configurable {
 
+  /**
+   * Publishes `payload` with the configured exchange name and routing key and
+   * records it as unconfirmed under the channel's next publish sequence number.
+   *
+   * @param payload the message text to publish
+   */
   def sendMessage(payload:String) =
     withChannel {
+      // The sequence number is read and registered before publishing, so the
+      // payload is already tracked when the publish call is made.
+      val seq : Long = _channel.getNextPublishSeqNo()
+      _unconfirmedSet += seq
+      _unconfirmedMessages += ((seq,payload))
       _channel.basicPublish(_exchangeName,_routingKey,
-        MessageProperties.PERSISTENT_TEXT_PLAIN,
-        payload.getBytes)
+                            MessageProperties.PERSISTENT_TEXT_PLAIN,
+                            payload.getBytes)
     }
+}
+
+/**
+ * Manual smoke test: builds an Aquarium instance from `aquarium.properties`
+ * (with an in-memory store provider) and sends one test message through the
+ * configured RabbitMQ producer.
+ */
+object RabbitMQProducer  {
+  def main(args: Array[String]): Unit = {
+    val propsfile = new FileStreamResource(new File("aquarium.properties"))
+    // An empty Props instance is supplied as the `getOr` fallback value.
+    val props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
+    val aquarium = new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
+      update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+      update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
+      build()
+    try {
+      aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
+      Console.err.println("Message sent")
+    } finally {
+      // Stop Aquarium even when sending the message throws.
+      aquarium.stop()
+    }
+  }
 }
\ No newline at end of file
index 3df48a1..256ee21 100644 (file)
@@ -37,16 +37,17 @@ package gr.grnet.aquarium.service
 
 import com.ckkloverdos.props.Props
 import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
+import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable}
 import gr.grnet.aquarium.converter.StdConverters
 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
 import gr.grnet.aquarium.util.sameTags
-import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import event.{BalanceEvent, AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
 import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
+import gr.grnet.aquarium.util.json.JsonSupport
 
 /**
  * The service that is responsible for handling `RabbitMQ` connectivity.
@@ -199,6 +200,11 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu
   }
 
   @Subscribe
+  // Forwards a balance event to the RabbitMQ producer as its JSON string.
+  def handleUserBalance(event:BalanceEvent): Unit = {
+    val producer = aquarium(Aquarium.EnvKeys.rabbitMQProducer)
+    val json = event.toJsonString
+    producer.sendMessage(json)
+  }
+
+  @Subscribe
   def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
     val eventTag = event.tag