Support for starting the event reader as a service
author Georgios Gousios <gousiosg@gmail.com>
Thu, 15 Dec 2011 14:41:20 +0000 (16:41 +0200)
committer Georgios Gousios <gousiosg@gmail.com>
Thu, 15 Dec 2011 15:20:52 +0000 (17:20 +0200)
logic/src/main/scala/gr/grnet/aquarium/MasterConf.scala
logic/src/main/scala/gr/grnet/aquarium/processor/actor/ResourceEventProcessorService.scala [moved from logic/src/main/scala/gr/grnet/aquarium/actors/EventProcessor.scala with 86% similarity]

index e5b8126..132f210 100644 (file)
@@ -41,7 +41,7 @@ import com.ckkloverdos.sys.SysProp
 import com.ckkloverdos.props.Props
 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
-import processor.actor.ConfigureDispatcher
+import processor.actor.{ResourceEventProcessorService, ConfigureDispatcher}
 import rest.RESTService
 import store.{StoreProvider, EventStore, UserStore}
 import util.Loggable
@@ -118,6 +118,10 @@ class MasterConf(val props: Props) extends Loggable {
     }
   }
 
+  private[this] val _resEventProc: ResourceEventProcessorService = {
+    new ResourceEventProcessorService()
+  }
+
   def get(prop: String): String =
     props.get(prop) match {
       case Just(y) => y
@@ -129,14 +133,17 @@ class MasterConf(val props: Props) extends Loggable {
   def startServices(): Unit = {
     _restService.start()
     _actorProvider.start()
+    _resEventProc.start()
 
     _actorProvider.actorForRole(DispatcherRole) ! ConfigureDispatcher(this)
   }
 
   def stopServices(): Unit = {
+    _resEventProc.stop()
     _restService.stop()
     _actorProvider.stop()
-    
+
+
 //    akka.actor.Actor.registry.shutdownAll()
   }
 
@@ -1,3 +1,5 @@
+package gr.grnet.aquarium.processor.actor
+
 /*
  * Copyright 2011 GRNET S.A. All rights reserved.
  *
  * or implied, of GRNET S.A.
  */
 
-package gr.grnet.aquarium.actors
-
 import gr.grnet.aquarium.messaging.AkkaAMQP
 import akka.actor.{ActorRef, Actor}
 import gr.grnet.aquarium.logic.events.ResourceEvent
 import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
-import gr.grnet.aquarium.util.Loggable
 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
-import gr.grnet.aquarium.MasterConf
+import gr.grnet.aquarium.{MasterConf}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
 
 /**
  * An actor that gets events from the queue, stores them persistently
@@ -49,7 +49,11 @@ import gr.grnet.aquarium.MasterConf
  *
  * @author Georgios Gousios <gousiosg@gmail.com>
  */
-class EventProcessor extends AkkaAMQP with Loggable {
+class ResourceEventProcessorService extends AkkaAMQP with Loggable
+  with Lifecycle {
+
+  private var persister: ActorRef = _
+  private var queueReader: ActorRef = _
 
   case class AckData(deliveryTag: Long, queue: ActorRef)
   case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
@@ -106,16 +110,24 @@ class EventProcessor extends AkkaAMQP with Loggable {
     }
   }
 
-  def init() = {
+  def start() {
+    logger.info("Starting resource event processor service")
 
-    val persister = Actor.actorOf(new Persister)
-    val queueReader = Actor.actorOf(new QueueReader(persister))
+    persister = Actor.actorOf(new Persister)
+    queueReader = Actor.actorOf(new QueueReader(persister))
 
     queueReader.link(persister)
 
     queueReader.start()
     persister.start()
-    
+
     consumer("event.#", "resource-events", "aquarium", queueReader, false)
   }
+
+  def stop() {
+    logger.info("Stopping resource event processor service")
+
+    persister.stop()
+    queueReader.stop()
+  }
 }
\ No newline at end of file