package gr.grnet.aquarium.service
-import gr.grnet.aquarium.Aquarium
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
-import gr.grnet.aquarium.util.safeUnit
import java.util.concurrent.atomic.AtomicBoolean
import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, Aquarium}
+import com.ckkloverdos.props.Props
+import gr.grnet.aquarium.store.StoreProvider
/**
* Watches for liveliness of stores.
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-final class StoreWatcherService extends Lifecycle with Loggable {
+final class StoreWatcherService extends Lifecycle with Configurable with AquariumAwareSkeleton with Loggable {
+ private[this] var _reconnectPeriodMillis = 1000L
+
private[this] val _pingIsScheduled = new AtomicBoolean(false)
private[this] val _stopped = new AtomicBoolean(false)
private[this] val _rcIsAlive = new AtomicBoolean(true)
private[this] val _imIsAlive = new AtomicBoolean(true)
+ def propertyPrefix = Some(StoreProvider.Prefix)
+
+ /**
+ * Configure this instance with the provided properties.
+ *
+ * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+ */
+ def configure(props: Props) = {
+ this._reconnectPeriodMillis = props.getLongEx(StoreProvider.Keys.reconnect_period_millis)
+ }
- def aquarium = Aquarium.Instance
+ private[this] def safePingStore(
+ tag: Tag,
+ pinger: () ⇒ Any,
+ getStatus: () ⇒ Boolean,
+ setStatus: (Boolean) ⇒ Any
+ ): Unit = {
- private[this] def safePingStore(tag: Tag,
- pinger: () ⇒ Any,
- getStatus: () ⇒ Boolean,
- setStatus: (Boolean) ⇒ Any): Unit = {
try {
val wasAlive = getStatus()
pinger()
// No exception happened, so we are alive
setStatus(true)
if(!wasAlive) {
- logger.info("Reconnected %s store".format(tag))
+ logger.info("Reconnected store for %s".format(tag))
aquarium.eventBus ! StoreIsAliveBusEvent(tag)
}
}
catch {
case e: Throwable ⇒
setStatus(false)
- logger.info("Store %s detected down".format(tag))
+ logger.info("Store for %s detected down".format(tag))
aquarium.eventBus ! StoreIsDeadBusEvent(tag)
}
}
- private[this] def doSchedulePing(tag: Tag,
- info: String,
- pinger: () ⇒ Any,
- getStatus: () ⇒ Boolean,
- setStatus: (Boolean) ⇒ Any): Unit = {
+ private[this] def doSchedulePing(
+ tag: Tag,
+ info: String,
+ pinger: () ⇒ Any,
+ getStatus: () ⇒ Boolean,
+ setStatus: (Boolean) ⇒ Any
+ ): Unit = {
+
aquarium.timerService.scheduleOnce(
info,
{
doSchedulePing(tag, info, pinger, getStatus, setStatus)
}
},
- 1000, // TODO: Get value from configuration
+ this._reconnectPeriodMillis,
true
)
}
def start(): Unit = {
if(!_pingIsScheduled.get()) {
// First time pings (once)
- safeUnit {
- safePingStore(
- Tags.ResourceEventTag,
- () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
- () ⇒ _rcIsAlive.get(),
- alive ⇒ _rcIsAlive.set(alive)
- )
- }
-
- safeUnit {
- safePingStore(
- Tags.IMEventTag,
- () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
- () ⇒ _imIsAlive.get(),
- alive ⇒ _imIsAlive.set(alive)
- )
- }
+ safePingStore(
+ Tags.ResourceEventTag,
+ () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
+ () ⇒ _rcIsAlive.get(),
+ alive ⇒ _rcIsAlive.set(alive)
+ )
+
+ safePingStore(
+ Tags.IMEventTag,
+ () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
+ () ⇒ _imIsAlive.get(),
+ alive ⇒ _imIsAlive.set(alive)
+ )
pingResourceEventStore()
pingIMEventStore()