From: Prodromos Gerakios Date: Mon, 6 Aug 2012 12:25:55 +0000 (+0300) Subject: Reliable message passing for RabbitMQProducer X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/2bfa85c75d1a20a2e54370fffb36b68171790962 Reliable message passing for RabbitMQProducer --- diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala index a8db62b..6f974e6 100644 --- a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala +++ b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala @@ -42,7 +42,8 @@ import gr.grnet.aquarium.actor._ import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent} import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded} import gr.grnet.aquarium.util.date.TimeHelpers -import gr.grnet.aquarium.event.model.im.{BalanceEvent, IMEventModel} +import gr.grnet.aquarium.service.event.BalanceEvent +import gr.grnet.aquarium.event.model.im.IMEventModel import message._ import config.AquariumPropertiesLoaded import config.InitializeUserActorState @@ -401,8 +402,8 @@ class UserActor extends ReflectiveRoleableActor { computeBatch() } if(oldTotalCredits * this._workingUserState.totalCredits < 0) - BalanceEvent.send(aquarium,this._workingUserState.userID, - this._workingUserState.totalCredits>=0) + aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID, + this._workingUserState.totalCredits>=0) DEBUG("Updated %s", this._workingUserState) logSeparator() } diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala index b77af94..abc4d62 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQProducer.scala @@ -14,6 +14,8 @@ import gr.grnet.aquarium.store.memory.MemStoreProvider import java.io.File import com.ckkloverdos.resource.FileStreamResource import scala.Some +import collection.immutable.{TreeMap, SortedSet, TreeSet} +import java.util.Collections /* @@ -70,6 +72,9 @@ class RabbitMQProducer extends Configurable { // Some(RabbitMQConfKeys.imevents_credit) + @volatile private[this] var _unconfirmedSet = new TreeSet[Long]() + @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]() + def configure(props: Props): Unit = { val propName = RabbitMQConfKeys.imevents_credit def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName)) @@ -86,6 +91,42 @@ class RabbitMQProducer extends Configurable { _factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost)) _factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt) _servers = connectionConf(RabbitMQConKeys.servers) + _connection =_factory.newConnection(_servers) + _channel = _connection.createChannel + _channel.confirmSelect + _channel.addConfirmListener(new ConfirmListener { + + private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = { + val set = if (multiple) + _unconfirmedSet.range(0,seqNo+1) + else + _unconfirmedSet.range(seqNo,seqNo) + _unconfirmedSet = _unconfirmedSet -- set + val ret : TreeMap[Long,String] = set.foldLeft(TreeMap[Long,String]())({(map,seq)=> + _unconfirmedMessages.get(seq) match{ + case None => map + case Some(s) => map + ((seq,s)) + }}) + _unconfirmedMessages = _unconfirmedMessages -- set + ret + } + + + def handleAck(seqNo:Long,multiple:Boolean) = { + withChannel { + Console.err.println("Received ack for msg " + _unconfirmedMessages.get(seqNo) ) + subset(seqNo,multiple) + } + } + + def handleNack(seqNo:Long,multiple:Boolean) = { + withChannel { + Console.err.println("Received Nack for msg " + _unconfirmedMessages.get(seqNo) ) + for { (_,msg) <- subset(seqNo,multiple) } + sendMessage(msg) + } + } + }) } private[this] def withChannel[A]( next : => A) = { @@ -106,8 +147,26 @@ class RabbitMQProducer extends Configurable { def sendMessage(payload:String) = withChannel { + var seq : Long = _channel.getNextPublishSeqNo() + _unconfirmedSet += seq + _unconfirmedMessages += ((seq,payload)) _channel.basicPublish(_exchangeName,_routingKey, - MessageProperties.PERSISTENT_TEXT_PLAIN, - payload.getBytes) + MessageProperties.PERSISTENT_TEXT_PLAIN, + payload.getBytes) } +} + +object RabbitMQProducer { + def main(args: Array[String]) = { + val propsfile = new FileStreamResource(new File("aquarium.properties")) + var _props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters)) + val aquarium = new AquariumBuilder(_props, ResourceLocator.DefaultPolicyModel). + update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider). + update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))). + build() + aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!") + Console.err.println("Message sent") + aquarium.stop() + () + } } \ No newline at end of file diff --git a/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala b/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala index 3df48a1..256ee21 100644 --- a/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala +++ b/src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala @@ -37,16 +37,17 @@ package gr.grnet.aquarium.service import com.ckkloverdos.props.Props import com.google.common.eventbus.Subscribe -import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable} +import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable} import gr.grnet.aquarium.converter.StdConverters import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle} import gr.grnet.aquarium.util.sameTags -import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent} +import event.{BalanceEvent, AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent} import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler} +import gr.grnet.aquarium.util.json.JsonSupport /** * The service that is responsible to handle `RabbitMQ` connecrivity. @@ -199,6 +200,11 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu } @Subscribe + def handleUserBalance(event:BalanceEvent): Unit = { + aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage(event.toJsonString) + } + + @Subscribe def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = { val eventTag = event.tag