2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import akka.actor.{Props, ActorRef, ActorSystem}
39 import gr.grnet.aquarium.util.{Loggable, Lifecycle, shortClassNameOf}
40 import gr.grnet.aquarium.ResourceLocator.SysEnvs
41 import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
42 import com.typesafe.config.ConfigFactory
43 import java.util.concurrent.atomic.AtomicBoolean
44 import com.google.common.cache.{CacheStats, RemovalNotification, RemovalListener, CacheBuilder, Cache}
45 import com.ckkloverdos.props.{Props ⇒ KKProps}
46 import gr.grnet.aquarium.actor.service.user.UserActor
47 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
48 import gr.grnet.aquarium.actor.message.config.InitializeUserState
49 import gr.grnet.aquarium.util.date.TimeHelpers
50 import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Callable}
51 import akka.dispatch.{Await, Future}
52 import akka.util.Duration
55 * A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
57 * @author Christos KK Loverdos <loverdos@gmail.com>
60 final class AkkaService extends AquariumAwareSkeleton with Configurable with Lifecycle with Loggable {
61 @volatile private[this] var _actorSystem: ActorSystem = _
62 @volatile private[this] var _userActorCache: Cache[String, ActorRef] = _
63 @volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _
64 @volatile private[this] var _cacheMaximumSize: Int = _
65 @volatile private[this] var _cacheInitialCapacity: Int = _
66 @volatile private[this] var _cacheConcurrencyLevel: Int = _
68 private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]]
70 private[this] val isShuttingDown = new AtomicBoolean(false)
72 def propertyPrefix: Option[String] = Some("actors")
75 * Configure this instance with the provided properties.
77 * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
79 def configure(props: KKProps): Unit = {
80 this._cacheMaximumSize = 1000
81 this._cacheInitialCapacity = 2 * this._cacheMaximumSize / 3
82 this._cacheConcurrencyLevel = Runtime.getRuntime.availableProcessors() * 2
86 if(this._actorSystem eq null) {
87 throw new AquariumInternalError("Akka actorSystem is null")
90 if(this.isShuttingDown.get()) {
91 throw new AquariumException("%s is shutting down".format(shortClassNameOf(this)))
97 def foreachCachedUserID[A](f: String ⇒ A): Unit = {
98 val keys = this._userActorCache.asMap().keySet().iterator()
99 while(keys.hasNext) { f(keys.next()) }
102 def cacheStats: CacheStats = {
103 this._userActorCache.stats()
106 def cacheSize: Long = {
107 this._userActorCache.size()
111 // We have AKKA builtin, so no need to mess with pre-existing installation.
112 if(SysEnvs.AKKA_HOME.value.isJust) {
113 val error = new AquariumInternalError("%s is set. Please unset and restart Aquarium".format(SysEnvs.Names.AKKA_HOME))
114 logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
118 this._cacheEvictionListener = new RemovalListener[String, ActorRef] {
119 def onRemoval(rn: RemovalNotification[String, ActorRef]): Unit = {
120 if(isShuttingDown.get()) {
124 val userID = rn.getKey
125 val actorRef = rn.getValue
126 val cause = rn.getCause
128 if(rn.wasEvicted()) {
129 logger.debug("Evicted UserActor %s due to %s".format(userID, cause))
130 gracefullyStopUserActor(userID, actorRef)
132 logger.debug("UserActor %s cache notification for %s".format(userID, cause))
137 this._userActorCache = CacheBuilder.
140 maximumSize(this._cacheMaximumSize).
141 initialCapacity(this._cacheInitialCapacity).
142 concurrencyLevel(this._cacheConcurrencyLevel).
143 removalListener(this._cacheEvictionListener).
146 this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf"))
150 this.isShuttingDown.set(true)
152 this.stoppingUserActors.clear()
154 logger.info("UserActor cache stats: {}", this._userActorCache.stats())
155 this._userActorCache.invalidateAll
156 this._userActorCache.cleanUp()
158 this._actorSystem.shutdown()
161 def notifyUserActorPostStop(userActor: UserActor): Unit = {
162 logger.debug("Removing UserActor %s from stopping set (after postStop())".format(userActor.userID))
163 this.stoppingUserActors.remove(userActor.userID)
166 private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = {
167 logger.debug("Gracefully stopping UserActor %s (and inserting into stopping set)".format(userID))
168 this.stoppingUserActors.put(
170 akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem)
174 def invalidateUserActor(userActor: UserActor): Unit = {
175 if(this.isShuttingDown.get()) {
179 val userID = userActor.userID
180 val actorRef = userActor.self
182 this._userActorCache.invalidate(userID)
183 gracefullyStopUserActor(userID, actorRef)
186 def getOrCreateUserActor(userID: String): ActorRef = {
187 if(this.isShuttingDown.get()) {
188 throw new AquariumException(
189 "%s is shutting down. Cannot provide user actor %s".format(
190 shortClassNameOf(this),
194 // If stopping, wait to stop or ignore
195 this.stoppingUserActors.get(userID) match {
197 logger.debug("UserActor %s was not stopping (don't know if it exists yet)".format(userID))
201 logger.debug("Await.result(): Waiting while UserActor %s is stopping".format(userID))
202 val stopped = Await.result(future, Duration(1000, TimeUnit.MILLISECONDS))
205 logger.warn("Await.result(): UserActor %s id not stop. Will remove from stopping anayway".format(userID))
209 case e: java.util.concurrent.TimeoutException ⇒
211 logger.error("Timed-out while waiting for UserActor %s to stop. Will remove from stopping anayway".format(userID), e)
214 logger.error("While Await(ing) UserActor %s to stop. Will remove from stopping anayway".format(userID), e)
217 this.stoppingUserActors.remove(userID)
221 this._userActorCache.get(userID, new Callable[ActorRef] {
222 def call(): ActorRef = {
223 // Create new User Actor instance
224 logger.debug("Creating new UserActor instance for %s".format(userID))
225 val actorRef = _actorSystem.actorOf(Props.apply({
226 aquarium.newInstance(classOf[UserActor])
227 }), "userActor::%s".format(userID))
229 // Cache it for subsequent calls
230 _userActorCache.put(userID, actorRef)
232 // Send the initialization message
233 actorRef ! InitializeUserState(userID, TimeHelpers.nowMillis())