WIP: ResourceEvent-related refactorings
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / router / RouterActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package router
39
40 import gr.grnet.aquarium.util.shortClassNameOf
41 import gr.grnet.aquarium.service.RoleableActorProviderService
42 import akka.actor.ActorRef
43 import user.{UserActorCache, UserActorSupervisor}
44 import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
45 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
46 import gr.grnet.aquarium.actor.message.admin.PingAllRequest
47 import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
48 import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
49
50 /**
51  * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
52  * appropriately.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>.
55  */
56 class RouterActor extends ReflectiveRoleableActor {
57   private[this] var _actorProvider: RoleableActorProviderService = _
58
59   def role = RouterRole
60
61   private[this] def _launchUserActor(userID: String): ActorRef = {
62     // create a fresh instance
63     val userActor = _actorProvider.actorForRole(UserActorRole)
64     UserActorCache.put(userID, userActor)
65     UserActorSupervisor.supervisor.link(userActor)
66
67     userActor
68   }
69
70   private[this] def _findOrCreateUserActor(userID: String): ActorRef = {
71     UserActorCache.get(userID) match {
72       case Some(userActorRef) ⇒
73         userActorRef
74
75       case None ⇒
76         _launchUserActor(userID)
77     }
78   }
79
80   private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
81     _findOrCreateUserActor(userID) forward m
82   }
83
84
85   /**
86    * Handles an exception that occurred while servicing a message.
87    *
88    * @param t
89    * The exception.
90    * @param servicingMessage
91    * The message that was being served while the exception happened.
92    * Note that the message can be `null`, in which case the exception
93    * is an NPE.
94    */
95   override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = {
96     logChainOfCauses(t)
97
98     def logIgnore(e: Throwable) = {
99       logger.error("Ignoring %s".format(shortClassNameOf(e)), e)
100     }
101
102     t match {
103       case e: Error ⇒
104         throw e
105
106       case e: AquariumInternalError ⇒
107         logIgnore(e)
108
109       case e: AquariumException ⇒
110         logIgnore(e)
111
112       case e: Throwable ⇒
113         logIgnore(e)
114     }
115   }
116
117   def onAquariumPropertiesLoaded(m: AquariumPropertiesLoaded): Unit = {
118     logger.info("Configured with {}", shortClassNameOf(m))
119   }
120
121   def onActorProviderConfigured(m: ActorProviderConfigured): Unit = {
122     this._actorProvider = m.actorProvider
123     logger.info("Configured with {}", shortClassNameOf(m))
124   }
125
126   def onProcessIMEvent(m: ProcessIMEvent): Unit = {
127      _forwardToUserActor(m.imEvent.userID, m)
128   }
129
130   def onGetUserBalanceRequest(m: GetUserBalanceRequest): Unit = {
131     _forwardToUserActor(m.userID, m)
132   }
133
134   def onGetUserStateRequest(m: GetUserStateRequest): Unit = {
135     _forwardToUserActor(m.userID, m)
136   }
137
138   def onProcessResourceEvent(m: ProcessResourceEvent): Unit = {
139     _forwardToUserActor(m.rcEvent.userID, m)
140   }
141
142   def onPingAllRequest(m: PingAllRequest): Unit = {
143   }
144
145   override def postStop = {
146     UserActorCache.stop
147   }
148 }