import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
import gr.grnet.aquarium.actor.message.config.{InitializeUserActorState, AquariumPropertiesLoaded}
import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.event.model.im.{BalanceEvent, IMEventModel}
+import gr.grnet.aquarium.service.event.BalanceEvent
+import gr.grnet.aquarium.event.model.im.IMEventModel
import message._
import config.AquariumPropertiesLoaded
import config.InitializeUserActorState
computeBatch()
}
if(oldTotalCredits * this._workingUserState.totalCredits < 0)
- BalanceEvent.send(aquarium,this._workingUserState.userID,
- this._workingUserState.totalCredits>=0)
+ aquarium.eventBus ! new BalanceEvent(this._workingUserState.userID,
+ this._workingUserState.totalCredits>=0)
DEBUG("Updated %s", this._workingUserState)
logSeparator()
}
import java.io.File
import com.ckkloverdos.resource.FileStreamResource
import scala.Some
+import collection.immutable.{TreeMap, SortedSet, TreeSet}
+import java.util.Collections
/*
// Some(RabbitMQConfKeys.imevents_credit)
+ @volatile private[this] var _unconfirmedSet = new TreeSet[Long]()
+ @volatile private[this] var _unconfirmedMessages = new TreeMap[Long,String]()
+
def configure(props: Props): Unit = {
val propName = RabbitMQConfKeys.imevents_credit
def exn () = throw new AquariumInternalError(new Exception, "While obtaining value for key %s in properties".format(propName))
_factory.setVirtualHost(connectionConf(RabbitMQConKeys.vhost))
_factory.setRequestedHeartbeat(connectionConf(RabbitMQConKeys.reconnect_period_millis).toInt)
_servers = connectionConf(RabbitMQConKeys.servers)
+ _connection =_factory.newConnection(_servers)
+ _channel = _connection.createChannel
+ _channel.confirmSelect
+ _channel.addConfirmListener(new ConfirmListener {
+
+ /**
+  * Removes the sequence numbers settled by a broker confirmation from the
+  * unconfirmed bookkeeping and returns the payloads that were stored for them.
+  *
+  * @param seqNo    sequence number reported by the broker
+  * @param multiple when true, every tracked sequence number up to and including
+  *                 `seqNo` is settled; otherwise only `seqNo` itself
+  * @return the removed (sequence number -> payload) entries, ordered by sequence number
+  */
+ private [this] def subset(seqNo:Long,multiple:Boolean) : TreeMap[Long,String] = {
+   // `range(from, until)` excludes its upper bound, so `seqNo + 1` is needed in
+   // both cases; `range(seqNo, seqNo)` is always empty and would leave a single
+   // confirmation unprocessed.
+   val lowerBound = if (multiple) 0L else seqNo
+   val settled = _unconfirmedSet.range(lowerBound, seqNo + 1)
+   _unconfirmedSet = _unconfirmedSet -- settled
+   val ret : TreeMap[Long,String] = settled.foldLeft(TreeMap[Long,String]())({(map,seq)=>
+     _unconfirmedMessages.get(seq) match{
+       case None => map
+       case Some(s) => map + ((seq,s))
+     }})
+   _unconfirmedMessages = _unconfirmedMessages -- settled
+   ret
+ }
+
+
+ /**
+  * Confirm-listener callback for a broker ack: logs the payload currently
+  * tracked under `seqNo` to stderr, then calls `subset(seqNo, multiple)`.
+  */
+ def handleAck(seqNo:Long,multiple:Boolean) = {
+   withChannel {
+     // Look the payload up before `subset` touches the bookkeeping.
+     val acked = _unconfirmedMessages.get(seqNo)
+     Console.err.println("Received ack for msg " + acked)
+     subset(seqNo,multiple)
+   }
+ }
+
+ /**
+  * Confirm-listener callback for a broker nack: logs the payload currently
+  * tracked under `seqNo` to stderr, then passes every payload returned by
+  * `subset(seqNo, multiple)` to `sendMessage` again.
+  */
+ def handleNack(seqNo:Long,multiple:Boolean) = {
+   withChannel {
+     val rejected = _unconfirmedMessages.get(seqNo)
+     Console.err.println("Received Nack for msg " + rejected)
+     // `subset` is fully evaluated first; each re-send then registers the
+     // payload under a fresh sequence number inside `sendMessage`.
+     subset(seqNo,multiple).foreach { case (_, payload) => sendMessage(payload) }
+   }
+ }
+ })
}
private[this] def withChannel[A]( next : => A) = {
+ /**
+  * Publishes `payload` to `_exchangeName` / `_routingKey` as a persistent
+  * text message and records it as unconfirmed under the channel's next
+  * publish sequence number, so that the confirm listener can match a later
+  * ack/nack to it.
+  *
+  * NOTE(review): `_unconfirmedSet` and `_unconfirmedMessages` are updated with
+  * a non-atomic read-modify-write on `@volatile` vars; this presumably relies
+  * on `withChannel` to serialize access -- confirm.
+  * NOTE(review): `payload.getBytes` uses the platform default charset --
+  * confirm that consumers expect it.
+  *
+  * @param payload the message body to publish
+  */
def sendMessage(payload:String) =
withChannel {
+ // Recorded before publishing so a confirmation can always find the entry.
+ val seq : Long = _channel.getNextPublishSeqNo()
+ _unconfirmedSet += seq
+ _unconfirmedMessages += ((seq,payload))
_channel.basicPublish(_exchangeName,_routingKey,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- payload.getBytes)
+ MessageProperties.PERSISTENT_TEXT_PLAIN,
+ payload.getBytes)
}
+}
+
+/**
+ * Manual smoke test for the RabbitMQ producer: builds an Aquarium instance
+ * from `aquarium.properties` (falling back to empty properties), sends one
+ * test message through the configured producer and stops the instance.
+ */
+object RabbitMQProducer {
+ def main(args: Array[String]): Unit = {
+   val propsfile = new FileStreamResource(new File("aquarium.properties"))
+   val props: Props = Props(propsfile)(StdConverters.AllConverters).getOr(Props()(StdConverters.AllConverters))
+   val aquarium = new AquariumBuilder(props, ResourceLocator.DefaultPolicyModel).
+     update(Aquarium.EnvKeys.storeProvider, new MemStoreProvider).
+     update(Aquarium.EnvKeys.eventsStoreFolder, Some(new File(".."))).
+     build()
+   try {
+     aquarium(Aquarium.EnvKeys.rabbitMQProducer).sendMessage("Test string !!!!")
+     Console.err.println("Message sent")
+   } finally {
+     // Stop the instance even when sending fails.
+     aquarium.stop()
+   }
+ }
}
\ No newline at end of file
import com.ckkloverdos.props.Props
import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
+import gr.grnet.aquarium.{Aquarium, AquariumAwareSkeleton, Configurable}
import gr.grnet.aquarium.converter.StdConverters
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
import gr.grnet.aquarium.util.sameTags
-import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import event.{BalanceEvent, AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
+import gr.grnet.aquarium.util.json.JsonSupport
/**
* The service that is responsible for handling `RabbitMQ` connectivity.
}
+ /**
+  * Event-bus subscriber for [[BalanceEvent]]: serializes the event with
+  * `toJsonString` and hands the result to the RabbitMQ producer registered
+  * under `Aquarium.EnvKeys.rabbitMQProducer`.
+  *
+  * @param event the balance event received from the bus
+  */
@Subscribe
+ def handleUserBalance(event:BalanceEvent): Unit = {
+   val producer = aquarium(Aquarium.EnvKeys.rabbitMQProducer)
+   producer.sendMessage(event.toJsonString)
+ }
+
+ @Subscribe
def handleStoreFailure(event: StoreIsDeadBusEvent): Unit = {
val eventTag = event.tag