Upgrade Akka to 2.0.2
authorChristos KK Loverdos <loverdos@gmail.com>
Thu, 28 Jun 2012 16:02:07 +0000 (19:02 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Thu, 28 Jun 2012 16:02:07 +0000 (19:02 +0300)
In the process, cleaned up code and make a few changes for more robust
actor cache handling.

33 files changed:
pom.xml
src/main/resources/akka.conf
src/main/scala/gr/grnet/aquarium/Aquarium.scala
src/main/scala/gr/grnet/aquarium/AquariumBuilder.scala
src/main/scala/gr/grnet/aquarium/actor/ActorRole.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceRequest.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserBalanceResponse.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateRequest.scala
src/main/scala/gr/grnet/aquarium/actor/message/GetUserStateResponse.scala
src/main/scala/gr/grnet/aquarium/actor/message/RouterRequestMessage.scala [deleted file]
src/main/scala/gr/grnet/aquarium/actor/message/UserActorRequestMessage.scala
src/main/scala/gr/grnet/aquarium/actor/message/UserActorResponseMessage.scala [moved from src/main/scala/gr/grnet/aquarium/actor/message/RouterResponseMessage.scala with 93% similarity]
src/main/scala/gr/grnet/aquarium/actor/message/admin/PingAllRequest.scala [deleted file]
src/main/scala/gr/grnet/aquarium/actor/message/admin/PingAllResponse.scala [deleted file]
src/main/scala/gr/grnet/aquarium/actor/message/config/ActorProviderConfigured.scala [deleted file]
src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessIMEvent.scala
src/main/scala/gr/grnet/aquarium/actor/message/event/ProcessResourceEvent.scala
src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala [deleted file]
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActor.scala
src/main/scala/gr/grnet/aquarium/actor/service/user/UserActorCache.scala [deleted file]
src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/PayloadHandlerFutureExecutor.scala [deleted file]
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala
src/main/scala/gr/grnet/aquarium/service/FinagleRESTService.scala
src/main/scala/gr/grnet/aquarium/service/RabbitMQService.scala
src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala [deleted file]
src/main/scala/gr/grnet/aquarium/uid/EAIOUUIDGenerator.scala [deleted file]
src/test/resources/akka.conf
src/test/resources/aquarium.properties
src/test/scala/gr/grnet/aquarium/processor/actor/ActorProxy.scala [deleted file]
src/test/scala/gr/grnet/aquarium/processor/actor/Constants.scala [deleted file]
src/test/scala/gr/grnet/aquarium/processor/actor/RemoteActorTest.scala [deleted file]

diff --git a/pom.xml b/pom.xml
index 3478dbe..6b4e7c7 100644 (file)
--- a/pom.xml
+++ b/pom.xml
     </dependency>
 
     <dependency>
-      <groupId>se.scalablesolutions.akka</groupId>
+      <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-actor</artifactId>
-      <version>1.3.1</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>scala-library</artifactId>
-          <groupId>org.scala-lang</groupId>
-        </exclusion>
-      </exclusions>
+      <version>2.0.2</version>
     </dependency>
 
     <dependency>
-      <groupId>se.scalablesolutions.akka</groupId>
+      <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-remote</artifactId>
-      <version>1.3.1</version>
-      <exclusions>
-        <exclusion>
-          <artifactId>sjson_2.9.0</artifactId>
-          <groupId>net.debasishg</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>akka-stm</artifactId>
-          <groupId>se.scalablesolutions.akka</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>h2-lzf</artifactId>
-          <groupId>voldemort.store.compress</groupId>
-        </exclusion>
-        <exclusion>
-          <artifactId>akka-typed-actor</artifactId>
-          <groupId>se.scalablesolutions.akka</groupId>
-        </exclusion>
-      </exclusions>
+      <version>2.0.2</version>
     </dependency>
 
     <dependency>
-      <groupId>se.scalablesolutions.akka</groupId>
+      <groupId>com.typesafe.akka</groupId>
       <artifactId>akka-slf4j</artifactId>
-      <version>1.3.1</version>
+      <version>2.0.2</version>
     </dependency>
 <!--
     <dependency>
index f801921..2468e15 100644 (file)
@@ -1,11 +1,29 @@
 akka {
-  version = "1.3.1"
+  version = "2.0.2"
 
   event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
-  event-handler-level = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG
+
+  # Log level used by the configured loggers (see "event-handlers") as soon
+  # as they have been started; before that, see "stdout-loglevel"
+  # Options: ERROR, WARNING, INFO, DEBUG
+  loglevel = "INFO"
+
+  # Log level for the very basic logger activated during AkkaApplication startup
+  # Options: ERROR, WARNING, INFO, DEBUG
+  stdout-loglevel = "WARNING"
+
+  # Log the complete configuration at INFO level when the actor system is started.
+  # This is useful when you are uncertain of what configuration is used.
+  log-config-on-start = off
+
+  # Toggles whether the threads created by this ActorSystem should be daemons or not
+  daemonic = on
 
   actor {
-    # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
-    throughput = 1
+    default-dispatcher {
+      # Throughput defines the number of messages that are processed in a batch
+      # before the thread is returned to the pool. Set to 1 for as fair as possible.
+      throughput = 1
+    }
   }
 }
\ No newline at end of file
index d6105e3..bba0fe1 100644 (file)
@@ -265,7 +265,7 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
 
   def converters = apply(EnvKeys.converters)
 
-  def actorProvider = apply(EnvKeys.actorProvider)
+//  def actorProvider = apply(EnvKeys.actorProvider)
 
   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
 
@@ -275,6 +275,8 @@ final class Aquarium(env: Env) extends Lifecycle with Loggable {
 
   def restPort = apply(EnvKeys.restPort)
 
+  def akkaService = apply(EnvKeys.akkaService)
+
   def version = apply(EnvKeys.version)
 }
 
@@ -305,7 +307,6 @@ object Aquarium {
   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
     EnvKeys.timerService,
     EnvKeys.akkaService,
-    EnvKeys.actorProvider,
     EnvKeys.eventBus,
     EnvKeys.restService,
     EnvKeys.rabbitMQService,
@@ -403,8 +404,8 @@ object Aquarium {
      * The fully qualified name of the class that implements the `RoleableActorProviderService`.
      * Will be instantiated reflectively and should have a public default constructor.
      */
-    final val actorProvider: TypedKey[RoleableActorProviderService] =
-      new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
+//    final val actorProvider: TypedKey[RoleableActorProviderService] =
+//      new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
 
     final val akkaService: TypedKey[AkkaService] =
       new AquariumEnvKey[AkkaService]("akka.service")
index f8dd65e..8603801 100644 (file)
@@ -390,10 +390,6 @@ final class AquariumBuilder(val originalProps: Props) extends Loggable {
       newInstance(envKey.keyType, classOf[StoreWatcherService].getName)
     }
 
-    checkPropsOverride(EnvKeys.actorProvider) { (envKey, propValue) ⇒
-      newInstance(envKey.keyType, propValue)
-    }
-
     checkPropsOverride(EnvKeys.userStateTimestampThreshold) { (envKey, propValue) ⇒
       propValue.toLong
     }
index ca889b2..22a634e 100644 (file)
  */
 package gr.grnet.aquarium.actor
 
-import service.router.RouterActor
 import service.user.{UserActor}
 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
-import gr.grnet.aquarium.actor.message.admin.PingAllRequest
 import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured, ActorConfigurationMessage}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorConfigurationMessage}
 
 /**
  * Each actor within Aquarium plays one role.
@@ -82,21 +80,6 @@ sealed abstract class ActorRole(
 }
 
 /**
- * The generic router.
- */
-case object RouterRole
-    extends ActorRole("RouterRole",
-                      true,
-                      classOf[RouterActor],
-                      Set(classOf[GetUserBalanceRequest],
-                          classOf[GetUserStateRequest],
-                          classOf[ProcessResourceEvent],
-                          classOf[ProcessIMEvent],
-                          classOf[PingAllRequest]),
-                      Set(classOf[ActorProviderConfigured],
-                          classOf[AquariumPropertiesLoaded]))
-
-/**
  * User-oriented business logic handler role.
  */
 case object UserActorRole
@@ -108,5 +91,4 @@ case object UserActorRole
                           classOf[GetUserBalanceRequest],
                           classOf[GetUserStateRequest]),
                       Set(classOf[InitializeUserState],
-                          classOf[ActorProviderConfigured],
                           classOf[AquariumPropertiesLoaded]))
