class ResourceEventProcessorService extends AkkaAMQP with Loggable
with Lifecycle {
// Messages used by the handler in `receive` below.
// AckData bundles what is needed to answer the queue about one delivery:
// msgId is what the log messages print, deliveryTag is what gets sent back in
// Acknowledge/Reject, and queue is the actor those replies are sent to.
- case class AckData(deliveryTag: Long, queue: ActorRef)
+ case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
// Request to store `event`; also carries a `sender` actor reference and the
// ack data of the delivery the event came from.
case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
// Outcome message: handled in `receive` by logging "Stored event" and acknowledging the delivery.
case class PersistOK(ackData: AckData)
// Outcome message: handled in `receive` by logging the failure and rejecting the delivery.
case class PersistFailed(ackData: AckData)
// Message handler: forwards incoming deliveries for persistence and answers
// the originating queue according to the persistence outcome.
def receive = {
// Incoming delivery: decode the payload into a ResourceEvent and send it to the
// persister load balancer along with the data needed to ack/reject it later.
// `isRedeliver` is bound but not used in the visible lines.
// NOTE(review): `queue.get` suggests `queue` is an Option and would throw if empty — confirm.
case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
val event = ResourceEvent.fromBytes(payload)
- PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(deliveryTag, queue.get))
+ PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
// Event stored: log its id and acknowledge the delivery on its queue.
case PersistOK(ackData) =>
- logger.debug("Stored res event:%s".format(ackData.deliveryTag))
+ logger.debug("Stored event:%s".format(ackData.msgId))
ackData.queue ! Acknowledge(ackData.deliveryTag)
// Storing failed: reject the delivery with the second argument set to true
// (presumably "requeue" — TODO confirm against Reject's definition).
case PersistFailed(ackData) =>
- logger.debug("Storing res event:%s failed".format(ackData.deliveryTag))
+ logger.debug("Storing event:%s failed".format(ackData.msgId))
ackData.queue ! Reject(ackData.deliveryTag, true)
// Duplicate event: reject the delivery with the second argument set to false
// (presumably "do not requeue" — TODO confirm).
case Duplicate(ackData) =>
- logger.debug("Res event:%s is duplicate".format(ackData.deliveryTag))
+ logger.debug("Event:%s is duplicate".format(ackData.msgId))
ackData.queue ! Reject(ackData.deliveryTag, false)
case Acknowledged(deliveryTag) =>
// Holds the queue-reader actors, the load balancer in front of them and their
// dispatcher. Parts of this object are not visible in this excerpt.
object QueueReaderManager {
// Number of processors reported by the JVM runtime.
val numCPUs = Runtime.getRuntime.availableProcessors
// Default-initialised (null) here; assigned outside the visible lines
// (presumably in start() — TODO confirm).
var actors: List[ActorRef] = _
-
- // sets up load balancing among the actors created above to allow multithreading
// Load balancer built over a CyclicIterator of `actors`. It is lazy, so
// `actors` is read on first access rather than at object initialisation.
lazy val lb = loadBalancerActor(new CyclicIterator(actors))
lazy val dispatcher =
}.toList
}
- def stop() = {
- actors.foreach(a => a.stop)
- }
// Shuts the actors down by delegating to the dispatcher's stopAllAttachedActors.
+ def stop() = dispatcher.stopAllAttachedActors
}
// Manager object for the persister actors; `PersisterManager.lb` is where
// `receive` sends Persist requests. Most of its body is not visible in this excerpt.
object PersisterManager {
}.toList
}
- def stop() = {
- actors.foreach(a => a.stop)
- }
// Shuts the actors down by delegating to the dispatcher's stopAllAttachedActors.
+ def stop() = dispatcher.stopAllAttachedActors
}
// Starts both actor managers, then registers a consumer whose target actor is
// the queue-reader load balancer.
def start() {
QueueReaderManager.start()
PersisterManager.start()
-
// Arguments: "event.#", "resource-events", "aquarium", the target actor and a
// boolean flag — presumably routing key, queue name and exchange name;
// TODO confirm order and flag meaning against consumer's signature.
consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false)
}