The bus service uses an executor instead of actors
authorChristos KK Loverdos <loverdos@gmail.com>
Wed, 23 May 2012 11:12:55 +0000 (14:12 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Wed, 23 May 2012 11:12:55 +0000 (14:12 +0300)
src/main/scala/gr/grnet/aquarium/service/EventBusService.scala
src/main/scala/gr/grnet/aquarium/util/DaemonThreadFactory.scala [new file with mode: 0644]

index ba726a6..adab928 100644 (file)
@@ -37,12 +37,11 @@ package gr.grnet.aquarium.service
 
 import gr.grnet.aquarium.Configurable
 import com.ckkloverdos.props.Props
-import gr.grnet.aquarium.util.date.TimeHelpers
 import gr.grnet.aquarium.service.event.BusEvent
-import com.google.common.eventbus.{DeadEvent, Subscribe, EventBus}
-import akka.actor.{ActorRef, Actor}
-import gr.grnet.aquarium.util.{Lifecycle, Loggable}
-import gr.grnet.aquarium.util.safeUnit
+import com.google.common.eventbus.{AsyncEventBus, DeadEvent, Subscribe}
+import gr.grnet.aquarium.util.{DaemonThreadFactory, Lifecycle, Loggable}
+import java.util.concurrent.{ConcurrentHashMap, Executors}
+import java.util.Collections
 
 
 /**
@@ -52,8 +51,12 @@ import gr.grnet.aquarium.util.safeUnit
  */
 
 class EventBusService extends Loggable with Lifecycle with Configurable {
-  private[this] val theBus = new EventBus(classOf[EventBusService].getName)
-  private[this] var _poster: ActorRef = null
+  private[this] val asyncBus = new AsyncEventBus(
+    classOf[EventBusService].getName,
+    Executors.newFixedThreadPool(1, new DaemonThreadFactory)
+  )
+
+  private[this] val subscribers = Collections.newSetFromMap[AnyRef](new ConcurrentHashMap())
 
   def propertyPrefix = None
 
@@ -67,12 +70,14 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
 
   def start() = {
     this addSubsciber this // Wow!
-
-    this._poster = Actor.actorOf(AsyncPoster).start()
   }
 
-  def stop() = {
-    safeUnit(_poster.stop())
+  def stop() = synchronized {
+    val iterator = subscribers.iterator()
+    while(iterator.hasNext) {
+      asyncBus.unregister(iterator.next())
+    }
+    subscribers.clear()
   }
 
   @inline
@@ -81,11 +86,17 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
   }
 
   def ![A <: BusEvent](event: A): Unit = {
-    _poster ! event
+    asyncBus.post(event)
+  }
+
+  def removeSubsciber[A <: AnyRef](subscriber: A): Unit = synchronized {
+    subscribers.remove(subscriber)
+    asyncBus.unregister(subscriber)
   }
 
-  def addSubsciber[A <: AnyRef](subscriber: A): Unit = {
-    theBus register subscriber
+  def addSubsciber[A <: AnyRef](subscriber: A): Unit = synchronized {
+    subscribers.add(subscriber)
+    asyncBus.register(subscriber)
   }
 
   @Subscribe
@@ -93,15 +104,4 @@ class EventBusService extends Loggable with Lifecycle with Configurable {
     event.getSource
     logger.warn("DeadEvent %s".format(event.getEvent))
   }
-
-  /**
-   * Actor that takes care of asynchronously posting to the underlying event bus
-   */
-  object AsyncPoster extends Actor {
-    protected def receive = {
-      case event: AnyRef ⇒
-        try theBus post event
-        catch { case _ ⇒ }
-    }
-  }
 }
diff --git a/src/main/scala/gr/grnet/aquarium/util/DaemonThreadFactory.scala b/src/main/scala/gr/grnet/aquarium/util/DaemonThreadFactory.scala
new file mode 100644 (file)
index 0000000..b5f6524
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.util
+
+import java.util.concurrent.ThreadFactory
+
+/**
+ * A [[java.util.concurrent.ThreadFactory]] that creates daemon threads.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+class DaemonThreadFactory extends ThreadFactory {
+  def newThread(r: Runnable) = {
+    val thread = new Thread(r)
+    thread.setDaemon(true)
+    thread
+  }
+}