index 9fcf6e1..0ada850 100644 (file)
@@ -40,10 +40,6 @@ package gr.grnet.aquarium.actor.message
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-case class GetUserBalanceRequest(userID: String, timestamp: Long)
-extends ActorMessage
-   with RouterRequestMessage
-   with UserActorRequestMessage {
-
+case class GetUserBalanceRequest(userID: String, timestamp: Long) extends ActorMessage with UserActorRequestMessage {
   def referenceTimeMillis = timestamp
 }
index 47cf87d..9b29c23 100644 (file)
@@ -35,6 +35,8 @@
 
 package gr.grnet.aquarium.actor.message
 
+import gr.grnet.aquarium.AquariumInternalError
+
 /**
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
@@ -43,6 +45,14 @@ package gr.grnet.aquarium.actor.message
 case class GetUserBalanceResponse(
     balance: Either[String, GetUserBalanceResponseData],
     override val suggestedHTTPStatus: Int = 200)
-extends RouterResponseMessage(balance, suggestedHTTPStatus)
+extends UserActorResponseMessage(balance, suggestedHTTPStatus) {
+  def userID = balance match {
+    case Left(error) ⇒
+      throw new AquariumInternalError("Could not obtain userID. %s".format(error))
+
+    case Right(data) ⇒
+      data.userID
+  }
+}
 
 case class GetUserBalanceResponseData(userID: String, balance: Double)
index 1086df2..0e5dbc7 100644 (file)
@@ -40,10 +40,6 @@ package gr.grnet.aquarium.actor.message
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-case class GetUserStateRequest(userID: String, timestamp: Long)
-extends ActorMessage
-  with RouterRequestMessage
-  with UserActorRequestMessage {
-
+case class GetUserStateRequest(userID: String, timestamp: Long) extends ActorMessage with UserActorRequestMessage {
   def referenceTimeMillis = timestamp
 }
index b506e49..119ae79 100644 (file)
@@ -36,6 +36,7 @@
 package gr.grnet.aquarium.actor.message
 
 import gr.grnet.aquarium.computation.state.UserState
+import gr.grnet.aquarium.AquariumInternalError
 
 /**
  *
@@ -45,5 +46,13 @@ import gr.grnet.aquarium.computation.state.UserState
 case class GetUserStateResponse(
     state: Either[String, UserState],
     override val suggestedHTTPStatus: Int = 200)
-extends RouterResponseMessage(state, suggestedHTTPStatus)
+extends UserActorResponseMessage(state, suggestedHTTPStatus) {
+  def userID = state match {
+    case Left(error) ⇒
+      throw new AquariumInternalError("Could not obtain userID. %s".format(error))
+
+    case Right(data) ⇒
+      data.userID
+  }
+}
 
diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/RouterRequestMessage.scala b/src/main/scala/gr/grnet/aquarium/actor/message/RouterRequestMessage.scala
deleted file mode 100644 (file)
index d4a1607..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message
-
-/**
- * Request message sent to [[gr.grnet.aquarium.actor.service.router.RouterActor]].
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-
-trait RouterRequestMessage extends ActorMessage
index 1b327d6..67aa662 100644 (file)
@@ -42,5 +42,7 @@ package gr.grnet.aquarium.actor.message
  */
 
 trait UserActorRequestMessage extends ActorMessage {
+  def userID: String
+
   def referenceTimeMillis: Long
 }
@@ -38,12 +38,12 @@ package gr.grnet.aquarium.actor.message
 import gr.grnet.aquarium.converter.{StdConverters, PrettyJsonTextFormat}
 
 /**
- * Response message sent as a reply by [[gr.grnet.aquarium.actor.service.router.RouterActor]].
+ * Response message sent as a reply by the [[gr.grnet.aquarium.actor.service.user.UserActor]].
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-abstract class RouterResponseMessage[T](
+abstract class UserActorResponseMessage[T](
     val response: Either[String, T],
     val suggestedHTTPStatus: Int = 200)
 extends ActorMessage {
diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/admin/PingAllRequest.scala b/src/main/scala/gr/grnet/aquarium/actor/message/admin/PingAllRequest.scala
deleted file mode 100644 (file)
index 13d0ad9..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message.admin
-
-import gr.grnet.aquarium.actor.message.{RouterRequestMessage, ActorMessage}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class PingAllRequest() extends ActorMessage with RouterRequestMessage
diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/admin/PingAllResponse.scala b/src/main/scala/gr/grnet/aquarium/actor/message/admin/PingAllResponse.scala
deleted file mode 100644 (file)
index 1f564a7..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message.admin
-
-import gr.grnet.aquarium.actor.message.{RouterResponseMessage, RouterRequestMessage, ActorMessage}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class PingAllResponse() extends RouterResponseMessage(Right(""))
diff --git a/src/main/scala/gr/grnet/aquarium/actor/message/config/ActorProviderConfigured.scala b/src/main/scala/gr/grnet/aquarium/actor/message/config/ActorProviderConfigured.scala
deleted file mode 100644 (file)
index 29b082f..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor.message
-package config
-
-import gr.grnet.aquarium.service.RoleableActorProviderService
-
-/**
- * Sent with the configured [[gr.grnet.aquarium.service.RoleableActorProviderService]].
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-case class ActorProviderConfigured(actorProvider: RoleableActorProviderService) extends ActorConfigurationMessage
\ No newline at end of file
index 4f7eee0..348c2ed 100644 (file)
@@ -39,8 +39,7 @@ import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage}
 import gr.grnet.aquarium.event.model.im.IMEventModel
 
 /**
- * [[gr.grnet.aquarium.actor.service.router.RouterActor]] message
- * that triggers the user event processing pipeline.
+ * A message that triggers the user event processing pipeline.
  *
  * Note that the prefix `Process` means that no reply is created or needed.
  *
@@ -48,5 +47,7 @@ import gr.grnet.aquarium.event.model.im.IMEventModel
  */
 
 case class ProcessIMEvent(imEvent: IMEventModel) extends ActorMessage with UserActorRequestMessage {
+  def userID = imEvent.userID
+
   def referenceTimeMillis = imEvent.occurredMillis
 }
index 63a7447..190c85d 100644 (file)
@@ -39,16 +39,14 @@ import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage}
 import gr.grnet.aquarium.event.model.resource.ResourceEventModel
 
 /**
- * [[gr.grnet.aquarium.actor.service.router.RouterActor]] message
- * that triggers the resource event processing pipeline.
+ * A message that triggers the resource event processing pipeline.
  *
  * Note that the prefix `Process` means that no reply is created or needed.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
-case class ProcessResourceEvent(rcEvent: ResourceEventModel)
-extends ActorMessage
-   with UserActorRequestMessage {
+case class ProcessResourceEvent(rcEvent: ResourceEventModel) extends ActorMessage with UserActorRequestMessage {
+  def userID = rcEvent.userID
 
   def referenceTimeMillis = rcEvent.occurredMillis
 }
diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala b/src/main/scala/gr/grnet/aquarium/actor/service/router/RouterActor.scala
deleted file mode 100644 (file)
index f4bb848..0000000
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor
-package service
-package router
-
-import gr.grnet.aquarium.service.RoleableActorProviderService
-import akka.actor.ActorRef
-import user.{UserActorCache}
-import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.admin.PingAllRequest
-import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
-import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured}
-import gr.grnet.aquarium.util.{LogHelpers, shortClassNameOf}
-
-/**
- * Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
- * appropriately.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RouterActor extends ReflectiveRoleableActor {
-  private[this] var _actorProvider: RoleableActorProviderService = _
-
-  def role = RouterRole
-
-  private[this] def _launchUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
-    // create a fresh instance
-    val userActor = _actorProvider.actorForRole(UserActorRole)
-    UserActorCache.put(userID, userActor)
-
-    userActor ! InitializeUserState(userID, referenceTimeMillis)
-
-    userActor
-  }
-
-  private[this] def _findOrCreateUserActor(userID: String, referenceTimeMillis: Long): ActorRef = {
-    UserActorCache.get(userID) match {
-      case Some(userActorRef) ⇒
-        userActorRef
-
-      case None ⇒
-        _launchUserActor(userID, referenceTimeMillis)
-    }
-  }
-
-  private[this] def _forwardToUserActor(userID: String, m: UserActorRequestMessage): Unit = {
-    _findOrCreateUserActor(userID, m.referenceTimeMillis) forward m
-  }
-
-
-  /**
-   * Handles an exception that occurred while servicing a message.
-   *
-   * @param t
-   * The exception.
-   * @param servicingMessage
-   * The message that was being served while the exception happened.
-   * Note that the message can be `null`, in which case the exception
-   * is an NPE.
-   */
-  override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) = {
-    LogHelpers.logChainOfCauses(logger, t)
-
-    def logIgnore(e: Throwable) = {
-      logger.error("Ignoring %s".format(shortClassNameOf(e)), e)
-    }
-
-    // We ignore everything except serious errors.
-    t match {
-      case e: Error ⇒
-        throw e
-
-      case e: AquariumInternalError ⇒
-        logIgnore(e)
-
-      case e: AquariumException ⇒
-        logIgnore(e)
-
-      case e: Throwable ⇒
-        logIgnore(e)
-    }
-  }
-
-  def onAquariumPropertiesLoaded(m: AquariumPropertiesLoaded): Unit = {
-    logger.info("Configured with {}", shortClassNameOf(m))
-  }
-
-  def onActorProviderConfigured(m: ActorProviderConfigured): Unit = {
-    this._actorProvider = m.actorProvider
-    logger.info("Configured with {}", shortClassNameOf(m))
-  }
-
-  def onProcessIMEvent(m: ProcessIMEvent): Unit = {
-     _forwardToUserActor(m.imEvent.userID, m)
-  }
-
-  def onGetUserBalanceRequest(m: GetUserBalanceRequest): Unit = {
-    _forwardToUserActor(m.userID, m)
-  }
-
-  def onGetUserStateRequest(m: GetUserStateRequest): Unit = {
-    _forwardToUserActor(m.userID, m)
-  }
-
-  def onProcessResourceEvent(m: ProcessResourceEvent): Unit = {
-    _forwardToUserActor(m.rcEvent.userID, m)
-  }
-
-  def onPingAllRequest(m: PingAllRequest): Unit = {
-  }
-
-  override def postStop = {
-    UserActorCache.stop
-  }
-}
\ No newline at end of file
index 019c179..1f819bb 100644 (file)
@@ -39,9 +39,8 @@ package user
 
 import gr.grnet.aquarium.actor._
 
