1b86d54f15ed6a71631c9c1bfc0a529884c19f13
[aquarium] / src / main / scala / gr / grnet / aquarium / service / AkkaService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import akka.actor.{Props, ActorRef, ActorSystem}
39 import gr.grnet.aquarium.util.{Loggable, Lifecycle, shortClassNameOf}
40 import gr.grnet.aquarium.ResourceLocator.SysEnvs
41 import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
42 import com.typesafe.config.ConfigFactory
43 import java.util.concurrent.atomic.AtomicBoolean
44 import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache}
45 import com.ckkloverdos.props.{Props ⇒ KKProps}
46 import gr.grnet.aquarium.actor.service.user.UserActor
47 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
48 import gr.grnet.aquarium.actor.message.config.InitializeUserState
49 import gr.grnet.aquarium.util.date.TimeHelpers
50 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Callable}
51 import akka.dispatch.{Await, Future}
52 import akka.util.Duration
53
54 /**
55  * A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  */
59
60 final class AkkaService extends AquariumAwareSkeleton with Configurable with Lifecycle with Loggable {
61   @volatile private[this] var _actorSystem: ActorSystem = _
62   @volatile private[this] var _userActorCache: Cache[String, ActorRef] = _
63   @volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _
64   @volatile private[this] var _cacheMaximumSize: Int = _
65   @volatile private[this] var _cacheInitialCapacity: Int = _
66   @volatile private[this] var _cacheConcurrencyLevel: Int = _
67
68   private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]]
69
70   private[this] val isShuttingDown = new AtomicBoolean(false)
71
72   def propertyPrefix: Option[String] = Some("actors")
73
74   /**
75    * Configure this instance with the provided properties.
76    *
77    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
78    */
79   def configure(props: KKProps): Unit = {
80     this._cacheMaximumSize = 1000
81     this._cacheInitialCapacity = 2 * this._cacheMaximumSize / 3
82     this._cacheConcurrencyLevel = Runtime.getRuntime.availableProcessors() * 2
83   }
84
85   def actorSystem = {
86     if(this._actorSystem eq null) {
87       throw new AquariumInternalError("Akka actorSystem is null")
88     }
89
90     if(this.isShuttingDown.get()) {
91       throw new AquariumException("%s is shutting down".format(shortClassNameOf(this)))
92     }
93
94     this._actorSystem
95   }
96
97   def foreachCachedUserID[A](f: String ⇒ A): Unit = {
98     val keys = this._userActorCache.asMap().keySet().iterator()
99     while(keys.hasNext) { f(keys.next()) }
100   }
101
102   def cacheStats: CacheStats = {
103     this._userActorCache.stats()
104   }
105
106   def cacheSize: Long = {
107     this._userActorCache.size()
108   }
109
110   def start() = {
111     // We have AKKA builtin, so no need to mess with pre-existing installation.
112     if(SysEnvs.AKKA_HOME.value.isJust) {
113       val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME))
114       logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
115       throw error
116     }
117
118     this._cacheEvictionListener = new RemovalListener[String, ActorRef] {
119       def onRemoval(rn: RemovalNotification[String, ActorRef]): Unit = {
120         if(isShuttingDown.get()) {
121           return
122         }
123
124         val userID   = rn.getKey
125         val actorRef = rn.getValue
126         val cause    = rn.getCause
127
128         if(rn.wasEvicted()) {
129           logger.debug("Evicted UserActor %s due to %s".format(userID, cause))
130           gracefullyStopUserActor(userID, actorRef)
131         } else {
132           logger.debug("UserActor %s cache notification for %s".format(userID, cause))
133         }
134       }
135     }
136
137     this._userActorCache = CacheBuilder.
138       newBuilder().
139         recordStats().
140         maximumSize(this._cacheMaximumSize).
141         initialCapacity(this._cacheInitialCapacity).
142         concurrencyLevel(this._cacheConcurrencyLevel).
143         removalListener(this._cacheEvictionListener).
144       build()
145
146     this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf"))
147   }
148
149   def stop() = {
150     this.isShuttingDown.set(true)
151
152     this.stoppingUserActors.clear()
153
154     logger.info("UserActor cache stats: {}", this._userActorCache.stats())
155     this._userActorCache.invalidateAll
156     this._userActorCache.cleanUp()
157
158     this._actorSystem.shutdown()
159   }
160
161   def notifyUserActorPostStop(userActor: UserActor): Unit = {
162     logger.debug("Removing UserActor %s from stopping set (after postStop())".format(userActor.userID))
163     this.stoppingUserActors.remove(userActor.userID)
164   }
165
166   private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = {
167     logger.debug("Gracefully stopping UserActor %s (and inserting into stopping set)".format(userID))
168     this.stoppingUserActors.put(
169       userID,
170       akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem)
171     )
172   }
173
174   def invalidateUserActor(userActor: UserActor): Unit = {
175     if(this.isShuttingDown.get()) {
176       return
177     }
178
179     val userID = userActor.userID
180     val actorRef = userActor.self
181
182     this._userActorCache.invalidate(userID)
183     gracefullyStopUserActor(userID, actorRef)
184   }
185
186   def getOrCreateUserActor(userID: String): ActorRef = {
187     if(this.isShuttingDown.get()) {
188       throw new AquariumException(
189         "%s is shutting down. Cannot provide user actor %s".format(
190           shortClassNameOf(this),
191           userID))
192     }
193
194     // If stopping, wait to stop or ignore
195     this.stoppingUserActors.get(userID) match {
196       case null ⇒
197         logger.debug("UserActor %s was not stopping (don't know if it exists yet)".format(userID))
198
199       case future ⇒
200         try {
201           logger.debug("Await.result(): Waiting while UserActor %s is stopping".format(userID))
202           val stopped = Await.result(future, Duration(1000, TimeUnit.MILLISECONDS))
203           if(!stopped) {
204             // TODO: Add metric
205             logger.warn("Await.result(): UserActor %s id not stop. Will remove from stopping anayway".format(userID))
206           }
207         }
208         catch {
209           case e: java.util.concurrent.TimeoutException ⇒
210             // TODO: Add metric
211             logger.error("Timed-out while waiting for UserActor %s to stop. Will remove from stopping anayway".format(userID), e)
212
213           case e: Throwable ⇒
214             logger.error("While Await(ing) UserActor %s to stop. Will remove from stopping anayway".format(userID), e)
215         }
216         finally {
217           this.stoppingUserActors.remove(userID)
218         }
219     }
220
221     this._userActorCache.get(userID, new Callable[ActorRef] {
222       def call(): ActorRef = {
223         // Create new User Actor instance
224         logger.debug("Creating new UserActor instance for %s".format(userID))
225         val actorRef = _actorSystem.actorOf(Props.apply({
226           aquarium.newInstance(classOf[UserActor], classOf[UserActor].getName)
227         }), "userActor::%s".format(userID))
228
229         // Cache it for subsequent calls
230         _userActorCache.put(userID, actorRef)
231
232         // Send the initialization message
233         actorRef ! InitializeUserState(userID, TimeHelpers.nowMillis())
234
235         actorRef
236       }
237     })
238   }
239 }