WIP Reowrk AMQP stuff.
[aquarium] / src / main / scala / gr / grnet / aquarium / service / SimpleLocalRoleableActorProviderService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import com.ckkloverdos.props.Props
39 import akka.actor.ActorRef
40 import gr.grnet.aquarium.Configurable
41 import java.util.concurrent.ConcurrentHashMap
42 import gr.grnet.aquarium.util.Loggable
43 import gr.grnet.aquarium.util.date.TimeHelpers
44 import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
45 import gr.grnet.aquarium.actor._
46
47
48 /**
49  * All actors are provided locally.
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>.
52  */
53 class SimpleLocalRoleableActorProviderService extends RoleableActorProviderService with Configurable with Loggable {
54   private[this] val actorCache = new ConcurrentHashMap[ActorRole, ActorRef]
55   private[this] var _props: Props = _
56
57   def propertyPrefix = None
58
59   def configure(props: Props): Unit = {
60     this._props = props
61     logger.debug("Configured with props")
62   }
63
64   private[this] def __doStart(): Unit = {
65     // Start and configure actors
66     import SimpleLocalRoleableActorProviderService.RolesToBeStarted
67
68     for(role <- RolesToBeStarted) {
69       actorForRole(role)
70     }
71   }
72
73   def start(): Unit = {
74     logStarting()
75     val (ms0, ms1, _) = TimeHelpers.timed {
76       __doStart()
77     }
78     logStarted(ms0, ms1)
79   }
80
81   def stop(): Unit = {
82     logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
83   }
84
85   private[this] def _newActor(role: ActorRole): ActorRef = {
86     val actorRef = akka.actor.Actor.actorOf(role.actorType).start()
87
88     val propsMsg = AquariumPropertiesLoaded(this._props)
89     if(role.canHandleConfigurationMessage(propsMsg)) {
90       actorRef ! propsMsg
91     }
92
93     val providerMsg = ActorProviderConfigured(this)
94     if(role.canHandleConfigurationMessage(providerMsg)) {
95       actorRef ! providerMsg
96     }
97
98     actorRef
99   }
100
101   private[this] def _fromCacheOrNew(role: ActorRole): ActorRef = synchronized {
102     actorCache.get(role) match {
103       case null ⇒
104         val actorRef = _newActor(role)
105         actorCache.put(role, actorRef)
106         actorRef
107       case actorRef ⇒
108         actorRef
109     }
110   }
111
112   @throws(classOf[Exception])
113   def actorForRole(role: ActorRole, hints: Props = Props.empty) = synchronized {
114     if(role.isCacheable) {
115       _fromCacheOrNew(role)
116     } else {
117       _newActor(role)
118     }
119   }
120
121   override def toString = gr.grnet.aquarium.util.shortClassNameOf(this)
122 }
123
124 object SimpleLocalRoleableActorProviderService {
125   // Always set Dispatcher at the end.
126   // We could definitely use some automatic dependency sorting here (topological sorting anyone?)
127   final val RolesToBeStarted = List(
128     //    ResourceProcessorRole,
129     RESTRole,
130     PingerRole,
131     RouterRole)
132
133   lazy val ActorClassByRole: Map[ActorRole, Class[_ <: RoleableActor]] =
134     RolesToBeStarted map {
135       role ⇒
136         (role, role.actorType)
137     } toMap
138
139   lazy val ActorRefByRole: Map[ActorRole, ActorRef] =
140     ActorClassByRole map {
141       case (role, clazz) ⇒
142         (role, akka.actor.Actor.actorOf(clazz).start())
143     }
144 }