6d10baba8563b76908315d9678822e6a3b587970
[aquarium] / src / main / scala / gr / grnet / aquarium / actor / service / router / RouterActor.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.actor
37 package service
38 package router
39
40 import gr.grnet.aquarium.util.shortClassNameOf
41 import gr.grnet.aquarium.service.RoleableActorProviderService
42 import akka.actor.ActorRef
43 import user.{UserActorCache}
44 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
45 import gr.grnet.aquarium.actor.message.admin.PingAllRequest
46 import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
47 import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
48 import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured}
49
50 /**
51  * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
52  * appropriately.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>.
55  */
56 class RouterActor extends ReflectiveRoleableActor {
57   private[this] var _actorProvider: RoleableActorProviderService = _
58
59   def role = RouterRole
60
61   private[this] def _launchUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
62     // create a fresh instance
63     val userActor = _actorProvider.actorForRole(UserActorRole)
64     UserActorCache.put(userID, userActor)
65
66     userActor ! InitializeUserState(userID, referenceTimeMillis)
67
68     userActor
69   }
70
71   private[this] def _findOrCreateUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
72     UserActorCache.get(userID) match {
73       case Some(userActorRef) ⇒
74         userActorRef
75
76       case None ⇒
77         _launchUserActor(userID, referenceTimeMillis)
78     }
79   }
80
81   private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
82     _findOrCreateUserActor(userID, m.referenceTimeMillis) forward m
83   }
84
85
86   /**
87    * Handles an exception that occurred while servicing a message.
88    *
89    * @param t
90    * The exception.
91    * @param servicingMessage
92    * The message that was being served while the exception happened.
93    * Note that the message can be `null`, in which case the exception
94    * is an NPE.
95    */
96   override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = {
97     logChainOfCauses(t)
98
99     def logIgnore(e: Throwable) = {
100       logger.error("Ignoring %s".format(shortClassNameOf(e)), e)
101     }
102
103     t match {
104       case e: Error ⇒
105         throw e
106
107       case e: AquariumInternalError ⇒
108         logIgnore(e)
109
110       case e: AquariumException ⇒
111         logIgnore(e)
112
113       case e: Throwable ⇒
114         logIgnore(e)
115     }
116   }
117
118   def onAquariumPropertiesLoaded(m: AquariumPropertiesLoaded): Unit = {
119     logger.info("Configured with {}", shortClassNameOf(m))
120   }
121
122   def onActorProviderConfigured(m: ActorProviderConfigured): Unit = {
123     this._actorProvider = m.actorProvider
124     logger.info("Configured with {}", shortClassNameOf(m))
125   }
126
127   def onProcessIMEvent(m: ProcessIMEvent): Unit = {
128      _forwardToUserActor(m.imEvent.userID, m)
129   }
130
131   def onGetUserBalanceRequest(m: GetUserBalanceRequest): Unit = {
132     _forwardToUserActor(m.userID, m)
133   }
134
135   def onGetUserStateRequest(m: GetUserStateRequest): Unit = {
136     _forwardToUserActor(m.userID, m)
137   }
138
139   def onProcessResourceEvent(m: ProcessResourceEvent): Unit = {
140     _forwardToUserActor(m.rcEvent.userID, m)
141   }
142
143   def onPingAllRequest(m: PingAllRequest): Unit = {
144   }
145
146   override def postStop = {
147     UserActorCache.stop
148   }
149 }