Refactor Aquarium to make it more configurable
[aquarium] / src / main / scala / gr / grnet / aquarium / service / StoreWatcherService.scala
index 2d501a1..271a409 100644 (file)
 
 package gr.grnet.aquarium.service
 
-import gr.grnet.aquarium.Aquarium
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
-import gr.grnet.aquarium.util.safeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, Aquarium}
+import com.ckkloverdos.props.Props
+import gr.grnet.aquarium.store.StoreProvider
 
 /**
  * Watches for liveliness of stores.
@@ -47,42 +48,58 @@ import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEven
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-final class StoreWatcherService extends Lifecycle with Loggable {
+final class StoreWatcherService extends Lifecycle with Configurable with AquariumAwareSkeleton with Loggable {
+  private[this] var _reconnectPeriodMillis = 1000L
+
   private[this] val _pingIsScheduled = new AtomicBoolean(false)
   private[this] val _stopped = new AtomicBoolean(false)
   private[this] val _rcIsAlive = new AtomicBoolean(true)
   private[this] val _imIsAlive = new AtomicBoolean(true)
 
+  def propertyPrefix = Some(StoreProvider.Prefix)
+
+  /**
+   * Configure this instance with the provided properties.
+   *
+   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+   */
+  def configure(props: Props) = {
+    this._reconnectPeriodMillis = props.getLongEx(StoreProvider.Keys.reconnect_period_millis)
+  }
 
-  def aquarium = Aquarium.Instance
+  private[this] def safePingStore(
+      tag: Tag,
+      pinger: () ⇒ Any,
+      getStatus: () ⇒ Boolean,
+      setStatus: (Boolean) ⇒ Any
+  ): Unit = {
 
-  private[this] def safePingStore(tag: Tag,
-                                  pinger: () ⇒ Any,
-                                  getStatus: () ⇒ Boolean,
-                                  setStatus: (Boolean) ⇒ Any): Unit = {
     try {
       val wasAlive = getStatus()
       pinger()
       // No exception happened, so we are alive
       setStatus(true)
       if(!wasAlive) {
-        logger.info("Reconnected %s store".format(tag))
+        logger.info("Reconnected store for %s".format(tag))
         aquarium.eventBus ! StoreIsAliveBusEvent(tag)
       }
     }
     catch {
       case e: Throwable ⇒
         setStatus(false)
-        logger.info("Store %s detected down".format(tag))
+        logger.info("Store for %s detected down".format(tag))
         aquarium.eventBus ! StoreIsDeadBusEvent(tag)
     }
   }
 
-  private[this] def doSchedulePing(tag: Tag,
-                                   info: String,
-                                   pinger: () ⇒ Any,
-                                   getStatus: () ⇒ Boolean,
-                                   setStatus: (Boolean) ⇒ Any): Unit = {
+  private[this] def doSchedulePing(
+      tag: Tag,
+      info: String,
+      pinger: () ⇒ Any,
+      getStatus: () ⇒ Boolean,
+      setStatus: (Boolean) ⇒ Any
+  ): Unit = {
+
     aquarium.timerService.scheduleOnce(
       info,
       {
@@ -93,7 +110,7 @@ final class StoreWatcherService extends Lifecycle with Loggable {
           doSchedulePing(tag, info, pinger, getStatus, setStatus)
         }
       },
-      1000, // TODO: Get value from configuration
+      this._reconnectPeriodMillis,
       true
     )
   }
@@ -129,23 +146,19 @@ final class StoreWatcherService extends Lifecycle with Loggable {
   def start(): Unit = {
     if(!_pingIsScheduled.get()) {
       // First time pings (once)
-      safeUnit {
-        safePingStore(
-          Tags.ResourceEventTag,
-          () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
-          () ⇒ _rcIsAlive.get(),
-          alive ⇒ _rcIsAlive.set(alive)
-        )
-      }
-
-      safeUnit {
-        safePingStore(
-          Tags.IMEventTag,
-          () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
-          () ⇒ _imIsAlive.get(),
-          alive ⇒ _imIsAlive.set(alive)
-        )
-      }
+      safePingStore(
+        Tags.ResourceEventTag,
+        () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
+        () ⇒ _rcIsAlive.get(),
+        alive ⇒ _rcIsAlive.set(alive)
+      )
+
+      safePingStore(
+        Tags.IMEventTag,
+        () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
+        () ⇒ _imIsAlive.get(),
+        alive ⇒ _imIsAlive.set(alive)
+      )
 
       pingResourceEventStore()
       pingIMEventStore()