</dependency>
<dependency>
- <groupId>se.scalablesolutions.akka</groupId>
+ <groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor</artifactId>
- <version>1.3.1</version>
- <exclusions>
- <exclusion>
- <artifactId>scala-library</artifactId>
- <groupId>org.scala-lang</groupId>
- </exclusion>
- </exclusions>
+ <version>2.0.2</version>
</dependency>
<dependency>
- <groupId>se.scalablesolutions.akka</groupId>
+ <groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote</artifactId>
- <version>1.3.1</version>
- <exclusions>
- <exclusion>
- <artifactId>sjson_2.9.0</artifactId>
- <groupId>net.debasishg</groupId>
- </exclusion>
- <exclusion>
- <artifactId>akka-stm</artifactId>
- <groupId>se.scalablesolutions.akka</groupId>
- </exclusion>
- <exclusion>
- <artifactId>h2-lzf</artifactId>
- <groupId>voldemort.store.compress</groupId>
- </exclusion>
- <exclusion>
- <artifactId>akka-typed-actor</artifactId>
- <groupId>se.scalablesolutions.akka</groupId>
- </exclusion>
- </exclusions>
+ <version>2.0.2</version>
</dependency>
<dependency>
- <groupId>se.scalablesolutions.akka</groupId>
+ <groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j</artifactId>
- <version>1.3.1</version>
+ <version>2.0.2</version>
</dependency>
<!--
<dependency>
akka {
- version = "1.3.1"
+ version = "2.0.2"
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
- event-handler-level = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG
+
+ # Log level used by the configured loggers (see "event-handlers") as soon
+ # as they have been started; before that, see "stdout-loglevel"
+ # Options: ERROR, WARNING, INFO, DEBUG
+ loglevel = "INFO"
+
+ # Log level for the very basic logger activated during ActorSystem startup
+ # Options: ERROR, WARNING, INFO, DEBUG
+ stdout-loglevel = "WARNING"
+
+ # Log the complete configuration at INFO level when the actor system is started.
+ # This is useful when you are uncertain of what configuration is used.
+ log-config-on-start = off
+
+ # Toggles whether the threads created by this ActorSystem should be daemons or not
+ daemonic = on
actor {
- # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
- throughput = 1
+ default-dispatcher {
+ # Throughput defines the number of messages that are processed in a batch
+ # before the thread is returned to the pool. Set to 1 for as fair as possible.
+ throughput = 1
+ }
}
}
\ No newline at end of file
def converters = apply(EnvKeys.converters)
- def actorProvider = apply(EnvKeys.actorProvider)
+// def actorProvider = apply(EnvKeys.actorProvider)
def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
def restPort = apply(EnvKeys.restPort)
+ def akkaService = apply(EnvKeys.akkaService)
+
def version = apply(EnvKeys.version)
}
final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
EnvKeys.timerService,
EnvKeys.akkaService,
- EnvKeys.actorProvider,
EnvKeys.eventBus,
EnvKeys.restService,
EnvKeys.rabbitMQService,
* The fully qualified name of the class that implements the `RoleableActorProviderService`.
* Will be instantiated reflectively and should have a public default constructor.
*/
- final val actorProvider: TypedKey[RoleableActorProviderService] =
- new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
+// final val actorProvider: TypedKey[RoleableActorProviderService] =
+// new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
final val akkaService: TypedKey[AkkaService] =
new AquariumEnvKey[AkkaService]("akka.service")
newInstance(envKey.keyType, classOf[StoreWatcherService].getName)
}
- checkPropsOverride(EnvKeys.actorProvider) { (envKey, propValue) ⇒
- newInstance(envKey.keyType, propValue)
- }
-
checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒
propValue.toLong
}
*/
package gr.grnet.aquarium.actor
-import service.router.RouterActor
import service.user.{UserActor}
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
-import gr.grnet.aquarium.actor.message.admin.PingAllRequest
import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured, ActorConfigurationMessage}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorConfigurationMessage}
/**
* Each actor within Aquarium plays one role.
}
/**
- * The generic router.
- */
-case object RouterRole
- extends ActorRole("RouterRole",
- true,
- classOf[RouterActor],
- Set(classOf[GetUserBalanceRequest],
- classOf[GetUserStateRequest],
- classOf[ProcessResourceEvent],
- classOf[ProcessIMEvent],
- classOf[PingAllRequest]),
- Set(classOf[ActorProviderConfigured],
- classOf[AquariumPropertiesLoaded]))
-
-/**
* User-oriented business logic handler role.
*/
case object UserActorRole
classOf[GetUserBalanceRequest],
classOf[GetUserStateRequest]),
Set(classOf[InitializeUserState],
- classOf[ActorProviderConfigured],
classOf[AquariumPropertiesLoaded]))
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-case class GetUserBalanceRequest(userID: String, timestamp: Long)
-extends ActorMessage
- with RouterRequestMessage
- with UserActorRequestMessage {
-
+case class GetUserBalanceRequest(userID: String, timestamp: Long) extends ActorMessage with UserActorRequestMessage {
def referenceTimeMillis = timestamp
}
package gr.grnet.aquarium.actor.message
+import gr.grnet.aquarium.AquariumInternalError
+
/**
*
* @author Christos KK Loverdos <loverdos@gmail.com>
case class GetUserBalanceResponse(
balance: Either[String, GetUserBalanceResponseData],
override val suggestedHTTPStatus: Int = 200)
-extends RouterResponseMessage(balance, suggestedHTTPStatus)
+extends UserActorResponseMessage(balance, suggestedHTTPStatus) {
+ def userID = balance match { // user ID taken from the payload of a successful response
+ case Left(error) ⇒ // an error response carries only a message, so there is no userID to return
+ throw new AquariumInternalError("Could not obtain userID. %s".format(error))
+
+ case Right(data) ⇒
+ data.userID
+ }
+}
case class GetUserBalanceResponseData(userID: String, balance: Double)
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-case class GetUserStateRequest(userID: String, timestamp: Long)
-extends ActorMessage
- with RouterRequestMessage
- with UserActorRequestMessage {
-
+case class GetUserStateRequest(userID: String, timestamp: Long) extends ActorMessage with UserActorRequestMessage {
def referenceTimeMillis = timestamp
}
package gr.grnet.aquarium.actor.message
import gr.grnet.aquarium.computation.state.UserState
+import gr.grnet.aquarium.AquariumInternalError
/**
*
case class GetUserStateResponse(
state: Either[String, UserState],
override val suggestedHTTPStatus: Int = 200)
-extends RouterResponseMessage(state, suggestedHTTPStatus)
+extends UserActorResponseMessage(state, suggestedHTTPStatus) {
+ def userID = state match { // user ID taken from the UserState of a successful response
+ case Left(error) ⇒ // an error response carries only a message, so there is no userID to return
+ throw new AquariumInternalError("Could not obtain userID. %s".format(error))
+
+ case Right(data) ⇒
+ data.userID
+ }
+}
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message
-
-/**
- * Request message sent to [[gr.grnet.aquarium.actor.service.router.RouterActor]].
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-
-trait RouterRequestMessage extends ActorMessage
*/
trait UserActorRequestMessage extends ActorMessage {
+ def userID: String
+
def referenceTimeMillis: Long
}
import gr.grnet.aquarium.converter.{StdConverters, PrettyJsonTextFormat}
/**
- * Response message sent as a reply by [[gr.grnet.aquarium.actor.service.router.RouterActor]].
+ * Response message sent as a reply by the [[gr.grnet.aquarium.actor.service.user.UserActor]].
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-abstract class RouterResponseMessage[T](
+abstract class UserActorResponseMessage[T](
val response: Either[String, T],
val suggestedHTTPStatus: Int = 200)
extends ActorMessage {
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message.admin
-
-import gr.grnet.aquarium.actor.message.{RouterRequestMessage, ActorMessage}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class PingAllRequest() extends ActorMessage with RouterRequestMessage
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message.admin
-
-import gr.grnet.aquarium.actor.message.{RouterResponseMessage, RouterRequestMessage, ActorMessage}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class PingAllResponse() extends RouterResponseMessage(Right(""))
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message
-package config
-
-import gr.grnet.aquarium.service.RoleableActorProviderService
-
-/**
- * Sent with the configured [[gr.grnet.aquarium.service.RoleableActorProviderService]].
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class ActorProviderConfigured(actorProvider: RoleableActorProviderService) extends ActorConfigurationMessage
\ No newline at end of file
import gr.grnet.aquarium.event.model.im.IMEventModel
/**
- * [[gr.grnet.aquarium.actor.service.router.RouterActor]] message
- * that triggers the user event processing pipeline.
+ * A message that triggers the user event processing pipeline.
*
* Note that the prefix `Process` means that no reply is created or needed.
*
*/
case class ProcessIMEvent(imEvent: IMEventModel) extends ActorMessage with UserActorRequestMessage {
+ def userID = imEvent.userID
+
def referenceTimeMillis = imEvent.occurredMillis
}
import gr.grnet.aquarium.event.model.resource.ResourceEventModel
/**
- * [[gr.grnet.aquarium.actor.service.router.RouterActor]] message
- * that triggers the resource event processing pipeline.
+ * A message that triggers the resource event processing pipeline.
*
* Note that the prefix `Process` means that no reply is created or needed.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-case class ProcessResourceEvent(rcEvent: ResourceEventModel)
-extends ActorMessage
- with UserActorRequestMessage {
+case class ProcessResourceEvent(rcEvent: ResourceEventModel) extends ActorMessage with UserActorRequestMessage {
+ def userID = rcEvent.userID
def referenceTimeMillis = rcEvent.occurredMillis
}
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor
-package service
-package router
-
-import gr.grnet.aquarium.service.RoleableActorProviderService
-import akka.actor.ActorRef
-import user.{UserActorCache}
-import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.admin.PingAllRequest
-import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured}
-import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
-
-/**
- * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
- * appropriately.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RouterActor extends ReflectiveRoleableActor {
- private[this] var _actorProvider: RoleableActorProviderService = _
-
- def role = RouterRole
-
- private[this] def _launchUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
- // create a fresh instance
- val userActor = _actorProvider.actorForRole(UserActorRole)
- UserActorCache.put(userID, userActor)
-
- userActor ! InitializeUserState(userID, referenceTimeMillis)
-
- userActor
- }
-
- private[this] def _findOrCreateUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
- UserActorCache.get(userID) match {
- case Some(userActorRef) ⇒
- userActorRef
-
- case None ⇒
- _launchUserActor(userID, referenceTimeMillis)
- }
- }
-
- private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
- _findOrCreateUserActor(userID, m.referenceTimeMillis) forward m
- }
-
-
- /**
- * Handles an exception that occurred while servicing a message.
- *
- * @param t
- * The exception.
- * @param servicingMessage
- * The message that was being served while the exception happened.
- * Note that the message can be `null`, in which case the exception
- * is an NPE.
- */
- override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = {
- LogHelpers.logChainOfCauses(logger, t)
-
- def logIgnore(e: Throwable) = {
- logger.error("Ignoring %s".format(shortClassNameOf(e)), e)
- }
-
- // We ignore everything except serious errors.
- t match {
- case e: Error ⇒
- throw e
-
- case e: AquariumInternalError ⇒
- logIgnore(e)
-
- case e: AquariumException ⇒
- logIgnore(e)
-
- case e: Throwable ⇒
- logIgnore(e)
- }
- }
-
- def onAquariumPropertiesLoaded(m: AquariumPropertiesLoaded): Unit = {
- logger.info("Configured with {}", shortClassNameOf(m))
- }
-
- def onActorProviderConfigured(m: ActorProviderConfigured): Unit = {
- this._actorProvider = m.actorProvider
- logger.info("Configured with {}", shortClassNameOf(m))
- }
-
- def onProcessIMEvent(m: ProcessIMEvent): Unit = {
- _forwardToUserActor(m.imEvent.userID, m)
- }
-
- def onGetUserBalanceRequest(m: GetUserBalanceRequest): Unit = {
- _forwardToUserActor(m.userID, m)
- }
-
- def onGetUserStateRequest(m: GetUserStateRequest): Unit = {
- _forwardToUserActor(m.userID, m)
- }
-
- def onProcessResourceEvent(m: ProcessResourceEvent): Unit = {
- _forwardToUserActor(m.rcEvent.userID, m)
- }
-
- def onPingAllRequest(m: PingAllRequest): Unit = {
- }
-
- override def postStop = {
- UserActorCache.stop
- }
-}
\ No newline at end of file
import gr.grnet.aquarium.actor._
-import akka.config.Supervision.Temporary
import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded}
import gr.grnet.aquarium.util.date.TimeHelpers
import gr.grnet.aquarium.event.model.im.IMEventModel
import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
*/
class UserActor extends ReflectiveRoleableActor {
- private[this] var _userID: String = "<?>"
- private[this] var _imState: IMStateSnapshot = _
- private[this] var _userState: UserState = _
- private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
+ @volatile private[this] var _userID: String = "<?>"
+ @volatile private[this] var _imState: IMStateSnapshot = _
+ @volatile private[this] var _userState: UserState = _
+ @volatile private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
+
+ /**
+ * Returns the `_userID` field of this actor.
+ *
+ * Throws an `AquariumInternalError` when the field is `null`.
+ * NOTE(review): `_userID` starts out as the placeholder "<?>", not `null`, so this
+ * guard does not detect an actor that was never initialized — confirm the intent.
+ */
+ def userID = {
+ if(this._userID eq null) {
+ // The template was previously passed through unformatted, leaking a literal "%s".
+ throw new AquariumInternalError("%s not initialized".format(getClass.getSimpleName))
+ }
- self.lifeCycle = Temporary
+ this._userID
+ }
- private[this] def _shutmedown(): Unit = {
- if(haveUserState) {
- UserActorCache.invalidate(_userID)
- }
+ override def postStop() {
+ aquarium.akkaService.notifyUserActorPostStop(this)
+ }
- self.stop()
+ private[this] def shutmedown(): Unit = { // invoked by onThrowable after the failure has been logged
+ if(haveIMState) { // only an actor that has IM state asks to be invalidated
+ aquarium.akkaService.invalidateUserActor(this) // NOTE(review): no direct stop here; presumably AkkaService stops the actor on invalidation — confirm
+ }
 }
override protected def onThrowable(t: Throwable, message: AnyRef) = {
LogHelpers.logChainOfCauses(logger, t)
ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
- _shutmedown()
+ shutmedown()
}
def role = UserActorRole
def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
}
- def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
- }
-
private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
if(haveIMState) {
val (newState,
this._imState.hasBeenActivated match {
case true ⇒
// (have IMState, activated, have UserState)
- self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
+ sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
case false ⇒
// (have IMState, not activated, have UserState)
// Since we have user state, we should have been activated
- self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
+ sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
}
case (true, false) ⇒
case true ⇒
// (have IMState, activated, no UserState)
// Since we are activated, we should have some state.
- self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
+ sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
case false ⇒
// (have IMState, not activated, no UserState)
// The user is virtually unknown
- self reply GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
+ sender ! GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
}
case (false, true) ⇒
// (no IMState, have UserState)
// A bit ridiculous situation
- self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
+ sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
case (false, false) ⇒
// (no IMState, no UserState)
- self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
+ sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
}
}
def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
haveUserState match {
case true ⇒
- self reply GetUserStateResponse(Right(this._userState))
+ sender ! GetUserStateResponse(Right(this._userState))
case false ⇒
- self reply GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
+ sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
}
}
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor
-package service
-package user
-
-import akka.actor.ActorRef
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import com.google.common.cache._
-
-/**
- * An actor cache implementation using Guava.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-object UserActorCache extends Lifecycle with Loggable {
-
- private lazy val cache: Cache[String, ActorRef] =
- CacheBuilder.newBuilder()
- .maximumSize(1000)
- .initialCapacity(100)
- .concurrencyLevel(20)
- .removalListener(EvictionListener)
- .build()
-
- private[this] object EvictionListener extends RemovalListener[String, ActorRef] with Loggable {
-
- def onRemoval(rn: RemovalNotification[String, ActorRef]) {
- val userID = rn.getKey
- val userActor = rn.getValue
-
- logger.debug("Parking UserActor for userID = %s".format(userID))
- // Check this is received after any currently servicing business logic message.
- userActor.stop()
- }
- }
-
- def start() = {
- }
-
- def stop() = {
- cache.invalidateAll
- cache.cleanUp
- }
-
- def put(userID: String, userActor: ActorRef): Unit = {
- cache.put(userID, userActor)
- }
-
- def get(userID: String): Option[ActorRef] =
- cache.getIfPresent(userID) match {
- case null ⇒ None
- case actorRef ⇒ Some(actorRef)
- }
-
- def invalidate(userID: String): Unit = {
- cache.invalidate(userID)
- }
-}
import gr.grnet.aquarium.Aquarium
import org.slf4j.Logger
import gr.grnet.aquarium.converter.JsonTextFormat
-import gr.grnet.aquarium.actor.RouterRole
import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore}
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
// forwardAction: S ⇒ Unit
imEvent ⇒ {
- aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent)
+ aquarium.akkaService.getOrCreateUserActor(imEvent.userID) ! ProcessIMEvent(imEvent)
}
)
import gr.grnet.aquarium.Aquarium
import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
-import gr.grnet.aquarium.actor.RouterRole
import gr.grnet.aquarium.util._
/**
// forwardAction: S ⇒ Unit
rcEvent ⇒ {
- aquarium.actorProvider.actorForRole(RouterRole) ! ProcessResourceEvent(rcEvent)
+ aquarium.akkaService.getOrCreateUserActor(rcEvent.userID) ! ProcessResourceEvent(rcEvent)
}
)
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.connector.rabbitmq.service
-
-import akka.dispatch.Future
-import gr.grnet.aquarium.connector.handler.{PayloadHandler, HandlerResult, PayloadHandlerExecutor}
-
-/**
- * An [[gr.grnet.aquarium.connector.handler.PayloadHandlerExecutor]] that uses `Akka` futures.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-final class PayloadHandlerFutureExecutor extends PayloadHandlerExecutor {
-
- def exec(payload: Array[Byte], handler: PayloadHandler)
- (onSuccess: HandlerResult ⇒ Any)
- (onError: Throwable ⇒ Any): Unit = {
-
- val result = Future { handler.handlePayload(payload) }
-
- result.onComplete { futureHandlerResult ⇒
- futureHandlerResult.value.get match {
- case Left(e) ⇒
- onError(e)
-
- case Right(handlerResult) ⇒
- onSuccess(handlerResult)
- }
- }
- }
-}
package gr.grnet.aquarium.service
-import akka.actor.Actor
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import akka.actor.{Props, ActorRef, ActorSystem}
+import gr.grnet.aquarium.util.{Loggable, Lifecycle, shortClassNameOf}
import gr.grnet.aquarium.ResourceLocator.SysEnvs
-import gr.grnet.aquarium.AquariumInternalError
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
+import com.typesafe.config.ConfigFactory
+import java.util.concurrent.atomic.AtomicBoolean
+import com.google.common.cache.{RemovalNotification, RemovalListener, CacheBuilder, Cache}
+import com.ckkloverdos.props.{Props ⇒ KKProps}
+import gr.grnet.aquarium.actor.service.user.UserActor
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import gr.grnet.aquarium.actor.message.config.InitializeUserState
+import gr.grnet.aquarium.util.date.TimeHelpers
+import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Callable}
+import akka.dispatch.{Await, Future}
+import akka.util.Duration
/**
* A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-final class AkkaService extends Lifecycle with Loggable {
+final class AkkaService extends AquariumAwareSkeleton with Configurable with Lifecycle with Loggable {
+ @volatile private[this] var _actorSystem: ActorSystem = _
+ @volatile private[this] var _userActorCache: Cache[String, ActorRef] = _
+ @volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _
+ @volatile private[this] var _cacheMaximumSize = 1000
+ @volatile private[this] var _cacheInitialCapacity = 100
+ @volatile private[this] var _cacheConcurrencyLevel = 20
+
+ private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]]
+
+ private[this] val isShuttingDown = new AtomicBoolean(false)
+
+ /**
+ * The prefix of the configuration keys this service is configured with (here `actors`).
+ */
+ def propertyPrefix: Option[String] = Some("actors")
+
+ /**
+ * Configure this instance with the provided properties.
+ *
+ * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+ *
+ * The current implementation is a no-op: `props` is ignored and the cache sizing fields
+ * keep their initial values.
+ */
+ def configure(props: KKProps): Unit = {}
+
+ /**
+ * Provides the underlying Akka `ActorSystem`.
+ *
+ * Fails with an `AquariumInternalError` if no actor system has been assigned yet and with an
+ * `AquariumException` once the shutdown flag has been raised.
+ */
+ def actorSystem = {
+ if(this._actorSystem eq null) {
+ throw new AquariumInternalError("Akka actorSystem is null")
+ }
+ else if(this.isShuttingDown.get()) {
+ throw new AquariumException("%s is shutting down".format(shortClassNameOf(this)))
+ }
+ else {
+ this._actorSystem
+ }
+ }
+
+ /**
+ * Starts the service: refuses to run when the `AKKA_HOME` environment value is set, then
+ * creates the removal listener, the user actor cache and finally the actor system.
+ */
 def start() = {
 // We have AKKA builtin, so no need to mess with pre-existing installation.
 if(SysEnvs.AKKA_HOME.value.isJust) {
+ // NOTE(review): `error` is not defined in the lines visible here; presumably it is
+ // declared in elided context - confirm.
 logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
 throw error
 }
+
+ // Listener registered with the cache below: for every removed entry it gracefully stops
+ // the corresponding actor, unless the service is shutting down.
+ this._cacheEvictionListener = new RemovalListener[String, ActorRef] {
+ def onRemoval(rn: RemovalNotification[String, ActorRef]): Unit = {
+ if(isShuttingDown.get()) {
+ return
+ }
+
+ val userID = rn.getKey
+ val actorRef = rn.getValue
+
+ logger.debug("Due to memory constraints, unloading UserActor for userID = %s".format(userID))
+
+ gracefullyStopUserActor(userID, actorRef)
+ }
+ }
+
+ // Size-bounded cache from userID to the UserActor reference; sizing comes from the `_cache*` fields.
+ this._userActorCache = CacheBuilder.
+ newBuilder().
+ maximumSize(_cacheMaximumSize).
+ initialCapacity(_cacheInitialCapacity).
+ concurrencyLevel(_cacheConcurrencyLevel).
+ removalListener(this._cacheEvictionListener).
+ build()
+
+ // The actor system is named "aquarium-akka" and configured via ConfigFactory.load("akka.conf").
+ this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf"))
+ logger.debug("Created %s".format(this._actorSystem))
 }