-import akka.config.Supervision.Temporary
 import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded}
 import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.event.model.im.IMEventModel
 import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponseData, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
@@ -58,26 +57,34 @@ import gr.grnet.aquarium.computation.state.{UserStateBootstrap, UserState}
  */
 
 class UserActor extends ReflectiveRoleableActor {
-  private[this] var _userID: String = "<?>"
-  private[this] var _imState: IMStateSnapshot = _
-  private[this] var _userState: UserState = _
-  private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
+  @volatile private[this] var _userID: String = "<?>"
+  @volatile private[this] var _imState: IMStateSnapshot = _
+  @volatile private[this] var _userState: UserState = _
+  @volatile private[this] var _latestResourceEventOccurredMillis = TimeHelpers.nowMillis() // some valid datetime
+
+  def userID = {
+    if(this._userID eq null) {
+      throw new AquariumInternalError("%s not initialized ")
+    }
 
-  self.lifeCycle = Temporary
+    this._userID
+  }
 
-  private[this] def _shutmedown(): Unit = {
-    if(haveUserState) {
-      UserActorCache.invalidate(_userID)
-    }
+  override def postStop() {
+    aquarium.akkaService.notifyUserActorPostStop(this)
+  }
 
-    self.stop()
+  private[this] def shutmedown(): Unit = {
+    if(haveIMState) {
+      aquarium.akkaService.invalidateUserActor(this)
+    }
   }
 
   override protected def onThrowable(t: Throwable, message: AnyRef) = {
     LogHelpers.logChainOfCauses(logger, t)
     ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
 
-    _shutmedown()
+    shutmedown()
   }
 
   def role = UserActorRole
@@ -103,9 +110,6 @@ class UserActor extends ReflectiveRoleableActor {
   def onAquariumPropertiesLoaded(event: AquariumPropertiesLoaded): Unit = {
   }
 
-  def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
-  }
-
   private[this] def _updateIMStateRoleHistory(imEvent: IMEventModel, roleCheck: Option[String]) = {
     if(haveIMState) {
       val (newState,
@@ -364,12 +368,12 @@ class UserActor extends ReflectiveRoleableActor {
         this._imState.hasBeenActivated match {
           case true ⇒
             // (have IMState, activated, have UserState)
-            self reply GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
+            sender ! GetUserBalanceResponse(Right(GetUserBalanceResponseData(userID, this._userState.totalCredits)))
 
           case false ⇒
             // (have IMState, not activated, have UserState)
             // Since we have user state, we should have been activated
-            self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
+            sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0001]"), 500)
         }
 
       case (true, false) ⇒
@@ -378,31 +382,31 @@ class UserActor extends ReflectiveRoleableActor {
           case true  ⇒
             // (have IMState, activated, no UserState)
             // Since we are activated, we should have some state.
-            self reply GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
+            sender ! GetUserBalanceResponse(Left("Internal Server Error [AQU-BAL-0002]"), 500)
           case false ⇒
             // (have IMState, not activated, no UserState)
             // The user is virtually unknown
-            self reply GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
+            sender ! GetUserBalanceResponse(Left("User %s not activated [AQU-BAL-0003]".format(userID)), 404 /*Not found*/)
         }
 
       case (false, true) ⇒
         // (no IMState, have UserState)
         // A bit ridiculous situation
-        self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
+        sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0004]".format(userID)), 404/*Not found*/)
 
       case (false, false) ⇒
         // (no IMState, no UserState)
-        self reply GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
+        sender ! GetUserBalanceResponse(Left("Unknown user %s [AQU-BAL-0005]".format(userID)), 404/*Not found*/)
     }
   }
 
   def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
     haveUserState match {
       case true ⇒
-        self reply GetUserStateResponse(Right(this._userState))
+        sender ! GetUserStateResponse(Right(this._userState))
 
       case false ⇒
-        self reply GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
+        sender ! GetUserStateResponse(Left("No state for user %s [AQU-STA-0006]".format(event.userID)), 404)
     }
   }
 
diff --git a/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActorCache.scala b/src/main/scala/gr/grnet/aquarium/actor/service/user/UserActorCache.scala
deleted file mode 100644 (file)
index 59ca2a0..0000000
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.actor
-package service
-package user
-
-import akka.actor.ActorRef
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
-import com.google.common.cache._
-
-/**
- * An actor cache implementation using Guava.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-object UserActorCache extends Lifecycle with Loggable {
-
-  private lazy val cache: Cache[String, ActorRef] =
-    CacheBuilder.newBuilder()
-      .maximumSize(1000)
-      .initialCapacity(100)
-      .concurrencyLevel(20)
-      .removalListener(EvictionListener)
-      .build()
-
-  private[this] object EvictionListener extends RemovalListener[String, ActorRef] with Loggable {
-
-    def onRemoval(rn: RemovalNotification[String, ActorRef]) {
-      val userID = rn.getKey
-      val userActor = rn.getValue
-
-      logger.debug("Parking UserActor for userID = %s".format(userID))
-      // Check this is received after any currently servicing business logic message.
-      userActor.stop()
-    }
-  }
-
-  def start() = {
-  }
-
-  def stop() = {
-    cache.invalidateAll
-    cache.cleanUp
-  }
-
-  def put(userID: String, userActor: ActorRef): Unit = {
-    cache.put(userID, userActor)
-  }
-
-  def get(userID: String): Option[ActorRef] =
-    cache.getIfPresent(userID) match {
-      case null ⇒ None
-      case actorRef ⇒ Some(actorRef)
-    }
-
-  def invalidate(userID: String): Unit = {
-    cache.invalidate(userID)
-  }
-}
index f94be79..e9913e7 100644 (file)
@@ -38,7 +38,6 @@ package gr.grnet.aquarium.connector.handler
 import gr.grnet.aquarium.Aquarium
 import org.slf4j.Logger
 import gr.grnet.aquarium.converter.JsonTextFormat
-import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore}
 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
 import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
@@ -187,6 +186,6 @@ class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
 
       // forwardAction: S ⇒ Unit
       imEvent ⇒ {
-        aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent)
+        aquarium.akkaService.getOrCreateUserActor(imEvent.userID) ! ProcessIMEvent(imEvent)
       }
     )
index 7ad47c3..3468acd 100644 (file)
@@ -41,7 +41,6 @@ import gr.grnet.aquarium.converter.JsonTextFormat
 import gr.grnet.aquarium.Aquarium
 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
 import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
-import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.util._
 
 /**
@@ -122,6 +121,6 @@ class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger)
 
       // forwardAction: S ⇒ Unit
       rcEvent ⇒ {
-        aquarium.actorProvider.actorForRole(RouterRole) ! ProcessResourceEvent(rcEvent)
+        aquarium.akkaService.getOrCreateUserActor(rcEvent.userID) ! ProcessResourceEvent(rcEvent)
       }
     )
diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/PayloadHandlerFutureExecutor.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/PayloadHandlerFutureExecutor.scala
deleted file mode 100644 (file)
index cf61d78..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.connector.rabbitmq.service
-
-import akka.dispatch.Future
-import gr.grnet.aquarium.connector.handler.{PayloadHandler, HandlerResult, PayloadHandlerExecutor}
-
-/**
- * An [[gr.grnet.aquarium.connector.handler.PayloadHandlerExecutor]] that uses `Akka` futures.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-final class PayloadHandlerFutureExecutor extends PayloadHandlerExecutor {
-
-  def exec(payload: Array[Byte], handler: PayloadHandler)
-          (onSuccess: HandlerResult ⇒ Any)
-          (onError: Throwable ⇒ Any): Unit = {
-
-    val result = Future { handler.handlePayload(payload) }
-
-    result.onComplete { futureHandlerResult ⇒
-      futureHandlerResult.value.get match {
-        case Left(e) ⇒
-          onError(e)
-
-        case Right(handlerResult) ⇒
-          onSuccess(handlerResult)
-      }
-    }
-  }
-}
index a7b0f42..8a161fb 100644 (file)
 
 package gr.grnet.aquarium.service
 
-import akka.actor.Actor
-import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import akka.actor.{Props, ActorRef, ActorSystem}
+import gr.grnet.aquarium.util.{Loggable, Lifecycle, shortClassNameOf}
 import gr.grnet.aquarium.ResourceLocator.SysEnvs
-import gr.grnet.aquarium.AquariumInternalError
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable, AquariumException, AquariumInternalError}
+import com.typesafe.config.ConfigFactory
+import java.util.concurrent.atomic.AtomicBoolean
+import com.google.common.cache.{RemovalNotification, RemovalListener, CacheBuilder, Cache}
+import com.ckkloverdos.props.{Props ⇒ KKProps}
+import gr.grnet.aquarium.actor.service.user.UserActor
+import gr.grnet.aquarium.service.event.AquariumCreatedEvent
+import gr.grnet.aquarium.actor.message.config.InitializeUserState
+import gr.grnet.aquarium.util.date.TimeHelpers
+import java.util.concurrent.{TimeUnit, ConcurrentHashMap, Callable}
+import akka.dispatch.{Await, Future}
+import akka.util.Duration
 
 /**
  * A wrapper around Akka, so that it is uniformly treated as an Aquarium service.
@@ -46,7 +57,39 @@ import gr.grnet.aquarium.AquariumInternalError
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-final class AkkaService extends Lifecycle with Loggable {
+final class AkkaService extends AquariumAwareSkeleton with Configurable with Lifecycle with Loggable {
+  @volatile private[this] var _actorSystem: ActorSystem = _
+  @volatile private[this] var _userActorCache: Cache[String, ActorRef] = _
+  @volatile private[this] var _cacheEvictionListener: RemovalListener[String, ActorRef] = _
+  @volatile private[this] var _cacheMaximumSize = 1000
+  @volatile private[this] var _cacheInitialCapacity = 100
+  @volatile private[this] var _cacheConcurrencyLevel = 20
+
+  private[this] val stoppingUserActors = new ConcurrentHashMap[String, Future[Boolean]]
+
+  private[this] val isShuttingDown = new AtomicBoolean(false)
+
+  def propertyPrefix: Option[String] = Some("actors")
+
+  /**
+   * Configure this instance with the provided properties.
+   *
+   * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
+   */
+  def configure(props: KKProps): Unit = {}
+
+  def actorSystem = {
+    if(this._actorSystem eq null) {
+      throw new AquariumInternalError("Akka actorSystem is null")
+    }
+
+    if(this.isShuttingDown.get()) {
+      throw new AquariumException("%s is shutting down".format(shortClassNameOf(this)))
+    }
+
+    this._actorSystem
+  }
+
   def start() = {
     // We have AKKA builtin, so no need to mess with pre-existing installation.
     if(SysEnvs.AKKA_HOME.value.isJust) {
@@ -54,9 +97,110 @@ final class AkkaService extends Lifecycle with Loggable {
       logger.error("%s is set".format(SysEnvs.Names.AKKA_HOME), error)
       throw error
     }
+
+    this._cacheEvictionListener = new RemovalListener[String, ActorRef] {
+      def onRemoval(rn: RemovalNotification[String, ActorRef]): Unit = {
+        if(isShuttingDown.get()) {
+          return
+        }
+
+        val userID   = rn.getKey
+        val actorRef = rn.getValue
+
+        logger.debug("Due to memory constraints, unloading UserActor for userID = %s".format(userID))
+
+        gracefullyStopUserActor(userID, actorRef)
+      }
+    }
+
+    this._userActorCache = CacheBuilder.
+      newBuilder().
+        maximumSize(_cacheMaximumSize).
+        initialCapacity(_cacheInitialCapacity).
+        concurrencyLevel(_cacheConcurrencyLevel).
+        removalListener(this._cacheEvictionListener).
+      build()
+
+    this._actorSystem = ActorSystem("aquarium-akka", ConfigFactory.load("akka.conf"))
+    logger.debug("Created %s".format(this._actorSystem))
   }
 
-  def stop()= {
-    Actor.registry.shutdownAll()
+  def stop() = {
+    this.isShuttingDown.set(true)
+
+    this.stoppingUserActors.clear()
+
+    this._userActorCache.invalidateAll
+    this._userActorCache.cleanUp()
+
+    this._actorSystem.shutdown()
+
+    logger.info("Shut down %s".format(this._actorSystem))
+  }
+
+  def notifyUserActorPostStop(userActor: UserActor): Unit = {
+    this.stoppingUserActors.remove(userActor.userID)
+  }
+
+  private[this] def gracefullyStopUserActor(userID: String, actorRef: ActorRef): Unit = {
+    this.stoppingUserActors.put(
+      userID,
+      akka.pattern.gracefulStop(actorRef, Duration(1000, TimeUnit.MILLISECONDS))(this._actorSystem)
+    )
+  }
+
+  def invalidateUserActor(userActor: UserActor): Unit = {
+    if(this.isShuttingDown.get()) {
+      return
+    }
+
+    val userID = userActor.userID
+    val actorRef = userActor.self
+
+    this._userActorCache.invalidate(userID)
+    gracefullyStopUserActor(userID, actorRef)
+  }
+
+  def getOrCreateUserActor(userID: String): ActorRef = {
+    if(this.isShuttingDown.get()) {
+      throw new AquariumException(
+        "%s is shutting down. Cannot provide user actor %s".format(
+          shortClassNameOf(this),
+          userID))
+    }
+
+    // If stopping, wait to stop or ignore
+    this.stoppingUserActors.get(userID) match {
+      case null ⇒
+
+      case future ⇒
+        try {
+          Await.result(future, Duration(500, TimeUnit.MILLISECONDS))
+        }
+        catch {
+          case e: Throwable ⇒
+            logger.error("While Await(ing) UserActor %s to stop".format(userID), e)
+        }
+        this.stoppingUserActors.remove(userID)
+    }
+
+    this._userActorCache.get(userID, new Callable[ActorRef] {
+      def call(): ActorRef = {
+        // Create new User Actor instance
+        val actorRef = _actorSystem.actorOf(Props.apply({
+          val actor = aquarium.newInstance(classOf[UserActor], classOf[UserActor].getName)
+          actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium))
+          actor
+        }), "userActor::%s".format(userID))
+
+        // Cache it for subsequent calls
+        _userActorCache.put(userID, actorRef)
+
+        // Send the initialization message
+        actorRef ! InitializeUserState(userID, TimeHelpers.nowMillis())
+
+        actorRef
+      }
+    })
   }
 }
