/**
* Reflectively provide a new instance of a class and configure it appropriately.
*/
- private[this] def newInstance[C : Manifest](className: String): C = {
+ private[this] def newInstance[C : Manifest](_className: String = ""): C = {
+ val className = _className match {
+ case "" ⇒
+ manifest[C].erasure.getName
+
+ case name ⇒
+ name
+ }
+
val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
instanceM match {
case Just(instance) ⇒ instance match {
MaybeEither(configurable configure localProps) match {
case Just(_) ⇒
+ logger.debug("Configured {} with props", configurable.getClass.getName)
instance
case Failed(e) ⇒
}
- private[this] lazy val _actorProvider: RoleableActorProviderService = {
- val instance = newInstance[RoleableActorProviderService](props.getEx(Keys.actor_provider_class))
- logger.info("Loaded %s: %s".format(shortNameOfClass(classOf[RoleableActorProviderService]), instance.getClass))
- instance
- }
+ private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
/**
* Initializes a store provider, according to the value configured
* in the configuration file. The
*/
- private[this] lazy val _storeProvider: StoreProvider = {
- val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
- logger.info("Loaded %s: %s".format(shortNameOfClass(classOf[StoreProvider]), instance.getClass))
- instance
- }
+ private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
- private[this] lazy val _restService: Lifecycle = {
- val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
- logger.info("Loaded RESTService: %s".format(instance.getClass))
- instance
- }
+ private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
// If there is a specific `UserStateStore` implementation specified in the
private[this] lazy val _converters = StdConverters.AllConverters
- private[this] lazy val _resEventProc = new ResourceEventProcessorService
+ private[this] lazy val _akka = newInstance[AkkaService]()
- private[this] lazy val _imEventProc = new IMEventProcessorService
+ private[this] lazy val _eventBus = newInstance[EventBusService]()
- private[this] lazy val _eventBus = newInstance[EventBusService](classOf[EventBusService].getName)
+ private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
- private[this] lazy val _lifecycleServices = List(
- AkkaService,
+ private[this] lazy val _allServices = List(
+ _akka,
+ _actorProvider,
_eventBus,
_restService,
- _actorProvider,
- newInstance[RabbitMQService](classOf[RabbitMQService].getName)
- )/*,
- _resEventProc,
- _imEventProc)*/
+ _rabbitmqService
+ )
def get(key: String, default: String = ""): String = props.getOr(key, default)
}
def startServices(): Unit = {
- _lifecycleServices.foreach(_.start())
+ for(service ← _allServices) {
+ service.start()
+ }
}
def stopServices(): Unit = {
- _lifecycleServices.reverse.foreach(_.stop())
-// akka.actor.Actor.registry.shutdownAll()
+ _allServices.reverse.foreach(service ⇒ safeUnit(service.stop()))
}
def stopServicesWithDelay(millis: Long) {
final val events_store_folder = "events.store.folder"
/**
- * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.im.IMEventModel]] is
+ * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
* saved to the [[gr.grnet.aquarium.store.IMEventStore]].
*/
final val save_unparsed_event_im = "save.unparsed.event.im"
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+/**
+ * An executor for a [[gr.grnet.aquarium.connector.handler.PayloadHandler]].
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+trait PayloadHandlerExecutor {
+ def exec(payload: Array[Byte], handler: PayloadHandler)
+ (onSuccess: HandlerResult ⇒ Unit)
+ (onError: Throwable ⇒ Unit): Unit
+}
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+/**
+ * A [[gr.grnet.aquarium.connector.handler.PayloadHandlerExecutor]] that calls the
+ * [[gr.grnet.aquarium.connector.handler.PayloadHandler]] synchronously.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final class SynchronousPayloadHandlerExecutor extends PayloadHandlerExecutor {
+
+ def exec(payload: Array[Byte], handler: PayloadHandler)
+ (onSuccess: HandlerResult ⇒ Unit)
+ (onError: Throwable ⇒ Unit): Unit = {
+
+ try onSuccess(handler.handlePayload(payload))
+ catch { case e: Throwable ⇒ onError(e) }
+ }
+}
import gr.grnet.aquarium.connector.rabbitmq.conf.RabbitMQConsumerConf
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-import gr.grnet.aquarium.util.safeUnit
+import gr.grnet.aquarium.util.{safeUnit, shortClassNameOf}
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQQueueKeys, RabbitMQExchangeKeys}
import com.rabbitmq.client.{Envelope, Consumer, ShutdownSignalException, ShutdownListener, ConnectionFactory, Channel, Connection}
import com.rabbitmq.client.AMQP.BasicProperties
import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
import gr.grnet.aquarium.service.event.BusEvent
-import gr.grnet.aquarium.connector.handler.{HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
+import gr.grnet.aquarium.connector.handler.{PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
+import com.ckkloverdos.maybe.MaybeEither
+import gr.grnet.aquarium.util.date.TimeHelpers
/**
+ * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handler: PayloadHandler) extends Loggable with Lifecycle {
+class RabbitMQConsumer(conf: RabbitMQConsumerConf,
+ handler: PayloadHandler,
+ executor: PayloadHandlerExecutor) extends Loggable with Lifecycle {
private[this] var _factory: ConnectionFactory = _
private[this] var _connection: Connection = _
private[this] var _channel: Channel = _
- private[this] val _isAlive = new AtomicBoolean(false)
+// private[this] val _isAlive = new AtomicBoolean(false)
private[this] val _state = new AtomicReference[State](Shutdown)
- def isAlive() = {
- _isAlive.get() && _state.get().isStarted
+ def isStarted() = {
+ _state.get().isStarted && MaybeEither(_channel.isOpen).getOr(false)
}
sealed trait State {
_state.set(ShutdownSequence)
safeUnit(_channel.close())
safeUnit(_connection.close())
- _isAlive.set(false)
_state.set(Shutdown)
}
channel.addShutdownListener(RabbitMQShutdownListener)
- if(_channel.isOpen) {
- _isAlive.getAndSet(true)
- }
-
_channel.basicConsume(
queueName,
false, // We send explicit acknowledgements to RabbitMQ
def start(): Unit = {
try {
- doFullStartupSequence()
- _state.set(Started)
+ logStarting()
+ val (ms0, ms1, _) = TimeHelpers.timed {
+ doFullStartupSequence()
+ _state.set(Started)
+ }
+ logStarted(ms0, ms1, toDebugString)
} catch {
case e: Exception ⇒
doSafeFullShutdownSequence(true)
+ logger.error("While starting", e)
}
}
+ def stop() = {
+ logStopping()
+ val (ms0, ms1, _) = TimeHelpers.timed(doSafeFullShutdownSequence(false))
+ logStopped(ms0, ms1, toDebugString)
+ }
+
private[this] def postBusError(event: BusEvent): Unit = {
Configurator.MasterConfigurator.eventBus ! event
}
}
def handleDelivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte]) = {
+ def doError: PartialFunction[Throwable, Unit] = {
+ case e: Exception ⇒
+ logger.warn("Unexpected error", e)
+
+ case e: Throwable ⇒
+ throw e
+ }
+
try {
val deliveryTag = envelope.getDeliveryTag
- val hresult = handler.handlePayload(body)
- hresult match {
+
+ executor.exec(body, handler) {
case HandlerResultSuccess ⇒
doWithChannel(_.basicAck(deliveryTag, false))
// The other end is crucial to the overall operation and it is in panic mode,
// so we stop delivering messages until further notice
doSafeFullShutdownSequence(true)
- }
- } catch {
- case e: Exception ⇒
- logger.warn("Unexpected error", e)
- }
+ } (doError)
+ } catch (doError)
}
}
def shutdownCompleted(cause: ShutdownSignalException) = {
safeUnit { _channel.close() }
- _isAlive.getAndSet(false)
// Now, let's see what happened
if(isConnectionError(cause)) {
}
}
- def stop() = {
+ def toDebugString = {
+ import conf._
+ "(exchange=%s, routingKey=%s, queue=%s)".format(exchangeName, routingKey, queueName)
+ }
+ override def toString = {
+ "%s%s".format(shortClassNameOf(this), toDebugString)
}
}
import gr.grnet.aquarium.util.safeUnit
/**
+ * Generic handler of events arriving to Aquarium.
+ *
+ * We first parse them to JSON ([[gr.grnet.aquarium.converter.JsonTextFormat]]) and an appropriate event model
+ * (`E <:` [[gr.grnet.aquarium.event.model.ExternalEventModel]]),
+ * then store them to DB
+ * (`S <:` [[gr.grnet.aquarium.event.model.ExternalEventModel]])
+ * and then forward them to business logic.
+ *
+ * All the above actions are given polymorphically via appropriate functions.
*
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.rabbitmq.service
+
+import akka.dispatch.Future
+import gr.grnet.aquarium.connector.handler.{PayloadHandler, HandlerResult, PayloadHandlerExecutor}
+
+
+/**
+ * An [[gr.grnet.aquarium.connector.handler.PayloadHandlerExecutor]] that uses `Akka` futures.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+final class PayloadHandlerFutureExecutor extends PayloadHandlerExecutor {
+
+ def exec(payload: Array[Byte], handler: PayloadHandler)
+ (onSuccess: HandlerResult ⇒ Unit)
+ (onError: Throwable ⇒ Unit): Unit = {
+
+ val result = Future { handler.handlePayload(payload) }
+
+ result.onComplete { futureHandlerResult ⇒
+ futureHandlerResult.value.get match {
+ case Left(e) ⇒
+ onError(e)
+
+ case Right(handlerResult) ⇒
+ onSuccess(handlerResult)
+ }
+ }
+ }
+}
import com.ckkloverdos.props.Props
import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.util.{ReflectHelpers, Loggable, Lifecycle}
-import com.rabbitmq.client.{ConnectionFactory, Address}
+import gr.grnet.aquarium.util.{Loggable, Lifecycle}
+import com.rabbitmq.client.Address
import gr.grnet.aquarium.{Configurator, Configurable}
-import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
import com.ckkloverdos.key.{ArrayKey, IntKey, TypedKeySkeleton, BooleanKey, StringKey}
import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import com.ckkloverdos.maybe.{MaybeEither, Failed, Just, Maybe}
-import gr.grnet.aquarium.connector.handler.{HandlerResultSuccess, HandlerResultPanic, HandlerResultReject, HandlerResult, PayloadHandler}
+import com.ckkloverdos.maybe.MaybeEither
import gr.grnet.aquarium.actor.RouterRole
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
*/
class RabbitMQService extends Loggable with Lifecycle with Configurable {
- private[this] val props: Props = Props()(StdConverters.AllConverters)
+ @volatile private[this] var _props: Props = Props()(StdConverters.AllConverters)
+ @volatile private[this] var _consumers = List[RabbitMQConsumer]()
def propertyPrefix = Some(RabbitMQService.PropertiesPrefix)
* If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
*/
def configure(props: Props) = {
- ReflectHelpers.setField(this, "props", props)
-
- try {
- doConfigure()
- logger.info("Configured with {}", this.props)
- } catch {
- case e: Exception ⇒
- // If we have no internal error, then something is bad with RabbitMQ
- eventBus ! RabbitMQError(e)
- throw e
- }
+ this._props = props
+
+ doConfigure()
}
private[this] def doConfigure(): Unit = {
StdIMEvent.fromJsonTextFormat(jsonTextFormat)
}
+ val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
+ router ! ProcessResourceEvent(rcEvent)
+ }
+
+ val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
+ logger.info("Forwarding {}", rcEvent)
+ }
+
+ val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
+ router ! ProcessIMEvent(imEvent)
+ }
+
+ val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
+ logger.info("Forwarding {}", imEvent)
+ }
+
val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
jsonParser,
(payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(configurator, payload, error),
rcEventParser,
rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
- rcEvent ⇒ router ! ProcessResourceEvent(rcEvent)
+ rcDebugForwardAction
)
val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
(payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(configurator, payload, error),
imEventParser,
imEvent ⇒ imEventStore.insertIMEvent(imEvent),
- imEvent ⇒ router ! ProcessIMEvent(imEvent)
+ imDebugForwardAction
)
+ val futureExecutor = new PayloadHandlerFutureExecutor
+
// (e)xchange:(r)outing key:(q)
- val all_rc_ERQs = props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
- val rcConsumerConfs = for(oneERQ ← all_rc_ERQs) yield {
- RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
+ logger.debug("%s=%s".format(RabbitMQConfKeys.rcevents_queues, _props(RabbitMQConfKeys.rcevents_queues)))
+ val all_rc_ERQs = _props.getTrimmedList(RabbitMQConfKeys.rcevents_queues)
+ logger.debug("all_rc_ERQs = %s".format(all_rc_ERQs))
+
+ val rcConsumerConfs_ = for(oneERQ ← all_rc_ERQs) yield {
+ RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
+ }
+ val rcConsumerConfs = rcConsumerConfs_.toSet.toList
+ if(rcConsumerConfs.size != rcConsumerConfs_.size) {
+ logger.warn(
+ "Duplicate %s consumer info in %s=%s".format(
+ RabbitMQService.PropertiesPrefix,
+ RabbitMQConfKeys.rcevents_queues,
+ _props(RabbitMQConfKeys.rcevents_queues)))
}
- val all_im_ERQs = props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
- val imConsumerConfs = for(oneERQ ← all_im_ERQs) yield {
- RabbitMQService.makeRabbitMQConsumerConf(props, oneERQ)
+ val all_im_ERQs = _props.getTrimmedList(RabbitMQConfKeys.imevents_queues)
+ val imConsumerConfs_ = for(oneERQ ← all_im_ERQs) yield {
+ RabbitMQService.makeRabbitMQConsumerConf(_props, oneERQ)
+ }
+ val imConsumerConfs = imConsumerConfs_.toSet.toList
+ if(imConsumerConfs.size != imConsumerConfs_.size) {
+ logger.warn(
+ "Duplicate %s consumer info in %s=%s".format(
+ RabbitMQService.PropertiesPrefix,
+ RabbitMQConfKeys.imevents_queues,
+ _props(RabbitMQConfKeys.imevents_queues)))
}
val rcConsumers = for(rccc ← rcConsumerConfs) yield {
- new RabbitMQConsumer(rccc, rcHandler)
+ logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
+ RabbitMQService.PropertiesPrefix,
+ rccc.exchangeName,
+ rccc.routingKey,
+ rccc.queueName
+ ))
+ new RabbitMQConsumer(rccc, rcHandler, futureExecutor)
}
val imConsumers = for(imcc ← imConsumerConfs) yield {
- new RabbitMQConsumer(imcc, imHandler)
+ logger.info("Declaring %s consumer {exchange=%s, routingKey=%s, queue=%s}".format(
+ RabbitMQService.PropertiesPrefix,
+ imcc.exchangeName,
+ imcc.routingKey,
+ imcc.queueName
+ ))
+ new RabbitMQConsumer(imcc, imHandler, futureExecutor)
}
+
+ this._consumers = rcConsumers ++ imConsumers
+
+ val lg: (String ⇒ Unit) = if(this._consumers.size == 0) logger.warn(_) else logger.debug(_)
+ lg("Got %s consumers".format(this._consumers.size))
+
+ this._consumers.foreach(logger.debug("Configured {}", _))
}
def start() = {
- logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
- System.exit(1)
+ val (ms0, ms1, _) = TimeHelpers.timed {
+ this._consumers.foreach(_.start())
+
+ for(consumer ← this._consumers) {
+ if(!consumer.isStarted()) {
+ logger.warn("Not started yet {}", consumer.toDebugString)
+ }
+ }
+ }
+ logStarted(ms0, ms1)
}
def stop() = {
- logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
+ val (ms0, ms1, _) = TimeHelpers.timed {
+ this._consumers.foreach(_.stop())
+ }
+
+ logStopped(ms0, ms1)
}
}
final val PropertiesPrefix = "rabbitmq"
final val PropertiesPrefixAndDot = PropertiesPrefix + "."
- @inline private[this] def p(name: String) = PropertiesPrefix + name
+ @inline private[this] def p(name: String) = PropertiesPrefixAndDot + name
final val DefaultExchangeConfArguments = Env()
* configuration.
*/
final val servers = p("servers")
- final val amqp_servers = servers
/**
* Comma separated list of AMQP servers running in active-active
* configuration.
*/
final val port = p("port")
- final val amqp_port = port
/**
* User name for connecting with the AMQP server
*/
final val username = p("username")
- final val amqp_username = username
/**
* Password for connecting with the AMQP server
*/
final val password = p("passwd")
- final val amqp_password = password
/**
* Virtual host on the AMQP server
*/
final val vhost = p("vhost")
- final val amqp_vhost = vhost
/**
* Comma separated list of exchanges known to aquarium
* FIXME: What is this??
*/
final val exchange = p("exchange")
- final val amqp_exchange = exchange
/**
* Queues for retrieving resource events from. Multiple queues can be
* Format is `exchange:routing.key:queue-name,...`
*/
final val rcevents_queues = p("rcevents.queues")
- final val amqp_rcevents_queues = rcevents_queues
/**
* Queues for retrieving user events from. Multiple queues can be
* Format is `exchange:routing.key:queue-name,...`
*/
final val imevents_queues = p("imevents.queues")
- final val amqp_imevents_queues = imevents_queues
}
-
}
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging
-
-import akka.actor._
-import akka.amqp.{Topic, AMQP}
-import akka.amqp.AMQP._
-import gr.grnet.aquarium.Configurator
-import com.rabbitmq.client.Address
-import gr.grnet.aquarium.util.Loggable
-
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
-
-/**
- * Functionality for working with queues.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-trait AkkaAMQP extends Loggable {
-
- class AMQPConnection {
- private[messaging] lazy val connection = {
-
- val mc = Configurator.MasterConfigurator
-
- val servers = mc.get(RabbitMQConfKeys.amqp_servers)
- val port = mc.get(RabbitMQConfKeys.amqp_port).toInt
-
- // INFO: Can connect to more than one rabbitmq nodes
- val addresses = servers.split(",").foldLeft(Array[Address]()) {
- (x, y) => x ++ Array(new Address(y, port))
- }
-
- AMQP.newConnection(
- ConnectionParameters(
- addresses,
- mc.get(RabbitMQConfKeys.amqp_username),
- mc.get(RabbitMQConfKeys.amqp_password),
- mc.get(RabbitMQConfKeys.amqp_vhost),
- 1000,
- None))
- }
- }
-
- lazy val im_exchanges =
- Configurator.MasterConfigurator.get(RabbitMQConfKeys.amqp_imevents_queues).split(';').map(e => e.split(':')(0))
-
- lazy val aquarium_exchnage = Configurator.MasterConfigurator.get(RabbitMQConfKeys.amqp_exchange)
-
- lazy val resevent_exchanges =
- Configurator.MasterConfigurator.get(RabbitMQConfKeys.amqp_rcevents_queues).split(';').map(e => e.split(':')(0))
-
- //Queues and exchnages are by default durable and persistent
- val decl = ActiveDeclaration(durable = true, autoDelete = false)
-
- def consumer(routekey: String, queue: String, exchange: String,
- recipient: ActorRef, selfAck: Boolean) =
- AMQP.newConsumer(
- connection = (new AMQPConnection()).connection,
- consumerParameters = ConsumerParameters(
- routingKey = routekey,
- exchangeParameters =
- // INFO "x-ha-policy" is used to make queues on all nodes
- Some(ExchangeParameters(exchange, Topic, decl, Map("x-ha-policy" -> "all"))),
- deliveryHandler = recipient,
- queueName = Some(queue),
- queueDeclaration = decl,
- selfAcknowledging = selfAck,
- /* Better safe than sorry */
- channelParameters = Some(ChannelParameters(prefetchSize = 1))
- ))
-
- def producer(exchange: String) = {
-
- AMQP.newProducer(
- connection = (new AMQPConnection()).connection,
- producerParameters = ProducerParameters(
- exchangeParameters = Some(ExchangeParameters(exchange, Topic, decl)),
- channelParameters = Some(ChannelParameters(prefetchSize = 1))))
- }
-}
\ No newline at end of file
* @author Christos KK Loverdos <loverdos@gmail.com>
*/
-object AkkaService extends Lifecycle with Loggable {
+final class AkkaService extends Lifecycle with Loggable {
def start() = {
logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
}
import gr.grnet.aquarium.service.event.BusEvent
import com.google.common.eventbus.{DeadEvent, Subscribe, EventBus}
import akka.actor.{ActorRef, Actor}
-import gr.grnet.aquarium.util.{ReflectHelpers, Lifecycle, Loggable}
+import gr.grnet.aquarium.util.{Lifecycle, Loggable}
+import gr.grnet.aquarium.util.safeUnit
/**
class EventBusService extends Loggable with Lifecycle with Configurable {
private[this] val theBus = new EventBus(classOf[EventBusService].getName)
- private[this] val poster: ActorRef = null
+ private[this] var _poster: ActorRef = null
def propertyPrefix = None
* If `propertyPrefix` is defined, then `props` contains only keys that start with the given prefix.
*/
def configure(props: Props) = {
-
}
def start() = {
val (ms0, ms1, _) = TimeHelpers.timed {
this addSubsciber this // Wow!
- val poster = Actor actorOf AsyncPoster
- ReflectHelpers.setField(this, "poster", poster)
+ this._poster = Actor actorOf AsyncPoster
}
logStarted(ms0, ms1)
}
def stop() = {
- logStopped(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
+ val (ms0, ms1, _) = TimeHelpers.timed(safeUnit(_poster.stop()))
+ logStopped(ms0, ms1)
}
@inline
}
def ![A <: BusEvent](event: A): Unit = {
- poster ! event
+ _poster ! event
}
def addSubsciber[A <: AnyRef](subscriber: A): Unit = {
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-
-import akka.actor._
-import akka.actor.Actor._
-import akka.routing.CyclicIterator
-import akka.routing.Routing._
-import akka.dispatch.Dispatchers
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
-import akka.config.Supervision.SupervisorConfig
-import akka.config.Supervision.OneForOneStrategy
-import gr.grnet.aquarium.messaging.AkkaAMQP
-import akka.amqp._
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.{AquariumException, Configurator}
-import gr.grnet.aquarium.actor.ReflectiveActor
-import gr.grnet.aquarium.event.model.ExternalEventModel
-
-/**
- * An abstract service that retrieves Aquarium events from a queue,
- * stores them persistently and forwards them for further processing.
- * The processing happens among two load-balanced actor clusters
- * asynchronously. The number of employed actors is always equal to
- * the number of processors. The number of threads per load-balanced
- * cluster is configurable by subclasses.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-abstract class EventProcessorService[E <: ExternalEventModel] extends AkkaAMQP with Loggable with Lifecycle {
-
- /* Messages exchanged between the persister and the queuereader */
- case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
-
- case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
-
- case class PersistOK(ackData: AckData)
-
- case class PersistFailed(ackData: AckData)
-
- case class Duplicate(ackData: AckData)
-
- /**
- * Short term storage for delivery tags to work around AMQP
- * limitation with redelivering rejected messages to same host.
- *
- * FIXME: Grow unbounded???
- */
- private val redeliveries = new ConcurrentSkipListSet[String]()
-
- /**
- * Temporarily keeps track of messages while being processed
- *
- * FIXME: Grow unbounded???
- */
- private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
-
- /* Supervisor actor for each event processing operation */
- private lazy val supervisor = Supervisor(SupervisorConfig(
- OneForOneStrategy(
- List(classOf[Exception]), //What exceptions will be handled
- 5, // maximum number of restart retries
- 5000 // within time in millis
- ), Nil
- ))
-
- protected def _configurator: Configurator = Configurator.MasterConfigurator
-
- protected def parseJsonBytes(data: Array[Byte]): E
-
- protected def forward(event: E): Unit
-
- protected def existsInStore(event: E): Boolean
-
- protected def storeParsedEvent(event: E, initialPayload: Array[Byte]): Unit
-
- protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit
-
- protected def queueReaderThreads: Int
-
- protected def persisterThreads: Int
-
- protected def numQueueActors: Int
-
- protected def numPersisterActors: Int
-
- protected def name: String
-
- protected def persisterManager: PersisterManager
-
- protected def queueReaderManager: QueueReaderManager
-
- protected val numCPUs = Runtime.getRuntime.availableProcessors
-
- def start(): Unit
-
- def stop(): Unit
-
- protected def declareQueues(conf: String) = {
- val decl = _configurator.get(conf)
- decl.split(";").foreach {
- q =>
- val i = q.split(":")
-
- if(i.size < 3)
- throw new AquariumException("Queue declaration \"%s\" not correct".format(q))
-
- val exchange = i(0)
- val route = i(1)
- val qname = i(2)
- logger.info("Declaring queue '%s' for exchange '%s' and key '%s'".format(qname, exchange, route))
- consumer(route, qname, exchange, queueReaderManager.lb, false)
- }
- }
-
- class QueueReader extends Actor {
-
- def receive = {
- case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
- try {
- val event = parseJsonBytes(payload)
- inFlightEvents.put(deliveryTag, event)
-
- if(isRedeliver) {
- //Message could not be processed 3 times, just ignore it
- if(redeliveries.contains(event.id)) {
- logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
- queue ! Reject(deliveryTag, false)
- redeliveries.remove(event.id)
- inFlightEvents.remove(deliveryTag)
- } else {
- //Redeliver, but keep track of the message
- redeliveries.add(event.id)
- persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
- }
- } else {
- val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
- persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
- }
-
- } catch { case e: Exception ⇒
- logger.error("While parsing incoming json bytes payload", e)
-
- // If we could not create an object from the incoming json, then we just store the message
- // and then ignore it.
- // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
- try {
- storeUnparsedEvent(payload, e)
- queue ! Acknowledge(deliveryTag)
- } catch { case e: Exception ⇒
- // Aquarium internal error here...
- logger.error("Could not store unparsed json bytes payload", e)
- queue ! Reject(deliveryTag, true)
- }
- }
-
- case PersistOK(ackData) =>
- logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
- ackData.queue ! Acknowledge(ackData.deliveryTag)
-
- case PersistFailed(ackData) =>
- //Give the message a chance to be processed by other processors
- logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
- inFlightEvents.remove(ackData.deliveryTag)
- ackData.queue ! Reject(ackData.deliveryTag, true)
-
- case Duplicate(ackData) =>
- logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
- inFlightEvents.remove(ackData.deliveryTag)
- ackData.queue ! Reject(ackData.deliveryTag, false)
-
- case Acknowledged(deliveryTag) =>
- logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
- forward(inFlightEvents.remove(deliveryTag))
-
- case Rejected(deliveryTag) =>
- logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
-
- case _ => logger.warn("Unknown message")
- }
-
- override def preStart = {
- logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
- super.preStart
- }
-
- self.dispatcher = queueReaderManager.dispatcher
- }
-
- class Persister extends ReflectiveActor {
-
- def knownMessageTypes = Set(classOf[Persist])
-
- override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) {
- logChainOfCauses(t)
- servicingMessage match {
- case Persist(event, initialPayload, sender, ackData) ⇒
- sender ! PersistFailed(ackData)
- logger.error("While persisting", t)
- }
- }
-
- def onPersist(persist: Persist): Unit = {
- persist match {
- case Persist(event, initialPayload, sender, ackData) ⇒
- if(existsInStore(event)) {
- sender ! Duplicate(ackData)
- }
- else {
- storeParsedEvent(event, initialPayload)
- sender ! PersistOK(ackData)
- }
- }
- }
-
- self.dispatcher = persisterManager.dispatcher
- }
-
- class QueueReaderManager {
- lazy val lb = loadBalancerActor(new CyclicIterator(actors))
-
- lazy val dispatcher =
- Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
- .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
- .setMaxPoolSize(2 * numCPUs)
- .setCorePoolSize(queueReaderThreads)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy).build
-
- lazy val actors =
- for(i <- 0 until numQueueActors) yield {
- val actor = actorOf(new QueueReader)
- supervisor.link(actor)
- actor.start()
- actor
- }
-
- def stop() = dispatcher.stopAllAttachedActors
- }
-
- class PersisterManager {
- lazy val lb = loadBalancerActor(new CyclicIterator(actors))
-
- val dispatcher =
- Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
- .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
- .setMaxPoolSize(2 * numCPUs)
- .setCorePoolSize(persisterThreads)
- .setKeepAliveTimeInMillis(60000)
- .setRejectionPolicy(new CallerRunsPolicy).build
-
- lazy val actors =
- for(i <- 0 until numPersisterActors) yield {
- val actor = actorOf(new Persister)
- supervisor.link(actor)
- actor.start()
- actor
- }
-
- def stop() = dispatcher.stopAllAttachedActors
- }
-
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-
-import gr.grnet.aquarium.actor.RouterRole
-import gr.grnet.aquarium.store.LocalFSEventStore
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.util.makeString
-import com.ckkloverdos.maybe._
-import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService
-
-/**
- * An event processor service for user events coming from the IM system
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-class IMEventProcessorService extends EventProcessorService[IMEventModel] {
-
- override def parseJsonBytes(data: Array[Byte]) = {
- StdIMEvent.fromJsonBytes(data)
- }
-
- override def forward(event: IMEventModel) = {
- if(event ne null) {
- _configurator.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(event)
- }
- }
-
- override def existsInStore(event: IMEventModel) =
- _configurator.imEventStore.findIMEventById(event.id).isDefined
-
- override def storeParsedEvent(event: IMEventModel, initialPayload: Array[Byte]) = {
- // 1. Store to local FS for debugging purposes.
- // BUT be resilient to errors, since this is not critical
- if(_configurator.eventsStoreFolder.isJust) {
- Maybe {
- LocalFSEventStore.storeIMEvent(_configurator, event, initialPayload)
- }
- }
-
- // 2. Store to DB
- _configurator.imEventStore.insertIMEvent(event)
- }
-
- protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit = {
- val json = makeString(initialPayload)
-
- LocalFSEventStore.storeUnparsedIMEvent(_configurator, initialPayload, exception)
- }
-
- override def queueReaderThreads: Int = 1
-
- override def persisterThreads: Int = numCPUs
-
- protected def numQueueActors = 2 * queueReaderThreads
-
- protected def numPersisterActors = 2 * persisterThreads
-
- override def name = "usrevtproc"
-
- lazy val persister = new PersisterManager
- lazy val queueReader = new QueueReaderManager
-
- override def persisterManager = persister
-
- override def queueReaderManager = queueReader
-
- def start() {
- logStarting()
- val (ms0, ms1, _) = TimeHelpers.timed {
- declareQueues(RabbitMQService.RabbitMQConfKeys.amqp_imevents_queues)
- }
- logStarted(ms0, ms1)
- }
-
- def stop() {
- logStopping()
- val (ms0, ms1, _) = TimeHelpers.timed {
- queueReaderManager.stop()
- persisterManager.stop()
- }
- logStopped(ms0, ms1)
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.service
-
-import gr.grnet.aquarium.actor.RouterRole
-import gr.grnet.aquarium.store.LocalFSEventStore
-import com.ckkloverdos.maybe.Maybe
-import gr.grnet.aquarium.util.date.TimeHelpers
-import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
-import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
-import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConfKeys ⇒ Keys}
-
-/**
- * An event processor service for resource events
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-final class ResourceEventProcessorService extends EventProcessorService[ResourceEventModel] {
-
- override def parseJsonBytes(data: Array[Byte]) = {
- StdResourceEvent.fromJsonBytes(data)
- }
-
- override def forward(event: ResourceEventModel): Unit = {
- if(event ne null) {
- val businessLogicDispacther = _configurator.actorProvider.actorForRole(RouterRole)
- businessLogicDispacther ! ProcessResourceEvent(event)
- }
- }
-
- override def existsInStore(event: ResourceEventModel): Boolean =
- _configurator.resourceEventStore.findResourceEventById(event.id).isDefined
-
- override def storeParsedEvent(event: ResourceEventModel, initialPayload: Array[Byte]): Unit = {
- // 1. Store to local FS for debugging purposes.
- // BUT be resilient to errors, since this is not critical
- if(_configurator.eventsStoreFolder.isJust) {
- Maybe {
- LocalFSEventStore.storeResourceEvent(_configurator, event, initialPayload)
- }
- }
-
- // 2. Store to DB
- _configurator.resourceEventStore.insertResourceEvent(event)
- }
-
-
- protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit = {
- // TODO: Also save to DB, just like we do for UserEvents
- LocalFSEventStore.storeUnparsedResourceEvent(_configurator, initialPayload, exception)
- }
-
- override def queueReaderThreads: Int = 1
-
- override def persisterThreads: Int = numCPUs + 4
-
- override def numQueueActors: Int = 1 * queueReaderThreads
-
- override def numPersisterActors: Int = 2 * persisterThreads
-
- override def name = "resevtproc"
-
- lazy val persister = new PersisterManager
- lazy val queueReader = new QueueReaderManager
-
- override def persisterManager = persister
-
- override def queueReaderManager = queueReader
-
- def start() {
- logStarting()
- val (ms0, ms1, _) = TimeHelpers.timed {
- declareQueues(Keys.amqp_rcevents_queues)
- }
- logStarted(ms0, ms1)
- }
-
- def stop() {
- logStopping()
- val (ms0, ms1, _) = TimeHelpers.timed {
- queueReaderManager.stop()
- persisterManager.stop()
- }
- logStopped(ms0, ms1)
- }
-}
\ No newline at end of file
def configure(props: Props): Unit = {
this._props = props
- logger.debug("Configured with props")
}
private[this] def __doStart(): Unit = {
}
object SimpleLocalRoleableActorProviderService {
- // Always set Dispatcher at the end.
+ // Always set Router at the end.
// We could definitely use some automatic dependency sorting here (topological sorting anyone?)
final val RolesToBeStarted = List(
// ResourceProcessorRole,
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.util
-
-import akka.amqp._
-import util.Random
-import scopt.OptionParser
-import gr.grnet.aquarium.messaging.AkkaAMQP
-import java.lang.StringBuffer
-import gr.grnet.aquarium.logic.accounting.Policy
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
-import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
-
-/**
- * Generates random resource events to use as input for testing and
- * injects them to the specified exchange.
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-trait RandomEventGenerator extends AkkaAMQP {
-
- val userIds = 1 to 1000
- val clientIds = 1 to 4
- val vmIds = 1 to 4000
- val resources = Policy.policy.resources.map{r => r.name}
- val tsFrom = 1293840000000L //1/1/2011 0:00:00 GMT
- val tsTo = 1325376000000L //1/1/2012 0:00:00 GMT
- val eventVersion = 1 to 4
-
- private val seed = 0xdeadbeef
- private lazy val rnd = new Random(seed)
-
- /**
- * Generate a random resource event
- */
- def nextUserEvent(): IMEventModel = {
- val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
-
- new StdIMEvent(
- id = CryptoUtils.sha1(genRndAsciiString(35)),
- occurredMillis = ts.toLong,
- receivedMillis = ts.toLong,
- userID = userIds(rnd.nextInt(100)).toString,
- clientID = "defclient",
- isActive = rnd.nextBoolean,
- role = Array("PROF", "STUDENT", "ADMIN").apply(rnd.nextInt(3)),
- eventVersion = 1.toString,
- eventType = Array("ACTIVE", "SUSPENDED").apply(rnd.nextInt(2)),
- details = Map()
- )
- }
-
- /**
- * Generate a random resource event
- */
- def genPublishUserEvents(num: Int) = {
- val publisher = producer(im_exchanges(0))
-
- (1 to num).foreach {
- n =>
- var event = nextUserEvent()
- publisher ! Message(event.toJsonString.getBytes, "astakos.user")
- }
- }
-
- /**
- * Generete and publish create events for test users
- */
- def initUsers(num: Int) = {
- val publisher = producer(im_exchanges(0))
-
- userIds.filter(_ < num).foreach {
- i =>
- val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
- val user = new StdIMEvent(
- id = CryptoUtils.sha1(genRndAsciiString(35)),
- occurredMillis = ts.toLong,
- receivedMillis = ts.toLong,
- userID = i.toString,
- clientID = "defclient",
- isActive = rnd.nextBoolean,
- role = Array("PROF", "STUDENT", "ADMIN").apply(rnd.nextInt(3)),
- eventVersion = 1.toString,
- eventType = "CREATE",
- details = Map()
- )
- publisher ! Message(user.toJsonString.getBytes, "astakos.user")
- }
- }
-
- /**
- * Get the next random resource event
- */
- def nextResourceEvent() : ResourceEventModel = {
- val res = rnd.shuffle(resources).head
-
- val extra = res match {
- case "vmtime" => Map("vmid" -> rnd.nextInt(vmIds.max).toString)
- case _ => Map[String, String]()
- }
-
- val value = res match {
- case "vmtime" => rnd.nextInt(1)
- case _ => rnd.nextInt(5000)
- }
-
- val ts = tsFrom + (scala.math.random * ((tsTo - tsFrom) + 1)).asInstanceOf[Long]
- val str = genRndAsciiString(35)
-
- new StdResourceEvent(
- CryptoUtils.sha1(str),
- ts, ts,
- rnd.nextInt(userIds.max).toString,
- rnd.nextInt(clientIds.max).toString,
- res, "1", value, 1.toString, extra)
- }
-
- def genRndAsciiString(size: Int): String = {
- (1 to size).map{
- i => rnd.nextPrintableChar()
- }.foldLeft(new StringBuffer()){
- (a, b) => a.append(b)
- }.toString
- }
-
- /**
- * Generate resource events and publish them to the queue
- */
- def genPublishResEvents(num: Int) = {
-
- assert(num > 0)
- val publisher = producer(resevent_exchanges(0))
-
- (1 to num).foreach {
- n =>
- var event = nextResourceEvent
- publisher ! Message(event.toBytes,
- "%s.%s.%s".format("",event.clientID, event.resource))
- }
- }
-}
-
-object RandomEventGen extends RandomEventGenerator {
-
- case class Config(var i: Boolean = false,
- var u: Boolean = false,
- var r: Boolean = false,
- var nummsg: Int = 100)
-
- val config = new Config
-
- private val parser = new OptionParser("RandomEventGen") {
- opt("i", "im-events", "Generate IM events", {config.i = true})
- opt("u", "user-create", "Generate IM events that create users", {config.u = true})
- opt("r", "resource-events", "Generate resource events", {config.r = true})
- arg("nummsgs", "Number of msgs to generate", {num: String => config.nummsg = Integer.parseInt(num)})
- }
-
- def main(args: Array[String]): Unit = {
-
- if (!parser.parse(args))
- errexit
-
- if (!config.i && !config.u && !config.r) {
- println("One of -i, -u, -r must be specified")
- errexit
- }
-
- println("Publishing %d msgs, hit Ctrl+c to stop".format(config.nummsg))
- if (config.r) genPublishResEvents(config.nummsg)
- if (config.u) initUsers(config.nummsg)
- if (config.i) genPublishUserEvents(config.nummsg)
- }
-
- private def errexit() = {
- print(parser.usage)
- System.exit(-1)
- }
-}
*/
object ReflectHelpers {
- def setField[C <: AnyRef, A : Manifest](container: C, fieldName: String, value: A): Unit = {
+ def setField[C <: AnyRef, A : Manifest](container: C,
+ fieldName: String,
+ value: A,
+ synchronizeOnContainer: Boolean = true): Unit = {
+ require(container ne null, "container is null")
+ require(fieldName ne null, "fieldName is null")
+ require(fieldName.length > 0, "fieldName is empty")
+
val field = container.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
- manifest[A] match {
- case Manifest.Byte ⇒ field.setBoolean(container, value.asInstanceOf[Boolean])
- case Manifest.Int ⇒ field.setInt(container, value.asInstanceOf[Int])
- case Manifest.Long ⇒ field.setLong(container, value.asInstanceOf[Long])
- case _ ⇒ field.set(container, value)
+ def doSet(): Unit = {
+ manifest[A] match {
+ case Manifest.Byte ⇒ field.setBoolean(container, value.asInstanceOf[Boolean])
+ case Manifest.Int ⇒ field.setInt(container, value.asInstanceOf[Int])
+ case Manifest.Long ⇒ field.setLong(container, value.asInstanceOf[Long])
+ case _ ⇒ field.set(container, value)
+ }
+ }
+
+ if(synchronizeOnContainer) {
+ container synchronized doSet()
+ } else {
+ doSet()
}
}
}
}
}
+ /**
+ * Runs function `f` on `c` and finally calls `c.close()` regardless of any exception.
+ */
def withCloseable[C <: { def close(): Unit}, A](c: C)(f: C => A): A = {
try {
f(c)
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.messaging
-
-import gr.grnet.aquarium.util.RandomEventGenerator
-import org.junit.Test
-import akka.actor.Actor
-import java.util.concurrent.{TimeUnit, CountDownLatch}
-import akka.amqp._
-import akka.config.Supervision.Permanent
-import java.util.concurrent.atomic.AtomicInteger
-import org.junit.Assume._
-import gr.grnet.aquarium.{AquariumException, LogicTestsAssumptions}
-
-/**
- *
- *
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-
-class AkkaAMQPTest extends RandomEventGenerator {
-
- @Test
- def testSendReceive() : Unit = {
-
- assumeTrue(LogicTestsAssumptions.EnableRabbitMQTests)
-
- val numMsg = 100
- val msgs = new AtomicInteger(0)
-
- val publisher = producer(aquarium_exchnage)
-
- class Consumer extends Actor {
-
- self.lifeCycle = Permanent
-
- def receive = {
- case Delivery(payload, routingKey, deliveryTag, isRedeliver, properties, sender) =>
- println(this + " Got message: %s (%d)".format(new String(payload), deliveryTag) +
- (if(isRedeliver) " - redelivered (%d)".format(deliveryTag) else ""))
- if (msgs.incrementAndGet() == 15) throw new AquariumException("Messed up")
- if (msgs.incrementAndGet() == 55) sender ! Reject(deliveryTag, true)
- sender ! Acknowledge(deliveryTag)
- case Acknowledged(deliveryTag) => println("Acked: " + deliveryTag)
- case _ => println("Unknown delivery")
- }
-
- override def preRestart(reason: Throwable) {
- println("Actor restarted: " + reason)
- }
-
- override def postRestart(reason: Throwable) {
- // reinit stable state after restart
- }
- }
-
- consumer("foo.#", "aquarium-rabbitmq-test",
- aquarium_exchnage, Actor.actorOf(new Consumer), false)
- Thread.sleep(2000)
-
- (1 to numMsg).foreach{
- i => publisher ! new Message(i.toString.getBytes(), "foo.bar")
- }
-
- Thread.sleep(5000)
-
- //AMQP.shutdownAll
- Actor.registry.shutdownAll
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright 2011-2012 GRNET S.A. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or
- * without modification, are permitted provided that the following
- * conditions are met:
- *
- * 1. Redistributions of source code must retain the above
- * copyright notice, this list of conditions and the following
- * disclaimer.
- *
- * 2. Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials
- * provided with the distribution.
- *
- * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
- * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
- * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
- * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
- * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
- * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
- * POSSIBILITY OF SUCH DAMAGE.
- *
- * The views and conclusions contained in the software and
- * documentation are those of the authors and should not be
- * interpreted as representing official policies, either expressed
- * or implied, of GRNET S.A.
- */
-
-package gr.grnet.aquarium.store.mongodb
-
-import org.junit.Assert._
-import org.junit.Assume._
-import gr.grnet.aquarium.Configurator._
-import gr.grnet.aquarium.util.{RandomEventGenerator, TestMethods}
-import collection.mutable.ArrayBuffer
-import org.junit.{After, Test}
-import gr.grnet.aquarium.{StoreConfigurator, LogicTestsAssumptions}
-import gr.grnet.aquarium.store.memory.MemStore
-import gr.grnet.aquarium.event.model.resource.ResourceEventModel
-
-/**
- * @author Georgios Gousios <gousiosg@gmail.com>
- */
-class EventStoreTest extends TestMethods
-with RandomEventGenerator with StoreConfigurator {
-
- @Test
- def testStoreEvent() = {
- assumeTrue(LogicTestsAssumptions.EnableStoreTests)
-
- val event = nextResourceEvent()
- val store = config.resourceEventStore
- val result = store.insertResourceEvent(event)
- }
-
- @Test
- def testFindEventById(): Unit = {
- assumeTrue(LogicTestsAssumptions.EnableStoreTests)
-
- val event = nextResourceEvent()
- val store = config.resourceEventStore
-
- store.insertResourceEvent(event)
-
- store.findResourceEventById(event.id)
- }
-
- @Test
- def testfindEventsByUserId(): Unit = {
- assumeTrue(LogicTestsAssumptions.EnableStoreTests)
- val events = new ArrayBuffer[ResourceEventModel]()
- val store = config.resourceEventStore
-
- (1 to 100).foreach {
- n =>
- val e = nextResourceEvent
- events += e
- store.insertResourceEvent(e)
- }
-
- val mostUsedId = events
- .map{x => x.userID}
- .groupBy(identity)
- .mapValues(_.size)
- .foldLeft(("",0))((acc, kv) => if (kv._2 > acc._2) kv else acc)._1
-
- val result = store.findResourceEventsByUserId(mostUsedId)(None)
- assertEquals(events.filter(p => p.userID.equals(mostUsedId)).size, result.size)
- }
-
- @Test
- def testMultipleMongos = {
- assumeTrue(LogicTestsAssumptions.EnableStoreTests)
- val a = getMongo
- val b = getMongo
- //assertEquals(a.Connection.mongo.get.hashCode(), b.Connection.mongo.get.hashCode())
- }
-
- @After
- def after: Unit = {
- if (isInMemStore) return
-
- val a = getMongo
-
- val col = a.mongo.getDB(
- MasterConfigurator.get(MongoDBStoreProvider.MongoDBKeys.dbschema)
- ).getCollection(MongoDBStore.RESOURCE_EVENTS_COLLECTION)
-
- //val res = col.find
- //while (res.hasNext)
- // col.remove(res.next)
- }
-
- private lazy val config = configurator
- private val isInMemStore = config.resourceEventStore.isInstanceOf[MemStore]
- private def getMongo = config.resourceEventStore.asInstanceOf[MongoDBStore]
-}
\ No newline at end of file