WIP Rework AMQP stuff
authorChristos KK Loverdos <loverdos@gmail.com>
Mon, 14 May 2012 14:37:27 +0000 (17:37 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Mon, 14 May 2012 14:37:27 +0000 (17:37 +0300)
19 files changed:
src/main/scala/gr/grnet/aquarium/Configurator.scala
src/main/scala/gr/grnet/aquarium/connector/handler/PayloadHandlerExecutor.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/connector/handler/SynchronousPayloadHandlerExecutor.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/PayloadHandlerFutureExecutor.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala [deleted file]
src/main/scala/gr/grnet/aquarium/service/AkkaService.scala
src/main/scala/gr/grnet/aquarium/service/EventBusService.scala
src/main/scala/gr/grnet/aquarium/service/EventProcessorService.scala [deleted file]
src/main/scala/gr/grnet/aquarium/service/IMEventProcessorService.scala [deleted file]
src/main/scala/gr/grnet/aquarium/service/ResourceEventProcessorService.scala [deleted file]
src/main/scala/gr/grnet/aquarium/service/SimpleLocalRoleableActorProviderService.scala
src/main/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala [deleted file]
src/main/scala/gr/grnet/aquarium/util/ReflectHelpers.scala
src/main/scala/gr/grnet/aquarium/util/package.scala
src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala [deleted file]
src/test/scala/gr/grnet/aquarium/store/mongodb/EventStoreTest.scala [deleted file]

index 83ddb76..03ed4e7 100644 (file)
@@ -58,7 +58,15 @@ final class Configurator(val props: Props) extends Loggable {
   /**
    * Reflectively provide a new instance of a class and configure it appropriately.
    */
-  private[this] def newInstance[C : Manifest](className: String): C = {
+  private[this] def newInstance[C : Manifest](_className: String = ""): C = {
+    val className = _className match {
+      case "" ⇒
+        manifest[C].erasure.getName
+
+      case name ⇒
+        name
+    }
+
     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
     instanceM match {
       case Just(instance) ⇒ instance match {
@@ -73,6 +81,7 @@ final class Configurator(val props: Props) extends Loggable {
 
           MaybeEither(configurable configure localProps) match {
             case Just(_) ⇒
+              logger.debug("Configured {} with props", configurable.getClass.getName)
               instance
 
             case Failed(e) ⇒
@@ -89,27 +98,15 @@ final class Configurator(val props: Props) extends Loggable {
 
   }
 
-  private[this] lazy val _actorProvider: RoleableActorProviderService = {
-    val instance = newInstance[RoleableActorProviderService](props.getEx(Keys.actor_provider_class))
-    logger.info("Loaded %s: %s".format(shortNameOfClass(classOf[RoleableActorProviderService]), instance.getClass))
-    instance
-  }
+  private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
 
   /**
    * Initializes a store provider, according to the value configured
    * in the configuration file. The
    */
-  private[this] lazy val _storeProvider: StoreProvider = {
-    val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
-    logger.info("Loaded %s: %s".format(shortNameOfClass(classOf[StoreProvider]), instance.getClass))
-    instance
-  }
+  private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
   
-  private[this] lazy val _restService: Lifecycle = {
-    val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
-    logger.info("Loaded RESTService: %s".format(instance.getClass))
-    instance
-  }
+  private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
 
   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
     // If there is a specific `UserStateStore` implementation specified in the
@@ -204,21 +201,19 @@ final class Configurator(val props: Props) extends Loggable {
 
   private[this] lazy val _converters = StdConverters.AllConverters
 
-  private[this] lazy val _resEventProc = new ResourceEventProcessorService
+  private[this] lazy val _akka = newInstance[AkkaService]()
 
-  private[this] lazy val _imEventProc = new IMEventProcessorService
+  private[this] lazy val _eventBus = newInstance[EventBusService]()
 
-  private[this] lazy val _eventBus = newInstance[EventBusService](classOf[EventBusService].getName)
+  private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
 
-  private[this] lazy val _lifecycleServices = List(
-    AkkaService,
+  private[this] lazy val _allServices = List(
+    _akka,
+    _actorProvider,
     _eventBus,
     _restService,
-    _actorProvider,
-    newInstance[RabbitMQService](classOf[RabbitMQService].getName)
-    )/*,
-    _resEventProc,
-    _imEventProc)*/
+    _rabbitmqService
+  )
 
   def get(key: String, default: String = ""): String = props.getOr(key, default)
 
@@ -251,12 +246,13 @@ final class Configurator(val props: Props) extends Loggable {
   }
 
   def startServices(): Unit = {
-    _lifecycleServices.foreach(_.start())
+    for(service ← _allServices) {
+      service.start()
+    }
   }
 
   def stopServices(): Unit = {
-    _lifecycleServices.reverse.foreach(_.stop())
-//    akka.actor.Actor.registry.shutdownAll()
+    _allServices.reverse.foreach(service ⇒ safeUnit(service.stop()))
   }
 
   def stopServicesWithDelay(millis: Long) {
@@ -479,7 +475,7 @@ object Configurator {
     final val events_store_folder = "events.store.folder"
 
     /**
-     * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.im.IMEventModel]] is
+     * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
      * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
      */
     final val save_unparsed_event_im = "save.unparsed.event.im"
diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/PayloadHandlerExecutor.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/PayloadHandlerExecutor.scala
new file mode 100644 (file)
index 0000000..7f1f805
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+/**
+ * An executor for a [[gr.grnet.aquarium.connector.handler.PayloadHandler]].
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+trait PayloadHandlerExecutor {
+  def exec(payload: Array[Byte], handler: PayloadHandler)
+          (onSuccess: HandlerResult ⇒ Unit)
+          (onError: Throwable ⇒ Unit): Unit
+}
diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/SynchronousPayloadHandlerExecutor.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/SynchronousPayloadHandlerExecutor.scala
new file mode 100644 (file)
index 0000000..5b1c031
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+/**
+ * A [[gr.grnet.aquarium.connector.handler.PayloadHandlerExecutor]] that calls the
+ * [[gr.grnet.aquarium.connector.handler.PayloadHandler]] synchronously.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final class SynchronousPayloadHandlerExecutor extends PayloadHandlerExecutor {
+
+  def exec(payload: Array[Byte], handler: PayloadHandler)
+          (onSuccess: HandlerResult ⇒ Unit)
+          (onError: Throwable ⇒ Unit): Unit = {
+
+    try onSuccess(handler.handlePayload(payload))
+    catch { case e: Throwable ⇒ onError(e) }
+  }
+}
index 06b2478..bcbea00 100644 (file)
@@ -37,7 +37,7 @@ package gr.grnet.aquarium.connector.rabbitmq
 
 import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-import gr.grnet.aquarium.util.safeUnit
+import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQQueueKeys, RabbitMQExchangeKeys}
 import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
 import com.rabbitmq.client.AMQP.BasicProperties
@@ -45,22 +45,27 @@ import gr.grnet.aquarium.Configurator
 import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
 import gr.grnet.aquarium.service.event.BusEvent
-import gr.grnet.aquarium.connector.handler.{HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
+import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
+import com.ckkloverdos.maybe.MaybeEither
+import gr.grnet.aquarium.util.date.TimeHelpers
 
 /**
+ * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler) extends Loggable with Lifecycle {
+class RabbitMQConsumer(conf: RabbitMQConsumerConf,
+                       handler: PayloadHandler,
+                       executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
   private[this] var _factory: ConnectionFactory = _
   private[this] var _connection: Connection = _
   private[this] var _channel: Channel = _
-  private[this] val _isAlive = new AtomicBoolean(false)
+//  private[this] val _isAlive = new AtomicBoolean(false)
   private[this] val _state = new AtomicReference[State](Shutdown)
 
-  def isAlive() = {
-    _isAlive.get() && _state.get().isStarted
+  def isStarted() = {
+    _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
   }
 
   sealed trait State {
@@ -76,7 +81,6 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
     _state.set(ShutdownSequence)
     safeUnit(_channel.close())
     safeUnit(_connection.close())
-    _isAlive.set(false)
     _state.set(Shutdown)
   }
 
@@ -138,10 +142,6 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
 
     channel.addShutdownListener(RabbitMQShutdownListener)
 
-    if(_channel.isOpen) {
-      _isAlive.getAndSet(true)
-    }
-
     _channel.basicConsume(
       queueName,
       false, // We send explicit acknowledgements to RabbitMQ
@@ -151,14 +151,25 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
 
   def start(): Unit = {
     try {
-      doFullStartupSequence()
-      _state.set(Started)
+      logStarting()
+      val (ms0, ms1, _) = TimeHelpers.timed {
+        doFullStartupSequence()
+        _state.set(Started)
+      }
+      logStarted(ms0, ms1, toDebugString)
     } catch {
       case e: Exception ⇒
         doSafeFullShutdownSequence(true)
+        logger.error("While starting", e)
     }
   }
 
+  def stop() = {
+    logStopping()
+    val (ms0, ms1,  _) = TimeHelpers.timed(doSafeFullShutdownSequence(false))
+    logStopped(ms0, ms1, toDebugString)
+  }
+
   private[this] def postBusError(event: BusEvent): Unit = {
     Configurator.MasterConfigurator.eventBus ! event
   }
@@ -192,10 +203,18 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
     }
 
     def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
+      def doError: PartialFunction[Throwable, Unit] = {
+        case e: Exception ⇒
+          logger.warn("Unexpected error", e)
+
+        case e: Throwable ⇒
+          throw e
+      }
+
       try {
         val deliveryTag = envelope.getDeliveryTag
-        val hresult = handler.handlePayload(body)
-        hresult match {
+
+        executor.exec(body, handler) {
           case HandlerResultSuccess ⇒
             doWithChannel(_.basicAck(deliveryTag, false))
 
@@ -209,11 +228,8 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
             // The other end is crucial to the overall operation and it is in panic mode,
             // so we stop delivering messages until further notice
             doSafeFullShutdownSequence(true)
-        }
-      } catch {
-        case e: Exception ⇒
-          logger.warn("Unexpected error", e)
-      }
+        } (doError)
+      } catch (doError)
     }
   }
 
@@ -223,7 +239,6 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
 
     def shutdownCompleted(cause: ShutdownSignalException) = {
       safeUnit { _channel.close() }
-      _isAlive.getAndSet(false)
 
       // Now, let's see what happened
       if(isConnectionError(cause)) {
@@ -232,7 +247,12 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler)
     }
   }
 
-  def stop() = {
+  def toDebugString = {
+    import conf._
+    "(exchange=%s, routingKey=%s, queue=%s)".format(exchangeName, routingKey, queueName)
+  }
 
+  override def toString = {
+    "%s%s".format(shortClassNameOf(this), toDebugString)
   }
 }
index f702d6f..8d4853a 100644 (file)
@@ -43,6 +43,15 @@ import gr.grnet.aquarium.event.model.ExternalEventModel
 import gr.grnet.aquarium.util.safeUnit
 
 /**
+ * Generic handler of events arriving to Aquarium.
+ *
+ * We first parse them to JSON ([[gr.grnet.aquarium.converter.JsonTextFormat]]) and an appropriate event model
+ * (`E <:` [[gr.grnet.aquarium.event.model.ExternalEventModel]]),
+ * then store them to DB
+ * (`S <:` [[gr.grnet.aquarium.event.model.ExternalEventModel]])
+ * and then forward them to business logic.
+ *
+ * All the above actions are given polymorphically via appropriate functions.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/PayloadHandlerFutureExecutor.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/PayloadHandlerFutureExecutor.scala
new file mode 100644 (file)
index 0000000..e89da0e
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.rabbitmq.service
+
+import akka.dispatch.Future
+import gr.grnet.aquarium.connector.handler.{PayloadHandler, HandlerResult, PayloadHandlerExecutor}
+
+
+/**
+ * An [[gr.grnet.aquarium.connector.handler.PayloadHandlerExecutor]] that uses `Akka` futures.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final class PayloadHandlerFutureExecutor extends PayloadHandlerExecutor {
+
+  def exec(payload: Array[Byte], handler: PayloadHandler)
+          (onSuccess: HandlerResult ⇒ Unit)
+          (onError: Throwable ⇒ Unit): Unit = {
+
+    val result = Future { handler.handlePayload(payload) }
+
+    result.onComplete { futureHandlerResult ⇒
+      futureHandlerResult.value.get match {
+        case Left(e) ⇒
+          onError(e)
+
+        case Right(handlerResult) ⇒
+          onSuccess(handlerResult)
+      }
+    }
+  }
+}
index 7a77700..9e79d38 100644 (file)
@@ -37,10 +37,9 @@ package gr.grnet.aquarium.connector.rabbitmq.service
 
 import com.ckkloverdos.props.Props
 import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.util.{ReflectHelpers, Loggable, Lifecycle}
-import com.rabbitmq.client.{ConnectionFactory, Address}
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import com.rabbitmq.client.Address
 import gr.grnet.aquarium.{Configurator, Configurable}
-import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
 import com.ckkloverdos.key.{ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
@@ -48,8 +47,7 @@ import com.ckkloverdos.env.{EnvKey, Env}
 import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import com.ckkloverdos.maybe.{MaybeEither, Failed, Just, Maybe}
-import gr.grnet.aquarium.connector.handler.{HandlerResultSuccess, HandlerResultPanic, HandlerResultReject, HandlerResult, PayloadHandler}
+import com.ckkloverdos.maybe.MaybeEither
 import gr.grnet.aquarium.actor.RouterRole
 import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
 import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
@@ -61,7 +59,8 @@ import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
  */
 
 class RabbitMQService extends Loggable with Lifecycle with Configurable {
-  private[this] val props: Props = Props()(StdConverters.AllConverters)
+  @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
+  @volatile private[this] var _consumers = List[RabbitMQConsumer]()
 
   def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
 
@@ -83,17 +82,9 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
    */
   def configure(props: Props) = {
-    ReflectHelpers.setField(this, "props", props)
-
-    try {
-      doConfigure()
-      logger.info("Configured with {}", this.props)
-    } catch {
-      case e: Exception ⇒
-      // If we have no internal error, then something is bad with RabbitMQ
-      eventBus ! RabbitMQError(e)
-      throw e
-    }
+    this._props = props
+
+    doConfigure()
   }
 
   private[this] def doConfigure(): Unit = {
@@ -109,12 +100,28 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
       StdIMEvent.fromJsonTextFormat(jsonTextFormat)
     }
 
+    val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
+      router ! ProcessResourceEvent(rcEvent)
+    }
+
+    val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
+      logger.info("Forwarding {}", rcEvent)
+    }
+
+    val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
+      router ! ProcessIMEvent(imEvent)
+    }
+
+    val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
+      logger.info("Forwarding {}", imEvent)
+    }
+
     val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
       jsonParser,
       (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
       rcEventParser,
       rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
-      rcEvent ⇒ router ! ProcessResourceEvent(rcEvent)
+      rcDebugForwardAction
     )
 
     val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
@@ -122,36 +129,88 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
       (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
       imEventParser,
       imEvent ⇒ imEventStore.insertIMEvent(imEvent),
-      imEvent ⇒ router ! ProcessIMEvent(imEvent)
+      imDebugForwardAction
     )
 
+    val futureExecutor = new PayloadHandlerFutureExecutor
+
     // (e)xchange:(r)outing key:(q)
-    val all_rc_ERQs = props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
-    val rcConsumerConfs = for(oneERQ ← all_rc_ERQs) yield {
-      RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
+    logger.debug("%s=%s".format(RabbitMQConfKeys.rcevents_queues, _props(RabbitMQConfKeys.rcevents_queues)))
+    val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
+    logger.debug("all_rc_ERQs = %s".format(all_rc_ERQs))
+
+    val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
+      RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
+    }
+    val rcConsumerConfs = rcConsumerConfs_.toSet.toList
+    if(rcConsumerConfs.size != rcConsumerConfs_.size) {
+      logger.warn(
+        "Duplicate %s consumer info in %s=%s".format(
+        RabbitMQService.PropertiesPrefix,
+        RabbitMQConfKeys.rcevents_queues,
+        _props(RabbitMQConfKeys.rcevents_queues)))
     }
 
-    val all_im_ERQs = props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
-    val imConsumerConfs = for(oneERQ ← all_im_ERQs) yield {
-      RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
+    val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
+    val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
+      RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
+    }
+    val imConsumerConfs = imConsumerConfs_.toSet.toList
+    if(imConsumerConfs.size != imConsumerConfs_.size) {
+      logger.warn(
+        "Duplicate %s consumer info in %s=%s".format(
+        RabbitMQService.PropertiesPrefix,
+        RabbitMQConfKeys.imevents_queues,
+        _props(RabbitMQConfKeys.imevents_queues)))
     }
 
     val rcConsumers = for(rccc ← rcConsumerConfs) yield {
-      new RabbitMQConsumer(rccc, rcHandler)
+      logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
+        RabbitMQService.PropertiesPrefix,
+        rccc.exchangeName,
+        rccc.routingKey,
+        rccc.queueName
+      ))
+      new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
     }
 
     val imConsumers = for(imcc ← imConsumerConfs) yield {
-      new RabbitMQConsumer(imcc, imHandler)
+      logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
+        RabbitMQService.PropertiesPrefix,
+        imcc.exchangeName,
+        imcc.routingKey,
+        imcc.queueName
+      ))
+      new RabbitMQConsumer(imcc, imHandler, futureExecutor)
     }
+
+    this._consumers = rcConsumers ++ imConsumers
+
+    val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
+    lg("Got %s consumers".format(this._consumers.size))
+
+    this._consumers.foreach(logger.debug("Configured {}", _))
   }
 
   def start() = {
-    logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
-    System.exit(1)
+    val (ms0, ms1, _) = TimeHelpers.timed {
+      this._consumers.foreach(_.start())
+
+      for(consumer ← this._consumers) {
+        if(!consumer.isStarted()) {
+          logger.warn("Not started yet {}", consumer.toDebugString)
+        }
+      }
+    }
+    logStarted(ms0, ms1)
   }
 
   def stop() = {
-    logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
+    val (ms0, ms1, _) = TimeHelpers.timed {
+      this._consumers.foreach(_.stop())
+    }
+
+    logStopped(ms0, ms1)
   }
 
 }
@@ -160,7 +219,7 @@ object RabbitMQService {
   final val PropertiesPrefix       = "rabbitmq"
   final val PropertiesPrefixAndDot = PropertiesPrefix + "."
 
-  @inline private[this] def p(name: String) = PropertiesPrefix + name
+  @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
 
   final val DefaultExchangeConfArguments = Env()
 
@@ -273,39 +332,33 @@ object RabbitMQService {
      * configuration.
      */
     final val servers = p("servers")
-    final val amqp_servers = servers
 
     /**
      * Comma separated list of AMQP servers running in active-active
      * configuration.
      */
     final val port = p("port")
-    final val amqp_port = port
 
     /**
      * User name for connecting with the AMQP server
      */
     final val username = p("username")
-    final val amqp_username = username
 
     /**
      * Password for connecting with the AMQP server
      */
     final val password = p("passwd")
-    final val amqp_password = password
 
     /**
      * Virtual host on the AMQP server
      */
     final val vhost = p("vhost")
-    final val amqp_vhost = vhost
 
     /**
      * Comma separated list of exchanges known to aquarium
      * FIXME: What is this??
      */
     final val exchange = p("exchange")
-    final val amqp_exchange = exchange
 
     /**
      * Queues for retrieving resource events from. Multiple queues can be
@@ -314,7 +367,6 @@ object RabbitMQService {
      * Format is `exchange:routing.key:queue-name,...`
      */
     final val rcevents_queues = p("rcevents.queues")
-    final val amqp_rcevents_queues = rcevents_queues
 
     /**
      * Queues for retrieving user events from. Multiple queues can be
@@ -323,7 +375,5 @@ object RabbitMQService {
      * Format is `exchange:routing.key:queue-name,...`
      */
     final val imevents_queues = p("imevents.queues")
-    final val amqp_imevents_queues = imevents_queues
   }
-
 }
diff --git a/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala b/src/main/scala/gr/grnet/aquarium/messaging/AkkaAMQP.scala
deleted file mode 100644 (file)
index 07e3495..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging
-
-import akka.actor._
-import akka.amqp.{Topic, AMQP}
-import akka.amqp.AMQP._
-import gr.grnet.aquarium.Configurator
-import com.rabbitmq.client.Address
-import gr.grnet.aquarium.util.Loggable
-
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
-
-/**
- * Functionality for working with queues.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-trait AkkaAMQP extends Loggable {
-
-  class AMQPConnection {
-    private[messaging] lazy val connection = {
-
-      val mc = Configurator.MasterConfigurator
-
-      val servers = mc.get(RabbitMQConfKeys.amqp_servers)
-      val port = mc.get(RabbitMQConfKeys.amqp_port).toInt
-
-      // INFO: Can connect to more than one rabbitmq nodes
-      val addresses = servers.split(",").foldLeft(Array[Address]()) {
-        (x, y) => x ++ Array(new Address(y, port))
-      }
-
-      AMQP.newConnection(
-        ConnectionParameters(
-          addresses,
-          mc.get(RabbitMQConfKeys.amqp_username),
-          mc.get(RabbitMQConfKeys.amqp_password),
-          mc.get(RabbitMQConfKeys.amqp_vhost),
-          1000,
-          None))
-    }
-  }
-
-  lazy val im_exchanges =
-    Configurator.MasterConfigurator.get(RabbitMQConfKeys.amqp_imevents_queues).split(';').map(e => e.split(':')(0))
-
-  lazy val aquarium_exchnage = Configurator.MasterConfigurator.get(RabbitMQConfKeys.amqp_exchange)
-
-  lazy val resevent_exchanges =
-    Configurator.MasterConfigurator.get(RabbitMQConfKeys.amqp_rcevents_queues).split(';').map(e => e.split(':')(0))
-
-  //Queues and exchnages are by default durable and persistent
-  val decl = ActiveDeclaration(durable = true, autoDelete = false)
-
-  def consumer(routekey: String, queue: String, exchange: String,
-               recipient: ActorRef, selfAck: Boolean) =
-    AMQP.newConsumer(
-      connection = (new AMQPConnection()).connection,
-      consumerParameters = ConsumerParameters(
-        routingKey = routekey,
-        exchangeParameters =
-          // INFO "x-ha-policy" is used to make queues on all nodes
-          Some(ExchangeParameters(exchange, Topic, decl, Map("x-ha-policy" -> "all"))),
-        deliveryHandler = recipient,
-        queueName = Some(queue),
-        queueDeclaration = decl,
-        selfAcknowledging = selfAck,
-        /* Better safe than sorry */
-        channelParameters = Some(ChannelParameters(prefetchSize = 1))
-        ))
-
-  def producer(exchange: String) = {
-
-    AMQP.newProducer(
-      connection = (new AMQPConnection()).connection,
-      producerParameters = ProducerParameters(
-        exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)),
-        channelParameters = Some(ChannelParameters(prefetchSize = 1))))
-  }
-}
\ No newline at end of file
index 2b7b8c1..0fca827 100644 (file)
@@ -45,7 +45,7 @@ import gr.grnet.aquarium.util.date.TimeHelpers
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
 