- def stop()= {
- Actor.registry.shutdownAll()
+ /**
+ * Stops the service: raises the shutdown flag, forgets the pending stop futures, empties the
+ * user actor cache and shuts down the actor system.
+ */
+ def stop() = {
+ // Raise the flag first, so that the removal listener ignores the invalidations below.
+ this.isShuttingDown.set(true)
+
+ this.stoppingUserActors.clear()
+
+ this._userActorCache.invalidateAll
+ this._userActorCache.cleanUp()
+
+ this._actorSystem.shutdown()
+
+ logger.info("Shut down %s".format(this._actorSystem))
+ }
+
+ /**
+ * Forgets the pending stop future recorded for the user of the given actor.
+ *
+ * NOTE(review): judging by the name, this is presumably called from the actor's `postStop` - confirm.
+ */
+ def notifyUserActorPostStop(userActor: UserActor): Unit = {
+ this.stoppingUserActors.remove(userActor.userID)
+ }
+
+ /**
+ * Requests a graceful stop of the given actor (with a 1000 ms timeout) and records the
+ * resulting future under `userID`, so that `getOrCreateUserActor` can wait for it.
+ */
+ private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = {
+ this.stoppingUserActors.put(
+ userID,
+ akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem)
+ )
+ }
+
+ /**
+ * Drops the cache entry of the given actor's user and gracefully stops the actor.
+ *
+ * Does nothing once the service is shutting down.
+ */
+ def invalidateUserActor(userActor: UserActor): Unit = {
+ if(!this.isShuttingDown.get()) {
+ val id = userActor.userID
+ val ref = userActor.self
+
+ this._userActorCache.invalidate(id)
+ gracefullyStopUserActor(id, ref)
+ }
+ }
+
+ /**
+ * Returns the `UserActor` reference for the given user, creating, caching and initializing
+ * a new actor when none is cached.
+ *
+ * If a previous actor of the same user is still being stopped, this method first waits
+ * (up to 500 ms) for that stop to complete; a failure while waiting is logged and ignored.
+ *
+ * @param userID the ID of the user whose actor is requested
+ * @return the cached or the newly created actor reference
+ * @throws AquariumException if this service is shutting down
+ */
+ def getOrCreateUserActor(userID: String): ActorRef = {
+ if(this.isShuttingDown.get()) {
+ throw new AquariumException(
+ "%s is shutting down. Cannot provide user actor %s".format(
+ shortClassNameOf(this),
+ userID))
+ }
+
+ // If stopping, wait to stop or ignore
+ this.stoppingUserActors.get(userID) match {
+ case null ⇒
+
+ case future ⇒
+ try {
+ Await.result(future, Duration(500, TimeUnit.MILLISECONDS))
+ }
+ catch {
+ case e: Throwable ⇒
+ logger.error("While Await(ing) UserActor %s to stop".format(userID), e)
+ }
+ this.stoppingUserActors.remove(userID)
+ }
+
+ this._userActorCache.get(userID, new Callable[ActorRef] {
+ def call(): ActorRef = {
+ // Create new User Actor instance
+ val actorRef = _actorSystem.actorOf(Props.apply({
+ val actor = aquarium.newInstance(classOf[UserActor], classOf[UserActor].getName)
+ actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium))
+ actor
+ }), "userActor::%s".format(userID))
+
+ // No explicit `put` here: the cache itself stores the value returned by this loader.
+ // Writing the same key while it is being loaded makes the cache report the loaded value
+ // as replaced to the removal listener, which would then stop the actor we just created.
+
+ // Send the initialization message
+ actorRef ! InitializeUserState(userID, TimeHelpers.nowMillis())
+
+ actorRef
+ }
+ })
 }
}
import java.util.concurrent.{Executors, TimeUnit}
import gr.grnet.aquarium.util.date.TimeHelpers
import org.joda.time.format.ISODateTimeFormat
-import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest, RouterResponseMessage, RouterRequestMessage}
-import gr.grnet.aquarium.actor.RouterRole
+import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest, UserActorResponseMessage}
import com.ckkloverdos.resource.StreamResource
import com.ckkloverdos.maybe.{Just, Failed}
import gr.grnet.aquarium.event.model.ExternalEventModel
+import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
/**
*
}
final case class MainService() extends Service[THttpRequest, THttpResponse] {
- final class ActorRouterService extends Service[RouterRequestMessage, RouterResponseMessage[_]] {
- def apply(message: RouterRequestMessage): TFuture[RouterResponseMessage[_]] = {
+ /**
+ * Routes a request to the `UserActor` of the respective user and exposes the eventual
+ * reply as a twitter future.
+ */
+ final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
+ def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
 // We want to asynchronously route the message via akka and get the whole computation as a
 // twitter future.
