protected def persisterManager: PersisterManager
protected def queueReaderManager: QueueReaderManager
-
+
+ protected val numCPUs = Runtime.getRuntime.availableProcessors // processor count reported by the JVM; the manager classes size their thread pools and actor pools from it
+
def start(): Unit
def stop() : Unit
if (isRedeliver) {
//Message could not be processed 3 times, just ignore it
if (redeliveries.contains(event.id)) {
- logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
+ logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
queue ! Reject(deliveryTag, false)
redeliveries.remove(event.id)
inFlightEvents.remove(deliveryTag)
}
case PersistOK(ackData) =>
- logger.debug("Stored event[%s] msg[%d] - %d left".format(ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
+ logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
ackData.queue ! Acknowledge(ackData.deliveryTag)
case PersistFailed(ackData) =>
//Give the message a chance to be processed by other processors
- logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
+ logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
inFlightEvents.remove(ackData.deliveryTag)
ackData.queue ! Reject(ackData.deliveryTag, true)
case Duplicate(ackData) =>
- logger.debug("Event[%s] msg[%d] is setRcvMillis".format(ackData.msgId, ackData.deliveryTag))
+ logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
inFlightEvents.remove(ackData.deliveryTag)
ackData.queue ! Reject(ackData.deliveryTag, false)
case Acknowledged(deliveryTag) =>
- logger.debug("Msg with tag [%d] acked. Forwarding...".format(deliveryTag))
+ logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
forward(inFlightEvents.remove(deliveryTag))
case Rejected(deliveryTag) =>
- logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
+ logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
case _ => logger.warn("Unknown message")
}
+ override def preStart = { // lifecycle hook: logs this reader's UUID at debug level, then runs the inherited preStart
+ logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
+ super.preStart
+ }
+
self.dispatcher = queueReaderManager.dispatcher
}
def receive = {
case Persist(event, sender, ackData) =>
+ logger.debug("Persister-%s attempting store".format(self.getUuid()))
if (exists(event))
sender ! Duplicate(ackData)
else if (persist(event)) {
case _ => logger.warn("Unknown message")
}
+ override def preStart = { // lifecycle hook: logs this persister's UUID at debug level, then runs the inherited preStart
+ logger.debug("Starting actor Persister-%s".format(self.getUuid()))
+ super.preStart
+ }
+
self.dispatcher = persisterManager.dispatcher
}
class QueueReaderManager {
- val numCPUs = Runtime.getRuntime.availableProcessors
lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+ // Work-stealing dispatcher that every QueueReader actor assigns to itself.
+ // The maximum pool size is clamped so it never falls below the core size
+ // (queueReaderThreads). Assuming the builder hands both values to a
+ // java.util.concurrent.ThreadPoolExecutor, core > max is rejected with an
+ // IllegalArgumentException, which a bare 2 * numCPUs would trigger on hosts
+ // with fewer processors than half the configured reader threads.
lazy val dispatcher =
Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
- .setMaxPoolSize(queueReaderThreads)
- .setCorePoolSize(1)
+ .setMaxPoolSize(math.max(2 * numCPUs, queueReaderThreads))
+ .setCorePoolSize(queueReaderThreads)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy).build
lazy val actors =
- for (i <- 0 until numCPUs) yield {
+ for (i <- 0 until 4 * numCPUs) yield { // four QueueReader actors per processor; each one is linked to the supervisor and started
val actor = actorOf(new QueueReader)
supervisor.link(actor)
actor.start()
}
class PersisterManager {
- val numCPUs = Runtime.getRuntime.availableProcessors
lazy val lb = loadBalancerActor(new CyclicIterator(actors))
+ // Work-stealing dispatcher that every Persister actor assigns to itself.
+ // persisterThreads is an overridable def, so the maximum pool size is clamped
+ // to be at least the core size. Assuming the builder hands both values to a
+ // java.util.concurrent.ThreadPoolExecutor, core > max would be rejected with
+ // an IllegalArgumentException.
val dispatcher =
Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
- .setMaxPoolSize(persisterThreads)
- .setCorePoolSize(1)
+ .setMaxPoolSize(math.max(2 * numCPUs, persisterThreads))
+ .setCorePoolSize(persisterThreads)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy).build
lazy val actors =
- for (i <- 0 until numCPUs) yield {
+ for (i <- 0 until 5 * numCPUs) yield {
val actor = actorOf(new Persister)
supervisor.link(actor)
actor.start()
package gr.grnet.aquarium.processor.actor
import com.ckkloverdos.maybe.{Just, Failed, NoVal}
-import gr.grnet.aquarium.actor.DispatcherRole
import gr.grnet.aquarium.messaging.MessagingNames
import gr.grnet.aquarium.logic.events.{AquariumEvent, ResourceEvent}
override def decode(data: Array[Byte]) : AquariumEvent = ResourceEvent.fromBytes(data)
- override def forward(evt: AquariumEvent): Unit = {
+ /*override def forward(evt: AquariumEvent): Unit = {
val resourceEvent = evt.asInstanceOf[ResourceEvent]
val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
businessLogicDispacther ! ProcessResourceEvent(resourceEvent)
- }
+ }*/
+
+ override def forward(evt: AquariumEvent): Unit = {} // no-op: the implementation that sent ProcessResourceEvent to the DispatcherRole actor is commented out above, so this override forwards nothing
override def exists(event: AquariumEvent): Boolean =
_configurator.resourceEventStore.findResourceEventById(event.id).isJust
_configurator.resourceEventStore.storeResourceEvent(event) match {
case Just(x) => true
case x: Failed =>
- logger.error("Could not save event: %s".format(event))
+ // The format string needs one %s per argument; without the second
+ // placeholder the failure description is silently left out of the log.
+ logger.error("Could not save event: %s. Reason: %s".format(event, x.toString))
false
case NoVal => false
}
}
- override def queueReaderThreads: Int = 1
- override def persisterThreads: Int = 2
+ override def queueReaderThreads: Int = 4
+ override def persisterThreads: Int = numCPUs
override def name = "resevtproc"
- override def persisterManager = new PersisterManager
- override def queueReaderManager = new QueueReaderManager
+ // One manager of each kind per service, created on first access. Lazy
+ // initialisation matches UserEventProcessorService and keeps the accessors
+ // below from returning null if they are called before this class body has run.
+ lazy val persister = new PersisterManager
+ lazy val queueReader = new QueueReaderManager
+
+ override def persisterManager = persister
+ override def queueReaderManager = queueReader
def start() {
logger.info("Starting resource event processor service")
import gr.grnet.aquarium.logic.events.{UserEvent, AquariumEvent}
import com.ckkloverdos.maybe.{NoVal, Failed, Just}
+
/**
* An event processor service for user events coming from the IM system
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-final class UserEventProcessorService extends EventProcessorService {
+class UserEventProcessorService extends EventProcessorService {
override def decode(data: Array[Byte]) = UserEvent.fromBytes(data)
}
}
- override def queueReaderThreads: Int = 1
- override def persisterThreads: Int = 2
+ override def queueReaderThreads: Int = 4
+ override def persisterThreads: Int = numCPUs
override def name = "usrevtproc"
- override def persisterManager = new PersisterManager
- override def queueReaderManager = new QueueReaderManager
+ lazy val persister = new PersisterManager // created once, on first access; persisterManager below returns this same instance
+ lazy val queueReader = new QueueReaderManager // created once, on first access; queueReaderManager below returns this same instance
+
+ override def persisterManager = persister
+ override def queueReaderManager = queueReader
def start() {
logger.info("Starting user event processor service")