2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.processor.actor
38 import gr.grnet.aquarium.messaging.AkkaAMQP
39 import gr.grnet.aquarium.logic.events.ResourceEvent
40 import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
41 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
42 import gr.grnet.aquarium.{MasterConf}
43 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
46 import akka.actor.Actor._
47 import akka.routing.CyclicIterator
48 import akka.routing.Routing._
49 import akka.dispatch.Dispatchers
50 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
51 import akka.config.Supervision.SupervisorConfig
52 import akka.config.Supervision.OneForOneStrategy
53 import java.util.concurrent.ConcurrentSkipListSet
56 * An actor that gets events from the queue, stores them persistently
57 * and forwards them for further processing.
59 * @author Georgios Gousios <gousiosg@gmail.com>
// Service type mixing in AkkaAMQP (messaging) and Loggable (provides `logger`).
// The declarations that follow are the message protocol used between the
// QueueReader and Persister actors, plus the shared redelivery bookkeeping.
61 class ResourceEventProcessorService extends AkkaAMQP with Loggable
// Everything needed to answer for one delivery later on: the event id (msgId),
// the delivery tag, and the actor that Acknowledge / Reject is sent to.
64 case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
// Request to store `event`. `sender` is the actor that receives the outcome
// message; `ackData` is echoed back unchanged inside that outcome.
65 case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
// Outcome: the event was stored. QueueReader answers with Acknowledge.
66 case class PersistOK(ackData: AckData)
// Outcome: storing failed. QueueReader answers with Reject(tag, true).
67 case class PersistFailed(ackData: AckData)
// Outcome: the event was reported as already stored. QueueReader answers with
// Reject(tag, false).
68 case class Duplicate(ackData: AckData)
// Event ids recorded by QueueReader when it forwards a redelivered event; a
// later hit on the same id makes the reader reject the event instead.
// NOTE(review): presumably shared by all QueueReader instances, hence the
// concurrent set -- confirm against the full class body.
70 val redeliveries = new ConcurrentSkipListSet[String]()
// Actor that takes deliveries from the queue, hands the decoded event to the
// persister pool, and answers the queue actor once the persistence outcome
// (PersistOK / PersistFailed / Duplicate) comes back.
// NOTE(review): some structural lines (the receive definition and the outer
// redelivery branch) are not visible in this view; the comments below only
// describe the lines that are.
72 class QueueReader extends Actor {
// Incoming delivery: decode the raw payload into a ResourceEvent.
75 case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
76 val event = ResourceEvent.fromBytes(payload)
78 //Event id already recorded in redeliveries: log it, reject it and forget the id
// (presumably only reached when isRedeliver is set -- the guard is not visible, confirm)
79 if (redeliveries.contains(event.id)) {
80 logger.warn("Event %s redelivered >2 times. Ignoring".format(event))
// NOTE(review): `queue` is used directly here but as `queue.get` in the Persist
// hand-offs below -- confirm the receiver type is what is intended.
// The second Reject argument is presumably the requeue flag (false = drop) -- confirm.
81 queue ! Reject(deliveryTag, false)
82 redeliveries.remove(event.id)
84 //Redeliver, but keep track of the message
85 redeliveries.add(event.id)
// The reader load balancer is passed as the reply target, so the outcome is
// sent to the reader pool rather than to this particular actor.
86 PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
// Same hand-off, without recording the id in redeliveries.
89 PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
// Event stored: acknowledge the delivery to the queue actor kept in ackData.
// NOTE(review): nothing visible here removes ackData.msgId from redeliveries,
// so an id that was redelivered once and then stored appears to stay in the set.
91 case PersistOK(ackData) =>
92 logger.debug("Stored event:%s".format(ackData.msgId))
93 ackData.queue ! Acknowledge(ackData.deliveryTag)
// Storing failed: reject with `true`.
95 case PersistFailed(ackData) =>
96 //Give the message a chance to be processed by other processors
97 logger.debug("Storing event:%s failed".format(ackData.msgId))
98 ackData.queue ! Reject(ackData.deliveryTag, true)
// Event was reported as a duplicate: reject with `false`.
100 case Duplicate(ackData) =>
101 logger.debug("Event:%s is duplicate".format(ackData.msgId))
102 ackData.queue ! Reject(ackData.deliveryTag, false)
// Acknowledgement confirmation: no action is visible for this case in this view.
104 case Acknowledged(deliveryTag) =>
105 //Forward to the dispatcher
// Anything else is only logged.
107 case _ => logger.warn("Unknown message")
// Run this actor on the dispatcher owned by QueueReaderManager.
110 self.dispatcher = QueueReaderManager.dispatcher
// Actor that stores resource events through MasterConf's event store and
// reports the outcome to the actor named in the request.
// NOTE(review): the first condition of the if/else chain and the match arms of
// persist are not visible in this view.
113 class Persister extends Actor {
// Persist request: the visible branches reply to `sender` (the ActorRef carried
// in the message) with Duplicate, PersistOK or PersistFailed.
116 case Persist(event, sender, ackData) =>
// Reply for the first branch; presumably guarded by exists(event) -- confirm.
118 sender ! Duplicate(ackData)
119 else if (persist(event))
120 sender ! PersistOK(ackData)
// Fallback reply when persist(event) returned false.
122 sender ! PersistFailed(ackData)
// Anything else is only logged.
123 case _ => logger.warn("Unknown message")
// True when looking the event up by id in the event store gives a non-empty result.
126 def exists(event: ResourceEvent): Boolean =
127 !MasterConf.MasterConf.eventStore.findEventById(event.id).isEmpty
// Stores the event in the event store and maps the store's result to a Boolean;
// an error is logged on a branch whose pattern is not visible here.
129 def persist(event: ResourceEvent): Boolean = {
130 MasterConf.MasterConf.eventStore.storeEvent(event) match {
133 logger.error("Could not save event: %s".format(event))
// Run this actor on the dispatcher owned by PersisterManager.
139 self.dispatcher = PersisterManager.dispatcher
// Lazily created supervisor; the manager objects link every QueueReader and
// Persister actor to it via supervisor.link.
// NOTE(review): the line naming the strategy is not visible here; the
// OneForOneStrategy import suggests that is what is configured -- confirm.
142 lazy val supervisor = Supervisor(SupervisorConfig(
144 List(classOf[Exception]), //What exceptions will be handled
145 3, // maximum number of restart retries
146 5000 // within time in millis
// Owns the pool of QueueReader actors: the dispatcher they run on, the load
// balancer placed in front of them, and the call that stops them.
// NOTE(review): the method header around the creation loop and the assignment
// to `actors` are not visible in this view.
151 object QueueReaderManager {
// Number of available processors; used as the pool size and the max thread count.
152 val numCPUs = Runtime.getRuntime.availableProcessors
// Default-initialised (null) until it is assigned elsewhere.
153 var actors: List[ActorRef] = _
// Load balancer that cycles over `actors`. It is lazy, so `actors` is only
// read on first use; it must have been assigned by then.
154 lazy val lb = loadBalancerActor(new CyclicIterator(actors))
// Dispatcher for the readers; each QueueReader assigns it to self.dispatcher.
156 lazy val dispatcher =
157 Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
// Thread pool backed by a linked blocking queue of capacity 100.
158 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
159 .setMaxPoolSize(numCPUs)
161 .setKeepAliveTimeInMillis(60000)
// CallerRunsPolicy: a rejected task is run by the thread that submitted it.
162 .setRejectionPolicy(new CallerRunsPolicy).build
// Create one QueueReader per processor and link each to the supervisor.
166 for (i <- 0 until numCPUs) yield {
167 val actor = actorOf(new QueueReader)
168 supervisor.link(actor)
// Stops every actor attached to this manager's dispatcher.
175 def stop() = dispatcher.stopAllAttachedActors
// Owns the pool of Persister actors: the dispatcher they run on, the load
// balancer placed in front of them, and the call that stops them.
// NOTE(review): the dispatcher declaration line, the method header around the
// creation loop and the assignment to `actors` are not visible in this view.
178 object PersisterManager {
// Number of available processors; used as the pool size and the max thread count.
179 val numCPUs = Runtime.getRuntime.availableProcessors
// Default-initialised (null) until it is assigned elsewhere.
180 var actors: List[ActorRef] = _
// Load balancer that cycles over `actors`. It is lazy, so `actors` is only
// read on first use; it must have been assigned by then.
182 lazy val lb = loadBalancerActor(new CyclicIterator(actors))
// Dispatcher for the persisters; each Persister assigns it to self.dispatcher.
185 Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
// Thread pool backed by a linked blocking queue of capacity 100.
186 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
187 .setMaxPoolSize(numCPUs)
189 .setKeepAliveTimeInMillis(60000)
// CallerRunsPolicy: a rejected task is run by the thread that submitted it.
190 .setRejectionPolicy(new CallerRunsPolicy).build
// Create one Persister per processor and link each to the supervisor.
194 for (i <- 0 until numCPUs) yield {
195 val actor = actorOf(new Persister)
196 supervisor.link(actor)
// Stops every actor attached to this manager's dispatcher.
203 def stop() = dispatcher.stopAllAttachedActors
// Startup sequence (the enclosing method signature is not visible in this view;
// presumably the service's start method -- confirm).
207 logger.info("Starting resource event processor service")
// Start both actor pools before subscribing to the queue.
209 QueueReaderManager.start()
210 PersisterManager.start()
// Register the reader load balancer as the consumer of deliveries.
// NOTE(review): the string arguments look like routing key, queue name and
// exchange name, and the meaning of the trailing `false` is not shown here --
// confirm against AkkaAMQP.consumer.
211 consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false)
// Shutdown sequence (the enclosing method signature is not visible in this view;
// presumably the service's stop method -- confirm).
// Stops the actors attached to each manager's dispatcher, readers first.
215 QueueReaderManager.stop()
216 PersisterManager.stop()
// NOTE(review): this "Stopping" message is logged after the visible stop calls
// above, not before them.
218 logger.info("Stopping resource event processor service")