index 1b635f6..45b0856 100644 (file)
@@ -51,11 +51,11 @@ import java.net.InetSocketAddress
 import java.util.concurrent.{Executors, TimeUnit}
 import gr.grnet.aquarium.util.date.TimeHelpers
 import org.joda.time.format.ISODateTimeFormat
-import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest, RouterResponseMessage, RouterRequestMessage}
-import gr.grnet.aquarium.actor.RouterRole
+import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest, UserActorResponseMessage}
 import com.ckkloverdos.resource.StreamResource
 import com.ckkloverdos.maybe.{Just, Failed}
 import gr.grnet.aquarium.event.model.ExternalEventModel
+import akka.util.{Timeout ⇒ ATimeout, Duration ⇒ ADuration}
 
 /**
  *
@@ -166,42 +166,31 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
   }
 
   final case class MainService() extends Service[THttpRequest, THttpResponse] {
-    final class ActorRouterService extends Service[RouterRequestMessage, RouterResponseMessage[_]] {
-      def apply(message: RouterRequestMessage): TFuture[RouterResponseMessage[_]] = {
+    final case class UserActorService() extends Service[UserActorRequestMessage, UserActorResponseMessage[_]] {
+      def apply(request: UserActorRequestMessage): TFuture[UserActorResponseMessage[_]] = {
         // We want to asynchronously route the message via akka and get the whole computation as a
         // twitter future.
-        val actorProvider = aquarium.actorProvider
-        val router = actorProvider.actorForRole(RouterRole)
-        val promise = new TPromise[RouterResponseMessage[_]]()
+        val actorRef = aquarium.akkaService.getOrCreateUserActor(request.userID)
+        val promise = new TPromise[UserActorResponseMessage[_]]()
 
-        val actualWork = router.ask(message)
+        val actualWork = akka.pattern.ask(actorRef, request)(ATimeout(ADuration(500, TimeUnit.MILLISECONDS))).
+          asInstanceOf[TFuture[UserActorResponseMessage[_]]]
 
-        actualWork onComplete { akkaFuture ⇒
-          akkaFuture.value match {
-            case Some(eitherValue) ⇒
-              eitherValue match {
-                case Left(throwable) ⇒
-                  promise.setException(throwable)
-
-                case Right(value) ⇒
-                  promise.setValue(value.asInstanceOf[RouterResponseMessage[_]])
-              }
-
-            case None ⇒
-              promise.setException(new Exception("Got no response for %s".format(message)))
-          }
-        }
+        actualWork.
+          onSuccess(promise.setValue).
+          onFailure(promise.setException).
+          onCancellation(promise.setException(new Exception("Processing of %s has been cancelled".format(request))))
 
         promise
       }
     }
 
-    final val actorRouterService = new ActorRouterService
+    final val actorRouterService = UserActorService()
 
-    def callRouter(requestMessage: RouterRequestMessage): TFuture[THttpResponse] = {
+    def callUserActor(requestMessage: UserActorRequestMessage): TFuture[THttpResponse] = {
       actorRouterService(requestMessage).transform { tryResponse ⇒
         tryResponse match {
-          case TReturn(responseMessage: RouterResponseMessage[_]) ⇒
+          case TReturn(responseMessage: UserActorResponseMessage[_]) ⇒
             val statusCode = responseMessage.suggestedHTTPStatus
             val status = THttpResponseStatus.valueOf(statusCode)
 
@@ -253,11 +242,11 @@ class FinagleRESTService extends Lifecycle with AquariumAwareSkeleton with Confi
 
         case RESTPaths.UserBalancePath(userID) ⇒
           // /user/(.+)/balance/?
-          callRouter(GetUserBalanceRequest(userID, millis))
+          callUserActor(GetUserBalanceRequest(userID, millis))
 
         case RESTPaths.UserStatePath(userId) ⇒
           // /user/(.+)/state/?
-          callRouter(GetUserStateRequest(userId, millis))
+          callUserActor(GetUserStateRequest(userId, millis))
 
         case RESTPaths.ResourcesAquariumPropertiesPath() ⇒
           resourceInfoResponse(ResourceLocator.Resources.AquariumPropertiesResource, TEXT_PLAIN)
index 87992ce..9986d01 100644 (file)
@@ -37,19 +37,19 @@ package gr.grnet.aquarium.service
 
 import com.ckkloverdos.props.Props
 import com.google.common.eventbus.Subscribe
-import gr.grnet.aquarium.{AquariumAwareSkeleton, Aquarium, Configurable}
+import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
 import gr.grnet.aquarium.converter.StdConverters
-import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle}
 import gr.grnet.aquarium.util.sameTags
 import gr.grnet.aquarium.service.event.{AquariumCreatedEvent, StoreIsAliveBusEvent, StoreIsDeadBusEvent}
-import gr.grnet.aquarium.connector.rabbitmq.service.{PayloadHandlerFutureExecutor, PayloadHandlerPostNotifier}
+import gr.grnet.aquarium.connector.rabbitmq.service.PayloadHandlerPostNotifier
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys.RabbitMQConfKeys
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQKeys
 import gr.grnet.aquarium.connector.handler.{SynchronousPayloadHandlerExecutor, ResourceEventPayloadHandler, IMEventPayloadHandler}
 
 /**
+ * The service that is responsible to handle `RabbitMQ` connecrivity.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
@@ -68,8 +68,6 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable with Aqu
 
   def converters = aquarium.converters
 
-  def router = aquarium.actorProvider.actorForRole(RouterRole)
-
   /**
    * Configure this instance with the provided properties.
    *
diff --git a/src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala b/src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala
deleted file mode 100644 (file)
index 8ffdd83..0000000
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-import com.ckkloverdos.props.Props
-import akka.actor.ActorRef
-import gr.grnet.aquarium.{AquariumAwareSkeleton, Configurable}
-import java.util.concurrent.ConcurrentHashMap
-import gr.grnet.aquarium.util.Loggable
-import gr.grnet.aquarium.actor.message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
-import gr.grnet.aquarium.actor._
-import gr.grnet.aquarium.service.event.AquariumCreatedEvent
-
-/**
- * All actors are provided locally.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class SimpleLocalRoleableActorProviderService
-  extends RoleableActorProviderService
-     with AquariumAwareSkeleton
-     with Configurable
-     with Loggable {
-
-  private[this] val actorCache = new ConcurrentHashMap[ActorRole, ActorRef]
-  @volatile private[this] var _props: Props = _
-
-  def propertyPrefix = None
-
-  def configure(props: Props): Unit = {
-    this._props = props
-  }
-
-  def start(): Unit = {
-    // Start and configure actors
-    import SimpleLocalRoleableActorProviderService.RolesToBeStarted
-
-    for(role <- RolesToBeStarted) {
-      actorForRole(role)
-    }
-  }
-
-  def stop(): Unit = {
-  }
-
-  private[this] def _newActor(role: ActorRole): ActorRef = {
-    val actorFactory = (_class: Class[_ <: RoleableActor]) ⇒ {
-      val actor = aquarium.newInstance(_class, _class.getName)
-      actor.awareOfAquariumEx(AquariumCreatedEvent(aquarium))
-      actor
-    }
-
-    val actorRef = akka.actor.Actor.actorOf(actorFactory(role.actorType)).start()
-
-    val propsMsg = AquariumPropertiesLoaded(this._props)
-    if(role.canHandleConfigurationMessage(propsMsg)) {
-      actorRef ! propsMsg
-    }
-
-    val providerMsg = ActorProviderConfigured(this)
-    if(role.canHandleConfigurationMessage(providerMsg)) {
-      actorRef ! providerMsg
-    }
-
-    actorRef
-  }
-
-  private[this] def _fromCacheOrNew(role: ActorRole): ActorRef = synchronized {
-    actorCache.get(role) match {
-      case null ⇒
-        val actorRef = _newActor(role)
-        actorCache.put(role, actorRef)
-        actorRef
-      case actorRef ⇒
-        actorRef
-    }
-  }
-
-  @throws(classOf[Exception])
-  def actorForRole(role: ActorRole, hints: Props = Props.empty) = synchronized {
-    if(role.isCacheable) {
-      _fromCacheOrNew(role)
-    } else {
-      _newActor(role)
-    }
-  }
-
-  override def toString = gr.grnet.aquarium.util.shortClassNameOf(this)
-}
-
-object SimpleLocalRoleableActorProviderService {
-  // Always set Router at the end.
-  // We could definitely use some automatic dependency sorting here (topological sorting anyone?)
-  final val RolesToBeStarted = List(RouterRole)
-
-  lazy val ActorClassByRole: Map[ActorRole, Class[_ <: RoleableActor]] =
-    RolesToBeStarted map {
-      role ⇒
-        (role, role.actorType)
-    } toMap
-
-  lazy val ActorRefByRole: Map[ActorRole, ActorRef] =
-    ActorClassByRole map {
-      case (role, clazz) ⇒
-        (role, akka.actor.Actor.actorOf(clazz).start())
-    }
-}
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/uid/EAIOUUIDGenerator.scala b/src/main/scala/gr/grnet/aquarium/uid/EAIOUUIDGenerator.scala
deleted file mode 100644 (file)
index 5ab0cbb..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.uid
-
-import com.eaio.uuid.UUID
-
-/**
- * [[gr.grnet.aquarium.uid.UIDGenerator]] based on [[com.eaio.uuid.UUID]].
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>
- */
-
-object EAIOUUIDGenerator extends UIDGenerator[UUID] {
-  def nextUIDObject() = new UUID()
-}
index 3519279..2468e15 100644 (file)
@@ -1,11 +1,29 @@
 akka {
-  version = "1.3.1"
-  
+  version = "2.0.2"
+
   event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
-  event-handler-level = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG
+
+  # Log level used by the configured loggers (see "event-handlers") as soon
+  # as they have been started; before that, see "stdout-loglevel"
+  # Options: ERROR, WARNING, INFO, DEBUG
+  loglevel = "INFO"
+
+  # Log level for the very basic logger activated during AkkaApplication startup
+  # Options: ERROR, WARNING, INFO, DEBUG
+  stdout-loglevel = "WARNING"
+
+  # Log the complete configuration at INFO level when the actor system is started.
+  # This is useful when you are uncertain of what configuration is used.
+  log-config-on-start = off
+
+  # Toggles whether the threads created by this ActorSystem should be daemons or not
+  daemonic = on
 
   actor {
-    # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
-    throughput = 1
+    default-dispatcher {
+      # Throughput defines the number of messages that are processed in a batch
+      # before the thread is returned to the pool. Set to 1 for as fair as possible.
+      throughput = 1
+    }
   }
 }
