import com.ckkloverdos.props.Props
import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import processor.actor.ConfigureDispatcher
+import processor.actor.{ResourceEventProcessorService, ConfigureDispatcher}
import rest.RESTService
import store.{StoreProvider, EventStore, UserStore}
import util.Loggable
}
}
+ // The resource event processor service; startServices()/stopServices() drive its lifecycle.
+ private[this] val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService()
+
def get(prop: String): String =
props.get(prop) match {
case Just(y) => y
/**
 * Starts the services in a fixed order: the REST service, the actor
 * provider, then the resource event processor. Once all three have been
 * started, a ConfigureDispatcher message carrying this configuration is
 * sent to the actor registered under DispatcherRole.
 */
def startServices(): Unit = {
_restService.start()
_actorProvider.start()
+ _resEventProc.start()
// The actor provider has been started above, before the role lookup.
_actorProvider.actorForRole(DispatcherRole) ! ConfigureDispatcher(this)
}
/**
 * Stops the services: the resource event processor first, then the REST
 * service, then the actor provider.
 */
def stopServices(): Unit = {
+ _resEventProc.stop()
_restService.stop()
_actorProvider.stop()
-
+
+
// akka.actor.Actor.registry.shutdownAll()
}
+package gr.grnet.aquarium.processor.actor
+
/*
* Copyright 2011 GRNET S.A. All rights reserved.
*
* or implied, of GRNET S.A.
*/
-package gr.grnet.aquarium.actors
-
import gr.grnet.aquarium.messaging.AkkaAMQP
import akka.actor.{ActorRef, Actor}
import gr.grnet.aquarium.logic.events.ResourceEvent
import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
-import gr.grnet.aquarium.util.Loggable
import com.ckkloverdos.maybe.{NoVal, Failed, Just}
-import gr.grnet.aquarium.MasterConf
+import gr.grnet.aquarium.{MasterConf}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
/**
* A service that gets resource events from the queue and stores them persistently.
*
* @author Georgios Gousios <gousiosg@gmail.com>
*/
-class EventProcessor extends AkkaAMQP with Loggable {
+class ResourceEventProcessorService extends AkkaAMQP with Loggable
+ with Lifecycle {
+
+ private var persister: ActorRef = _
+ private var queueReader: ActorRef = _
case class AckData(deliveryTag: Long, queue: ActorRef)
case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
}
}
- def init() = {
/**
 * Starts the resource event processor service.
 *
 * Creates a Persister actor and a QueueReader actor (the reader is given
 * the persister), stores both in the `persister` / `queueReader` fields,
 * calls `queueReader.link(persister)`, starts the two actors and finally
 * hands the queue reader to `consumer(...)`.
 */
+ def start() {
+ logger.info("Starting resource event processor service")
- val persister = Actor.actorOf(new Persister)
- val queueReader = Actor.actorOf(new QueueReader(persister))
+ persister = Actor.actorOf(new Persister)
+ queueReader = Actor.actorOf(new QueueReader(persister))
queueReader.link(persister)
queueReader.start()
persister.start()
-
+
// NOTE(review): `consumer` is defined outside this view (presumably in
// AkkaAMQP); the meaning of its string arguments and of the trailing
// `false` cannot be confirmed from here. It is called only after both
// actors have been started.
consumer("event.#", "resource-events", "aquarium", queueReader, false)
}
+
+ /**
+  * Stops the persister and queue reader actors held by this service.
+  *
+  * Both fields are declared uninitialised (`= _`, i.e. null) and are only
+  * assigned in `start()`. Each reference is therefore wrapped in `Option`
+  * so that calling `stop()` on a service that was never started is a
+  * no-op instead of a NullPointerException.
+  */
+ def stop() {
+   logger.info("Stopping resource event processor service")
+
+   // Option(null) is None, so an actor that was never created is skipped.
+   Option(persister).foreach(_.stop())
+   Option(queueReader).foreach(_.stop())
+ }
}
\ No newline at end of file