-object AkkaService extends Lifecycle with Loggable {
+final class AkkaService extends Lifecycle with Loggable {
   def start() = {
     logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
   }
index 5613965..3ad70c1 100644 (file)
@@ -41,7 +41,8 @@ import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.service.event.BusEvent
 import com.google.common.eventbus.{DeadEvent, Subscribe, EventBus}
 import akka.actor.{ActorRef, Actor}
-import gr.grnet.aquarium.util.{ReflectHelpers, Lifecycle, Loggable}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.util.safeUnit
 
 
 /**
@@ -52,7 +53,7 @@ import gr.grnet.aquarium.util.{ReflectHelpers, Lifecycle, Loggable}
 
 class EventBusService extends Loggable with Lifecycle with Configurable {
   private[this] val theBus = new EventBus(classOf[EventBusService].getName)
-  private[this] val poster: ActorRef = null
+  private[this] var _poster: ActorRef = null
 
   def propertyPrefix = None
 
@@ -62,21 +63,20 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
    * If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
    */
   def configure(props: Props) = {
-
   }
 
   def start() = {
     val (ms0, ms1, _) = TimeHelpers.timed {
       this addSubsciber this // Wow!
 
-      val poster = Actor actorOf AsyncPoster
-      ReflectHelpers.setField(this, "poster", poster)
+      this._poster = Actor actorOf AsyncPoster
     }
     logStarted(ms0, ms1)
   }
 
   def stop() = {
-    logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
+    val (ms0, ms1, _) = TimeHelpers.timed(safeUnit(_poster.stop()))
+    logStopped(ms0, ms1)
   }
 
   @inline
@@ -85,7 +85,7 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
   }
 
   def ![A <: BusEvent](event: A): Unit = {
-    poster ! event
+    _poster ! event
   }
 
   def addSubsciber[A <: AnyRef](subscriber: A): Unit = {
diff --git a/src/main/scala/gr/grnet/aquarium/service/EventProcessorService.scala b/src/main/scala/gr/grnet/aquarium/service/EventProcessorService.scala
deleted file mode 100644 (file)
index 3f81aee..0000000
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-
-import akka.actor._
-import akka.actor.Actor._
-import akka.routing.CyclicIterator
-import akka.routing.Routing._
-import akka.dispatch.Dispatchers
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
-import akka.config.Supervision.SupervisorConfig
-import akka.config.Supervision.OneForOneStrategy
-import gr.grnet.aquarium.messaging.AkkaAMQP
-import akka.amqp._
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.{AquariumException, Configurator}
-import gr.grnet.aquarium.actor.ReflectiveActor
-import gr.grnet.aquarium.event.model.ExternalEventModel
-
-/**
- * An abstract service that retrieves Aquarium events from a queue,
- * stores them persistently and forwards them for further processing.
- * The processing happens among two load-balanced actor clusters
- * asynchronously. The number of employed actors is always equal to
- * the number of processors. The number of threads per load-balanced
- * cluster is configurable by subclasses.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-abstract class EventProcessorService[E <: ExternalEventModel] extends AkkaAMQP with Loggable with Lifecycle {
-
-  /* Messages exchanged between the persister and the queuereader */
-  case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
-
-  case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
-
-  case class PersistOK(ackData: AckData)
-
-  case class PersistFailed(ackData: AckData)
-
-  case class Duplicate(ackData: AckData)
-
-  /**
-   * Short term storage for delivery tags to work around AMQP
-   * limitation with redelivering rejected messages to same host.
-   *
-   * FIXME: Grow unbounded???
-   */
-  private val redeliveries = new ConcurrentSkipListSet[String]()
-
-  /**
-   *  Temporarily keeps track of messages while being processed
-   *
-   *  FIXME: Grow unbounded???
-   */
-  private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
-
-  /* Supervisor actor for each event processing operation */
-  private lazy val supervisor = Supervisor(SupervisorConfig(
-    OneForOneStrategy(
-      List(classOf[Exception]), //What exceptions will be handled
-      5, // maximum number of restart retries
-      5000 // within time in millis
-    ), Nil
-  ))
-
-  protected def _configurator: Configurator = Configurator.MasterConfigurator
-
-  protected def parseJsonBytes(data: Array[Byte]): E
-
-  protected def forward(event: E): Unit
-
-  protected def existsInStore(event: E): Boolean
-
-  protected def storeParsedEvent(event: E, initialPayload: Array[Byte]): Unit
-
-  protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit
-
-  protected def queueReaderThreads: Int
-
-  protected def persisterThreads: Int
-
-  protected def numQueueActors: Int
-
-  protected def numPersisterActors: Int
-
-  protected def name: String
-
-  protected def persisterManager: PersisterManager
-
-  protected def queueReaderManager: QueueReaderManager
-
-  protected val numCPUs = Runtime.getRuntime.availableProcessors
-
-  def start(): Unit
-
-  def stop(): Unit
-
-  protected def declareQueues(conf: String) = {
-    val decl = _configurator.get(conf)
-    decl.split(";").foreach {
-      q =>
-        val i = q.split(":")
-
-        if(i.size < 3)
-          throw new AquariumException("Queue declaration \"%s\" not correct".format(q))
-
-        val exchange = i(0)
-        val route = i(1)
-        val qname = i(2)
-        logger.info("Declaring queue '%s' for exchange '%s' and key '%s'".format(qname, exchange, route))
-        consumer(route, qname, exchange, queueReaderManager.lb, false)
-    }
-  }
-
-  class QueueReader extends Actor {
-
-    def receive = {
-      case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
-        try {
-          val event = parseJsonBytes(payload)
-          inFlightEvents.put(deliveryTag, event)
-
-          if(isRedeliver) {
-            //Message could not be processed 3 times, just ignore it
-            if(redeliveries.contains(event.id)) {
-              logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
-              queue ! Reject(deliveryTag, false)
-              redeliveries.remove(event.id)
-              inFlightEvents.remove(deliveryTag)
-            } else {
-              //Redeliver, but keep track of the message
-              redeliveries.add(event.id)
-              persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
-            }
-          } else {
-            val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
-            persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
-          }
-
-        } catch { case e: Exception ⇒
-          logger.error("While parsing incoming json bytes payload", e)
-
-          // If we could not create an object from the incoming json, then we just store the message
-          // and then ignore it.
-          // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
-          try {
-            storeUnparsedEvent(payload, e)
-            queue ! Acknowledge(deliveryTag)
-          } catch { case e: Exception ⇒
-            // Aquarium internal error here...
-            logger.error("Could not store unparsed json bytes payload", e)
-            queue ! Reject(deliveryTag, true)
-          }
-        }
-
-      case PersistOK(ackData) =>
-        logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
-        ackData.queue ! Acknowledge(ackData.deliveryTag)
-
-      case PersistFailed(ackData) =>
-        //Give the message a chance to be processed by other processors
-        logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
-        inFlightEvents.remove(ackData.deliveryTag)
-        ackData.queue ! Reject(ackData.deliveryTag, true)
-
-      case Duplicate(ackData) =>
-        logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
-        inFlightEvents.remove(ackData.deliveryTag)
-        ackData.queue ! Reject(ackData.deliveryTag, false)
-
-      case Acknowledged(deliveryTag) =>
-        logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
-        forward(inFlightEvents.remove(deliveryTag))
-
-      case Rejected(deliveryTag) =>
-        logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
-
-      case _ => logger.warn("Unknown message")
-    }
-
-    override def preStart = {
-      logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
-      super.preStart
-    }
-
-    self.dispatcher = queueReaderManager.dispatcher
-  }
-
-  class Persister extends ReflectiveActor {
-
-    def knownMessageTypes = Set(classOf[Persist])
-
-    override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) {
-      logChainOfCauses(t)
-      servicingMessage match {
-        case Persist(event, initialPayload, sender, ackData) ⇒
-          sender ! PersistFailed(ackData)
-          logger.error("While persisting", t)
-      }
-    }
-
-    def onPersist(persist: Persist): Unit = {
-      persist match {
-        case Persist(event, initialPayload, sender, ackData) ⇒
-          if(existsInStore(event)) {
-            sender ! Duplicate(ackData)
-          }
-          else {
-            storeParsedEvent(event, initialPayload)
-            sender ! PersistOK(ackData)
-          }
-      }
-    }
-
-    self.dispatcher = persisterManager.dispatcher
-  }
-
-  class QueueReaderManager {
-    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
-
-    lazy val dispatcher =
-      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
-        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
-        .setMaxPoolSize(2 * numCPUs)
-        .setCorePoolSize(queueReaderThreads)
-        .setKeepAliveTimeInMillis(60000)
-        .setRejectionPolicy(new CallerRunsPolicy).build
-
-    lazy val actors =
-      for(i <- 0 until numQueueActors) yield {
-        val actor = actorOf(new QueueReader)
-        supervisor.link(actor)
-        actor.start()
-        actor
-      }
-
-    def stop() = dispatcher.stopAllAttachedActors
-  }
-
-  class PersisterManager {
-    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
-
-    val dispatcher =
-      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
-        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
-        .setMaxPoolSize(2 * numCPUs)
-        .setCorePoolSize(persisterThreads)
-        .setKeepAliveTimeInMillis(60000)
-        .setRejectionPolicy(new CallerRunsPolicy).build
-
-    lazy val actors =
-      for(i <- 0 until numPersisterActors) yield {
-        val actor = actorOf(new Persister)
-        supervisor.link(actor)
-        actor.start()
-        actor
-      }
-
-    def stop() = dispatcher.stopAllAttachedActors
-  }
-
-}
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/service/IMEventProcessorService.scala b/src/main/scala/gr/grnet/aquarium/service/IMEventProcessorService.scala
deleted file mode 100644 (file)
index e3e738a..0000000
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-
-import gr.grnet.aquarium.actor.RouterRole
-import gr.grnet.aquarium.store.LocalFSEventStore
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.util.makeString
-import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService
-
-/**
- * An event processor service for user events coming from the IM system
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-class IMEventProcessorService extends EventProcessorService[IMEventModel] {
-
-  override def parseJsonBytes(data: Array[Byte]) = {
-    StdIMEvent.fromJsonBytes(data)
-  }
-
-  override def forward(event: IMEventModel) = {
-    if(event ne null) {
-      _configurator.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(event)
-    }
-  }
-
-  override def existsInStore(event: IMEventModel) =
-    _configurator.imEventStore.findIMEventById(event.id).isDefined
-
-  override def storeParsedEvent(event: IMEventModel, initialPayload: Array[Byte]) = {
-    // 1. Store to local FS for debugging purposes.
-    //    BUT be resilient to errors, since this is not critical
-    if(_configurator.eventsStoreFolder.isJust) {
-      Maybe {
-        LocalFSEventStore.storeIMEvent(_configurator, event, initialPayload)
-      }
-    }
-
-    // 2. Store to DB
-    _configurator.imEventStore.insertIMEvent(event)
-  }
-
-  protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit = {
-    val json = makeString(initialPayload)
-
-    LocalFSEventStore.storeUnparsedIMEvent(_configurator, initialPayload, exception)
-  }
-
-  override def queueReaderThreads: Int = 1
-
-  override def persisterThreads: Int = numCPUs
-
-  protected def numQueueActors = 2 * queueReaderThreads
-
-  protected def numPersisterActors = 2 * persisterThreads
-
-  override def name = "usrevtproc"
-
-  lazy val persister = new PersisterManager
-  lazy val queueReader = new QueueReaderManager
-
-  override def persisterManager = persister
-
-  override def queueReaderManager = queueReader
-
-  def start() {
-    logStarting()
-    val (ms0, ms1, _) = TimeHelpers.timed {
-      declareQueues(RabbitMQService.RabbitMQConfKeys.amqp_imevents_queues)
-    }
-    logStarted(ms0, ms1)
-  }
-
-  def stop() {
-    logStopping()
-    val (ms0, ms1, _) = TimeHelpers.timed {
-      queueReaderManager.stop()
-      persisterManager.stop()
-    }
-    logStopped(ms0, ms1)
-  }
-}
\ No newline at end of file
diff --git a/src/main/scala/gr/grnet/aquarium/service/ResourceEventProcessorService.scala b/src/main/scala/gr/grnet/aquarium/service/ResourceEventProcessorService.scala
deleted file mode 100644 (file)
index 8dd46ea..0000000
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-import gr.grnet.aquarium.actor.RouterRole
-import gr.grnet.aquarium.store.LocalFSEventStore
-import com.ckkloverdos.maybe.Maybe
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
-import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConfKeys ⇒ Keys}
-
-/**
- * An event processor service for resource events
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-final class ResourceEventProcessorService extends EventProcessorService[ResourceEventModel] {
-
-  override def parseJsonBytes(data: Array[Byte]) = {
-    StdResourceEvent.fromJsonBytes(data)
-  }
-
-  override def forward(event: ResourceEventModel): Unit = {
-    if(event ne null) {
-      val businessLogicDispacther = _configurator.actorProvider.actorForRole(RouterRole)
-      businessLogicDispacther ! ProcessResourceEvent(event)
-    }
-  }
-
-  override def existsInStore(event: ResourceEventModel): Boolean =
-    _configurator.resourceEventStore.findResourceEventById(event.id).isDefined
-
-  override def storeParsedEvent(event: ResourceEventModel, initialPayload: Array[Byte]): Unit = {
-    // 1. Store to local FS for debugging purposes.
-    //    BUT be resilient to errors, since this is not critical
-    if(_configurator.eventsStoreFolder.isJust) {
-      Maybe {
-        LocalFSEventStore.storeResourceEvent(_configurator, event, initialPayload)
-      }
-    }
-
-    // 2. Store to DB
-    _configurator.resourceEventStore.insertResourceEvent(event)
-  }
-
-
-  protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit = {
-    // TODO: Also save to DB, just like we do for UserEvents
-    LocalFSEventStore.storeUnparsedResourceEvent(_configurator, initialPayload, exception)
-  }
-
-  override def queueReaderThreads: Int = 1
-
-  override def persisterThreads: Int = numCPUs + 4
-
-  override def numQueueActors: Int = 1 * queueReaderThreads
-
-  override def numPersisterActors: Int = 2 * persisterThreads
-
-  override def name = "resevtproc"
-
-  lazy val persister = new PersisterManager
-  lazy val queueReader = new QueueReaderManager
-
-  override def persisterManager = persister
-
-  override def queueReaderManager = queueReader
-
-  def start() {
-    logStarting()
-    val (ms0, ms1, _) = TimeHelpers.timed {
-      declareQueues(Keys.amqp_rcevents_queues)
-    }
-    logStarted(ms0, ms1)
-  }
-
-  def stop() {
-    logStopping()
-    val (ms0, ms1, _) = TimeHelpers.timed {
-      queueReaderManager.stop()
-      persisterManager.stop()
-    }
-    logStopped(ms0, ms1)
-  }
-}
\ No newline at end of file
index 3e0091e..62ec9c2 100644 (file)
@@ -58,7 +58,6 @@ class SimpleLocalRoleableActorProviderService extends RoleableActorProviderServi
 
   def configure(props: Props): Unit = {
     this._props = props
-    logger.debug("Configured with props")
   }
 
   private[this] def __doStart(): Unit = {
@@ -122,7 +121,7 @@ class SimpleLocalRoleableActorProviderService extends RoleableActorProviderServi
 }
 
 object SimpleLocalRoleableActorProviderService {
-  // Always set Dispatcher at the end.
+  // Always set Router at the end.
   // We could definitely use some automatic dependency sorting here (topological sorting anyone?)
   final val RolesToBeStarted = List(
     //    ResourceProcessorRole,
diff --git a/src/main/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala b/src/main/scala/gr/grnet/aquarium/util/RandomEventGenerator.scala
deleted file mode 100644 (file)
index 592f5fd..0000000
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.util
-
-import akka.amqp._
-import util.Random
-import scopt.OptionParser
-import gr.grnet.aquarium.messaging.AkkaAMQP
-import java.lang.StringBuffer
-import gr.grnet.aquarium.logic.accounting.Policy
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
-
-/**
- *  Generates random resource events to use as input for testing and
- *  injects them to the specified exchange.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-trait RandomEventGenerator extends AkkaAMQP {
-
-  val userIds = 1 to 1000
-  val clientIds = 1 to 4
-  val vmIds = 1 to 4000
-  val resources = Policy.policy.resources.map{r => r.name}
-  val tsFrom = 1293840000000L //1/1/2011 0:00:00 GMT
-  val tsTo = 1325376000000L   //1/1/2012 0:00:00 GMT
-  val eventVersion = 1 to 4
-
-  private val seed = 0xdeadbeef
-  private lazy val rnd = new Random(seed)
-
-  /**
-   * Generate a random resource event
-   */
-  def nextUserEvent(): IMEventModel = {
-    val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
-
-    new StdIMEvent(
-      id = CryptoUtils.sha1(genRndAsciiString(35)),
-      occurredMillis = ts.toLong,
-      receivedMillis = ts.toLong,
-      userID = userIds(rnd.nextInt(100)).toString,
-      clientID = "defclient",
-      isActive = rnd.nextBoolean,
-      role = Array("PROF", "STUDENT", "ADMIN").apply(rnd.nextInt(3)),
-      eventVersion = 1.toString,
-      eventType = Array("ACTIVE", "SUSPENDED").apply(rnd.nextInt(2)),
-      details = Map()
-    )
-  }
-
-  /**
-   * Generate a random resource event
-   */
-  def genPublishUserEvents(num: Int) = {
-    val publisher = producer(im_exchanges(0))
-
-    (1 to num).foreach {
-      n =>
-        var event = nextUserEvent()
-        publisher ! Message(event.toJsonString.getBytes, "astakos.user")
-    }
-  }
-
-  /**
-   * Generete and publish create events for test users
-   */
-  def initUsers(num: Int) = {
-    val publisher = producer(im_exchanges(0))
-
-    userIds.filter(_ < num).foreach {
-      i =>
-        val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
-        val user = new StdIMEvent(
-          id = CryptoUtils.sha1(genRndAsciiString(35)),
-          occurredMillis = ts.toLong,
-          receivedMillis = ts.toLong,
-          userID = i.toString,
-          clientID = "defclient",
-          isActive = rnd.nextBoolean,
-          role = Array("PROF", "STUDENT", "ADMIN").apply(rnd.nextInt(3)),
-          eventVersion = 1.toString,
-          eventType = "CREATE",
-          details = Map()
-        )
-        publisher ! Message(user.toJsonString.getBytes, "astakos.user")
-    }
-  }
-
-  /**
-   * Get the next random resource event
-   */
-  def nextResourceEvent() : ResourceEventModel = {
-    val res = rnd.shuffle(resources).head
-
-    val extra = res match {
-      case "vmtime" => Map("vmid" -> rnd.nextInt(vmIds.max).toString)
-      case _ => Map[String, String]()
-    }
-
-    val value = res match {
-      case "vmtime" => rnd.nextInt(1)
-      case _ => rnd.nextInt(5000)
-    }
-
-    val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
-    val str = genRndAsciiString(35)
-
-    new StdResourceEvent(
-      CryptoUtils.sha1(str),
-      ts, ts,
-      rnd.nextInt(userIds.max).toString,
-      rnd.nextInt(clientIds.max).toString,
-      res, "1", value, 1.toString, extra)
-  }
-
-  def genRndAsciiString(size: Int): String = {
-    (1 to size).map{
-      i => rnd.nextPrintableChar()
-    }.foldLeft(new StringBuffer()){
-      (a, b) => a.append(b)
-    }.toString
-  }
-
-  /**
-   * Generate resource events and publish them to the queue
-   */
-  def genPublishResEvents(num: Int) = {
-
-    assert(num > 0)
-    val publisher = producer(resevent_exchanges(0))
-
-    (1 to num).foreach {
-      n =>
-        var event = nextResourceEvent
-        publisher ! Message(event.toBytes,
-          "%s.%s.%s".format("",event.clientID, event.resource))
-    }
-  }
-}
-
-object RandomEventGen extends RandomEventGenerator {
-
-  case class Config(var i: Boolean = false,
-                    var u: Boolean = false,
-                    var r: Boolean = false,
-                    var nummsg: Int = 100)
-
-  val config = new Config
-
-  private val parser = new OptionParser("RandomEventGen") {
-    opt("i", "im-events", "Generate IM events", {config.i = true})
-    opt("u", "user-create", "Generate IM events that create users", {config.u = true})
-    opt("r", "resource-events", "Generate resource events", {config.r = true})
-    arg("nummsgs", "Number of msgs to generate", {num: String => config.nummsg = Integer.parseInt(num)})
-  }
-
-  def main(args: Array[String]): Unit = {
-
-    if (!parser.parse(args))
-      errexit
-
-    if (!config.i && !config.u && !config.r) {
-      println("One of -i, -u, -r must be specified")
-      errexit
-    }
-
-    println("Publishing %d msgs, hit Ctrl+c to stop".format(config.nummsg))
-    if (config.r) genPublishResEvents(config.nummsg)
-    if (config.u) initUsers(config.nummsg)
-    if (config.i) genPublishUserEvents(config.nummsg)
-  }
-  
-  private def errexit() = {
-    print(parser.usage)
-    System.exit(-1)
-  }
-}
index c84ae0f..0e841fb 100644 (file)
@@ -41,15 +41,30 @@ package gr.grnet.aquarium.util
  */
 
 object ReflectHelpers {
-  def setField[C <: AnyRef, A : Manifest](container: C, fieldName: String, value: A): Unit = {
+  def setField[C <: AnyRef, A : Manifest](container: C,
+                                          fieldName: String,
+                                          value: A,
+                                          synchronizeOnContainer: Boolean = true): Unit = {
+    require(container ne null, "container is null")
+    require(fieldName ne null, "fieldName is null")
+    require(fieldName.length > 0, "fieldName is empty")
+
     val field = container.getClass.getDeclaredField(fieldName)
     field.setAccessible(true)
 
-    manifest[A] match {
-      case Manifest.Byte ⇒ field.setBoolean(container, value.asInstanceOf[Boolean])
-      case Manifest.Int  ⇒ field.setInt(container, value.asInstanceOf[Int])
-      case Manifest.Long ⇒ field.setLong(container, value.asInstanceOf[Long])
-      case _             ⇒ field.set(container, value)
+    def doSet(): Unit = {
+      manifest[A] match {
+        case Manifest.Byte ⇒ field.setBoolean(container, value.asInstanceOf[Boolean])
+        case Manifest.Int  ⇒ field.setInt(container, value.asInstanceOf[Int])
+        case Manifest.Long ⇒ field.setLong(container, value.asInstanceOf[Long])
+        case _             ⇒ field.set(container, value)
+      }
+    }
+
+    if(synchronizeOnContainer) {
+      container synchronized doSet()
+    } else {
+      doSet()
     }
   }
 }
index 3d1855d..570fe20 100644 (file)
@@ -62,6 +62,9 @@ package object util {
     }
   }
 
+  /**
+   * Runs function `f` on `c` and finally calls `c.close()` regardless of any exception.
+   */
   def withCloseable[C <: { def close(): Unit}, A](c: C)(f: C => A): A = {
     try {
       f(c)
diff --git a/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala b/src/test/scala/gr/grnet/aquarium/messaging/AkkaAMQPTest.scala
deleted file mode 100644 (file)
index a26e4d2..0000000
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging
-
-import gr.grnet.aquarium.util.RandomEventGenerator
-import org.junit.Test
-import akka.actor.Actor
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import akka.amqp._
-import akka.config.Supervision.Permanent
-import java.util.concurrent.atomic.AtomicInteger
-import org.junit.Assume._
-import gr.grnet.aquarium.{AquariumException, LogicTestsAssumptions}
-
-/**
- *
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-
-class AkkaAMQPTest extends RandomEventGenerator {
-
-  @Test
-  def testSendReceive() : Unit = {
-
-    assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
-
-    val numMsg = 100
-    val msgs = new AtomicInteger(0)
-
-    val publisher = producer(aquarium_exchnage)
-
-    class Consumer extends Actor {
-
-      self.lifeCycle = Permanent
-
-      def receive = {
-        case Delivery(payload, routingKey, deliveryTag, isRedeliver, properties, sender) =>
-          println(this + " Got message: %s (%d)".format(new String(payload), deliveryTag) +
-            (if(isRedeliver) " - redelivered (%d)".format(deliveryTag) else ""))
-          if (msgs.incrementAndGet() == 15) throw new AquariumException("Messed up")
-          if (msgs.incrementAndGet() == 55) sender ! Reject(deliveryTag, true)
-          sender ! Acknowledge(deliveryTag)
-        case Acknowledged(deliveryTag) => println("Acked: " + deliveryTag)
-        case _ => println("Unknown delivery")
-      }
-
-      override def preRestart(reason: Throwable) {
-        println("Actor restarted: " + reason)
-      }
-
-      override def postRestart(reason: Throwable) {
-        // reinit stable state after restart
-      }
-    }
-
-    consumer("foo.#", "aquarium-rabbitmq-test",
-      aquarium_exchnage, Actor.actorOf(new Consumer), false)
-    Thread.sleep(2000)
-
-    (1 to numMsg).foreach{
-      i =>  publisher ! new Message(i.toString.getBytes(), "foo.bar")
-    }
-
-    Thread.sleep(5000)
-
-    //AMQP.shutdownAll
-    Actor.registry.shutdownAll
-  }
-}
\ No newline at end of file
diff --git a/src/test/scala/gr/grnet/aquarium/store/mongodb/EventStoreTest.scala b/src/test/scala/gr/grnet/aquarium/store/mongodb/EventStoreTest.scala
deleted file mode 100644 (file)
index f374dfa..0000000
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- *   1. Redistributions of source code must retain the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer.
- *
- *   2. Redistributions in binary form must reproduce the above
- *      copyright notice, this list of conditions and the following
- *      disclaimer in the documentation and/or other materials
- *      provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.store.mongodb
-
-import org.junit.Assert._
-import org.junit.Assume._
-import gr.grnet.aquarium.Configurator._
-import gr.grnet.aquarium.util.{RandomEventGenerator, TestMethods}
-import collection.mutable.ArrayBuffer
-import org.junit.{After, Test}
-import gr.grnet.aquarium.{StoreConfigurator, LogicTestsAssumptions}
-import gr.grnet.aquarium.store.memory.MemStore
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
-
-/**
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-class EventStoreTest extends TestMethods
-with RandomEventGenerator with StoreConfigurator {
-
-  @Test
-  def testStoreEvent() = {
-    assumeTrue(LogicTestsAssumptions.EnableStoreTests)
-
-    val event = nextResourceEvent()
-    val store = config.resourceEventStore
-    val result = store.insertResourceEvent(event)
-  }
-
-  @Test
-  def testFindEventById(): Unit = {
-    assumeTrue(LogicTestsAssumptions.EnableStoreTests)
-
-    val event = nextResourceEvent()
-    val store = config.resourceEventStore
-
-    store.insertResourceEvent(event)
-
-    store.findResourceEventById(event.id)
-  }
-
-  @Test
-  def testfindEventsByUserId(): Unit = {
-    assumeTrue(LogicTestsAssumptions.EnableStoreTests)
-    val events = new ArrayBuffer[ResourceEventModel]()
-    val store = config.resourceEventStore
-
-    (1 to 100).foreach {
-      n =>
-        val e = nextResourceEvent
-        events += e
-        store.insertResourceEvent(e)
-    }
-
-    val mostUsedId = events
-      .map{x => x.userID}
-      .groupBy(identity)
-      .mapValues(_.size)
-      .foldLeft(("",0))((acc, kv) => if (kv._2 > acc._2) kv else acc)._1
-
-    val result = store.findResourceEventsByUserId(mostUsedId)(None)
-    assertEquals(events.filter(p => p.userID.equals(mostUsedId)).size, result.size)
-  }
-
-  @Test
-  def testMultipleMongos = {
-    assumeTrue(LogicTestsAssumptions.EnableStoreTests)
-    val a = getMongo
-    val b = getMongo
-    //assertEquals(a.Connection.mongo.get.hashCode(), b.Connection.mongo.get.hashCode())
-  }
-
-  @After
-  def after: Unit = {
-    if (isInMemStore) return
-
-    val a = getMongo
-
-    val col = a.mongo.getDB(
-      MasterConfigurator.get(MongoDBStoreProvider.MongoDBKeys.dbschema)
-    ).getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
-
-    //val res = col.find
-    //while (res.hasNext)
-    //  col.remove(res.next)
-  }
-
-  private lazy val config = configurator
-  private val isInMemStore = config.resourceEventStore.isInstanceOf[MemStore]
-  private def getMongo = config.resourceEventStore.asInstanceOf[MongoDBStore]
-}
\ No newline at end of file