\ No newline at end of file
index 9b2569e..d7aa4b7 100644 (file)
@@ -70,12 +70,6 @@ events.store.save.im.events=false
 # How often do we attempt a reconnection to the store(s)?
 anystore.reconnect.period.millis=1000
 
-#######
-# DO NOT TOUCH the following options, unless you know what you are doing
-#######
-
-# Actor subsystem
-actor.provider.class=gr.grnet.aquarium.service.SimpleLocalRoleableActorProviderService
 # Class that initializes the REST service
 rest.service.class=gr.grnet.aquarium.service.FinagleRESTService
 rest.shutdown.timeout.millis=2000
diff --git a/src/test/scala/gr/grnet/aquarium/processor/actor/ActorProxy.scala b/src/test/scala/gr/grnet/aquarium/processor/actor/ActorProxy.scala
deleted file mode 100644 (file)
index abac894..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.processor.actor
-
-import akka.actor.Actor
-
-/**
- * A proxy for a remote actor.
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-abstract class ActorProxy(name: String, host: String = Constants.RemoteHost, port: Int = Constants.RemotePort) extends Actor {
-  private[this] lazy val remote = Actor.remote.actorFor(name, host, port)
-
-  def receive = {
-    case message =>
-      remote forward message
-  }
-}
\ No newline at end of file
diff --git a/src/test/scala/gr/grnet/aquarium/processor/actor/Constants.scala b/src/test/scala/gr/grnet/aquarium/processor/actor/Constants.scala
deleted file mode 100644 (file)
index 1363afa..0000000
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.processor.actor
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-object Constants {
-  val RemoteHost = "localhost"
-  val RemotePort = 2552
-  val LocalHost = "localhost"
-  val LocalPort = 2551
-  val ActorNameEcho = "echo"
-  val ActorNameSilent = "silent"
-}
diff --git a/src/test/scala/gr/grnet/aquarium/processor/actor/RemoteActorTest.scala b/src/test/scala/gr/grnet/aquarium/processor/actor/RemoteActorTest.scala
deleted file mode 100644 (file)
index 8aea06f..0000000
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.processor.actor
-
-import org.junit.Test
-import org.junit.Assert._
-import org.junit.Assume.assumeTrue
-
-import akka.actor.Actor
-import java.lang.Object
-
-//import akka.actor.Actor.
-
-
-class EchoActor extends Actor {
-  def receive = {
-    case message =>
-      println("%s received: %s".format(this, message))
-      //self.reply("REPLY from EchoActor for '%s'".format(message))
-  }
-}
-
-class SilentActor extends Actor {
-  def receive = {
-    case message =>
-  }
-}
-
-class EchoProxy extends ActorProxy("echo")
-class SilentProxy extends ActorProxy("silent")
-
-object ClientPart {
-  import Constants._
-
-  def start(): Unit = Actor.remote.start(LocalHost, LocalPort)
-  def stop(): Unit  = Actor.remote.shutdownClientModule()
-
-  val echo   = Actor.actorOf[EchoProxy].start()
-  val silent = Actor.actorOf[SilentProxy].start()
-}
-
-object ServerPart {
-  import Constants._
-
-  def start(): Unit = {
-    Actor.remote.start(RemoteHost, RemotePort)
-
-    Actor.remote.register(ActorNameEcho, Actor.actorOf[EchoActor])
-    Actor.remote.register(ActorNameSilent, Actor.actorOf[SilentActor])
-  }
-
-  def stop(): Unit  = Actor.remote.shutdownServerModule()
-}
-
-
-/**
- *
- * @author Christos KK Loverdos <loverdos@gmail.com>.
- */
-class RemoteActorTest {
-  @Test
-  def testSendMessage: Unit = {
-    try {
-      ServerPart.start()
-      ClientPart.start()
-
-      ClientPart.echo ! "one"
-      ClientPart.echo ! "two"
-      ClientPart.echo ! "three"
-
-      // Give us some delay to print to the console...
-      Thread.sleep(100)
-    }
-    finally {
-      ServerPart.stop()
-      ClientPart.stop()
-    }
-  }
-}
\ No newline at end of file