- val actorProvider = aquarium.actorProvider
- val router = actorProvider.actorForRole(RouterRole)
- val promise = new TPromise[RouterResponseMessage[_]]()
+ val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
+ val promise = new TPromise[UserActorResponseMessage[_]]()
- val actualWork = router.ask(message)
+ // `ask` returns an akka future (completed within 500 ms, otherwise failed with a timeout).
+ // It is a different type than a twitter future and cannot be cast to one, so we bridge
+ // the two by completing the twitter promise from the akka completion callback.
+ val actualWork = akka.pattern.ask(actorRef, request)(ATimeout(ADuration(500, TimeUnit.MILLISECONDS)))
- actualWork onComplete { akkaFuture ⇒
- akkaFuture.value match {
- case Some(eitherValue) ⇒
- eitherValue match {
- case Left(throwable) ⇒
- promise.setException(throwable)
-
- case Right(value) ⇒
- promise.setValue(value.asInstanceOf[RouterResponseMessage[_]])
- }
-
- case None ⇒
- promise.setException(new Exception("Got no response for %s".format(message)))
- }
- }
+ actualWork onComplete {
+ case Left(throwable) ⇒
+ promise.setException(throwable)
+
+ case Right(value) ⇒
+ promise.setValue(value.asInstanceOf[UserActorResponseMessage[_]])
+ }
 promise
 }
 }
