Separate rabbitmq configuration keys from the rabbitmq service
[aquarium] / src / main / scala / gr / grnet / aquarium / service / StoreWatcherService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
39 import java.util.concurrent.atomic.AtomicBoolean
40 import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
41 import gr.grnet.aquarium.{Configurable, Aquarium}
42 import com.ckkloverdos.props.Props
43 import gr.grnet.aquarium.store.StoreProvider
44
45 /**
46  * Watches for liveliness of stores.
47  *
48  * @author Christos KK Loverdos <loverdos@gmail.com>
49  */
50
51 final class StoreWatcherService extends Lifecycle with Configurable with Loggable {
52   private[this] var _reconnectPeriodMillis = 1000L
53
54   private[this] val _pingIsScheduled = new AtomicBoolean(false)
55   private[this] val _stopped = new AtomicBoolean(false)
56   private[this] val _rcIsAlive = new AtomicBoolean(true)
57   private[this] val _imIsAlive = new AtomicBoolean(true)
58
59   def aquarium = Aquarium.Instance
60
61
62   def propertyPrefix = Some(StoreProvider.Prefix)
63
64   /**
65    * Configure this instance with the provided properties.
66    *
67    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
68    */
69   def configure(props: Props) = {
70     this._reconnectPeriodMillis = props.getLongEx(StoreProvider.Keys.reconnect_period_millis)
71   }
72
73   private[this] def safePingStore(tag: Tag,
74                                   pinger: () ⇒ Any,
75                                   getStatus: () ⇒ Boolean,
76                                   setStatus: (Boolean) ⇒ Any): Unit = {
77     try {
78       val wasAlive = getStatus()
79       pinger()
80       // No exception happened, so we are alive
81       setStatus(true)
82       if(!wasAlive) {
83         logger.info("Reconnected store for %s".format(tag))
84         aquarium.eventBus ! StoreIsAliveBusEvent(tag)
85       }
86     }
87     catch {
88       case e: Throwable ⇒
89         setStatus(false)
90         logger.info("Store for %s detected down".format(tag))
91         aquarium.eventBus ! StoreIsDeadBusEvent(tag)
92     }
93   }
94
95   private[this] def doSchedulePing(tag: Tag,
96                                    info: String,
97                                    pinger: () ⇒ Any,
98                                    getStatus: () ⇒ Boolean,
99                                    setStatus: (Boolean) ⇒ Any): Unit = {
100     aquarium.timerService.scheduleOnce(
101       info,
102       {
103         if(!aquarium.isStopping() && !_stopped.get()) {
104 //          logger.debug("Pinging %s store".format(tag))
105           safePingStore(tag, pinger, getStatus, setStatus)
106
107           doSchedulePing(tag, info, pinger, getStatus, setStatus)
108         }
109       },
110       this._reconnectPeriodMillis,
111       true
112     )
113   }
114
115   def pingResourceEventStore(): Unit = {
116     val tag = Tags.ResourceEventTag
117
118     logger.info("Scheduling ping for %s store".format(tag))
119
120     doSchedulePing(
121       tag,
122       aquarium.resourceEventStore.toString,
123       () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
124       () ⇒ _rcIsAlive.get(),
125       alive ⇒ _rcIsAlive.set(alive)
126     )
127   }
128
129   def pingIMEventStore(): Unit = {
130     val tag = Tags.IMEventTag
131
132     logger.info("Scheduling ping for %s store".format(tag))
133
134     doSchedulePing(
135       tag,
136       aquarium.imEventStore.toString,
137       () ⇒ aquarium.imEventStore.pingIMEventStore(),
138       () ⇒ _imIsAlive.get(),
139       alive ⇒ _imIsAlive.set(alive)
140     )
141   }
142
143   def start(): Unit = {
144     if(!_pingIsScheduled.get()) {
145       // First time pings (once)
146       safePingStore(
147         Tags.ResourceEventTag,
148         () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
149         () ⇒ _rcIsAlive.get(),
150         alive ⇒ _rcIsAlive.set(alive)
151       )
152
153       safePingStore(
154         Tags.IMEventTag,
155         () ⇒ aquarium.resourceEventStore.pingResourceEventStore(),
156         () ⇒ _imIsAlive.get(),
157         alive ⇒ _imIsAlive.set(alive)
158       )
159
160       pingResourceEventStore()
161       pingIMEventStore()
162
163       _stopped.set(false)
164     }
165   }
166
167   def stop(): Unit = {
168     _stopped.set(true)
169   }
170 }