- final val actorRouterService = new ActorRouterService
+ final val actorRouterService = UserActorService()
- def callRouter(requestMessage: RouterRequestMessage): TFuture[THttpResponse] = {
+ def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
actorRouterService(requestMessage).transform { tryResponse ⇒
tryResponse match {
- case TReturn(responseMessage: RouterResponseMessage[_]) ⇒
+ case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
val statusCode = responseMessage.suggestedHTTPStatus
val status = THttpResponseStatus.valueOf(statusCode)
case RESTPaths.UserBalancePath(userID) ⇒
// /user/(.+)/balance/?
- callRouter(GetUserBalanceRequest(userID, millis))
+ callUserActor(GetUserBalanceRequest(userID, millis))
case RESTPaths.UserStatePath(userId) ⇒
// /user/(.+)/state/?
- callRouter(GetUserStateRequest(userId, millis))
+ callUserActor(GetUserStateRequest(userId, millis))
case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
import com.ckkloverdos.props.Props
import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.actor.RouterRole
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
import gr.grnet.aquarium.util.sameTags
import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
-import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier}
+import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
/**
+ * The service that is responsible for handling `RabbitMQ` connectivity.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
def converters = aquarium.converters
- def router = aquarium.actorProvider.actorForRole(RouterRole)
-
/**
* Configure this instance with the provided properties.
*
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-import com.ckkloverdos.props.Props
-import akka.actor.ActorRef
-import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
-import java.util.concurrent.ConcurrentHashMap
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
-import gr.grnet.aquarium.actor._
-import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-
-/**
- * All actors are provided locally.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class SimpleLocalRoleableActorProviderService
- extends RoleableActorProviderService
- with AquariumAwareSkeleton
- with Configurable
- with Loggable {
-
- private[this] val actorCache = new ConcurrentHashMap[ActorRole, ActorRef]
- @volatile private[this] var _props: Props = _
-
- def propertyPrefix = None
-
- def configure(props: Props): Unit = {
- this._props = props
- }
-
- def start(): Unit = {
- // Start and configure actors
- import SimpleLocalRoleableActorProviderService.RolesToBeStarted
-
- for(role <- RolesToBeStarted) {
- actorForRole(role)
- }
- }
-
- def stop(): Unit = {
- }
-
- private[this] def _newActor(role: ActorRole): ActorRef = {
- val actorFactory = (_class: Class[_ <: RoleableActor]) ⇒ {
- val actor = aquarium.newInstance(_class, _class.getName)
- actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium))
- actor
- }
-
- val actorRef = akka.actor.Actor.actorOf(actorFactory(role.actorType)).start()
-
- val propsMsg = AquariumPropertiesLoaded(this._props)
- if(role.canHandleConfigurationMessage(propsMsg)) {
- actorRef ! propsMsg
- }
-
- val providerMsg = ActorProviderConfigured(this)
- if(role.canHandleConfigurationMessage(providerMsg)) {
- actorRef ! providerMsg
- }
-
- actorRef
- }
-
- private[this] def _fromCacheOrNew(role: ActorRole): ActorRef = synchronized {
- actorCache.get(role) match {
- case null ⇒
- val actorRef = _newActor(role)
- actorCache.put(role, actorRef)
- actorRef
- case actorRef ⇒
- actorRef
- }
- }
-
- @throws(classOf[Exception])
- def actorForRole(role: ActorRole, hints: Props = Props.empty) = synchronized {
- if(role.isCacheable) {
- _fromCacheOrNew(role)
- } else {
- _newActor(role)
- }
- }
-
- override def toString = gr.grnet.aquarium.util.shortClassNameOf(this)
-}
-
-object SimpleLocalRoleableActorProviderService {
- // Always set Router at the end.
- // We could definitely use some automatic dependency sorting here (topological sorting anyone?)
- final val RolesToBeStarted = List(RouterRole)
-
- lazy val ActorClassByRole: Map[ActorRole, Class[_ <: RoleableActor]] =
- RolesToBeStarted map {
- role ⇒
- (role, role.actorType)
- } toMap
-
- lazy val ActorRefByRole: Map[ActorRole, ActorRef] =
- ActorClassByRole map {
- case (role, clazz) ⇒
- (role, akka.actor.Actor.actorOf(clazz).start())
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.uid
-
-import com.eaio.uuid.UUID
-
-/**
- * [[gr.grnet.aquarium.uid.UIDGenerator]] based on [[com.eaio.uuid.UUID]].
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-object EAIOUUIDGenerator extends UIDGenerator[UUID] {
- def nextUIDObject() = new UUID()
-}
akka {
- version = "1.3.1"
-
+ version = "2.0.2"
+
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
- event-handler-level = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG
+
+ # Log level used by the configured loggers (see "event-handlers") as soon
+ # as they have been started; before that, see "stdout-loglevel"
+ # Options: ERROR, WARNING, INFO, DEBUG
+ loglevel = "INFO"
+
+ # Log level for the very basic logger activated during AkkaApplication startup
+ # Options: ERROR, WARNING, INFO, DEBUG
+ stdout-loglevel = "WARNING"
+
+ # Log the complete configuration at INFO level when the actor system is started.
+ # This is useful when you are uncertain of what configuration is used.
+ log-config-on-start = off
+
+ # Toggles whether the threads created by this ActorSystem should be daemons or not
+ daemonic = on
actor {
- # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
- throughput = 1
+ default-dispatcher {
+ # Throughput defines the number of messages that are processed in a batch
+ # before the thread is returned to the pool. Set to 1 for as fair as possible.
+ throughput = 1
+ }
}
}
\ No newline at end of file
# How often do we attempt a reconnection to the store(s)?
anystore.reconnect.period.millis=1000
-#######
-# DO NOT TOUCH the following options, unless you know what you are doing
-#######
-
-# Actor subsystem
-actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService
# Class that initializes the REST service
rest.service.class=gr.grnet.aquarium.service.FinagleRESTService
rest.shutdown.timeout.millis=2000
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.processor.actor
-
-import akka.actor.Actor
-
-/**
- * A proxy for a remote actor.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-abstract class ActorProxy(name: String, host: String = Constants.RemoteHost, port: Int = Constants.RemotePort) extends Actor {
- private[this] lazy val remote = Actor.remote.actorFor(name, host, port)
-
- def receive = {
- case message =>
- remote forward message
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.processor.actor
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-object Constants {
- val RemoteHost = "localhost"
- val RemotePort = 2552
- val LocalHost = "localhost"
- val LocalPort = 2551
- val ActorNameEcho = "echo"
- val ActorNameSilent = "silent"
-}
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.processor.actor
-
-import org.junit.Test
-import org.junit.Assert._
-import org.junit.Assume.assumeTrue
-
-import akka.actor.Actor
-import java.lang.Object
-
-//import akka.actor.Actor.
-
-
-class EchoActor extends Actor {
- def receive = {
- case message =>
- println("%s received: %s".format(this, message))
- //self.reply("REPLY from EchoActor for '%s'".format(message))
- }
-}
-
-class SilentActor extends Actor {
- def receive = {
- case message =>
- }
-}
-
-class EchoProxy extends ActorProxy("echo")
-class SilentProxy extends ActorProxy("silent")
-
-object ClientPart {
- import Constants._
-
- def start(): Unit = Actor.remote.start(LocalHost, LocalPort)
- def stop(): Unit = Actor.remote.shutdownClientModule()
-
- val echo = Actor.actorOf[EchoProxy].start()
- val silent = Actor.actorOf[SilentProxy].start()
-}
-
-object ServerPart {
- import Constants._
-
- def start(): Unit = {
- Actor.remote.start(RemoteHost, RemotePort)
-
- Actor.remote.register(ActorNameEcho, Actor.actorOf[EchoActor])
- Actor.remote.register(ActorNameSilent, Actor.actorOf[SilentActor])
- }
-
- def stop(): Unit = Actor.remote.shutdownServerModule()
-}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RemoteActorTest {
- @Test
- def testSendMessage: Unit = {
- try {
- ServerPart.start()
- ClientPart.start()
-
- ClientPart.echo ! "one"
- ClientPart.echo ! "two"
- ClientPart.echo ! "three"
-
- // Give us some delay to print to the console...
- Thread.sleep(100)
- }
- finally {
- ServerPart.stop()
- ClientPart.stop()
- }
- }
-}
\ No newline at end of file