2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.processor.actor
38 import gr.grnet.aquarium.logic.events.ResourceEvent
39 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
40 import gr.grnet.aquarium.{Configurator}
41 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
44 import akka.actor.Actor._
45 import akka.routing.CyclicIterator
46 import akka.routing.Routing._
47 import akka.dispatch.Dispatchers
48 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
49 import akka.config.Supervision.SupervisorConfig
50 import akka.config.Supervision.OneForOneStrategy
51 import java.util.concurrent.ConcurrentSkipListSet
52 import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
54 import gr.grnet.aquarium.actor.DispatcherRole
57 * An actor that gets events from the queue, stores them persistently
58 * and forwards them for further processing.
60 * @author Georgios Gousios <gousiosg@gmail.com>
62 class ResourceEventProcessorService extends AkkaAMQP with Loggable
65 case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
66 case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
67 case class PersistOK(ackData: AckData)
68 case class PersistFailed(ackData: AckData)
69 case class Duplicate(ackData: AckData)
71 val redeliveries = new ConcurrentSkipListSet[String]()
73 private[this] def _configurator: Configurator = Configurator.MasterConfigurator
74 private[this] def _calcStateChanges(resourceEvent: ResourceEvent): Unit = {
75 val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
76 businessLogicDispacther ! ProcessResourceEvent(resourceEvent) // all state change, credit calc etc will happen there
79 class QueueReader extends Actor {
82 case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
83 val event = ResourceEvent.fromBytes(payload)
85 //Message could not be processed 3 times, just
86 if (redeliveries.contains(event.id)) {
87 logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
88 queue ! Reject(deliveryTag, false)
89 redeliveries.remove(event.id)
91 //Redeliver, but keep track of the message
92 redeliveries.add(event.id)
93 PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
96 val eventWithReceivedMillis = event.copy(receivedMillis = System.currentTimeMillis())
97 PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
100 case PersistOK(ackData) =>
101 logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
102 ackData.queue ! Acknowledge(ackData.deliveryTag)
104 case PersistFailed(ackData) =>
105 //Give the message a chance to be processed by other processors
106 logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
107 ackData.queue ! Reject(ackData.deliveryTag, true)
109 case Duplicate(ackData) =>
110 logger.debug("Event[%s] msg[%d] is duplicate".format(ackData.msgId, ackData.deliveryTag))
111 ackData.queue ! Reject(ackData.deliveryTag, false)
113 case Acknowledged(deliveryTag) =>
114 logger.debug("Msg with tag [%d] acked".format(deliveryTag))
115 //TODO: Forward to the dispatcher
117 case Rejected(deliveryTag) =>
118 logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
120 case _ => logger.warn("Unknown message")
123 self.dispatcher = QueueReaderManager.dispatcher
126 class Persister extends Actor {
129 case Persist(event, sender, ackData) =>
131 sender ! Duplicate(ackData)
132 else if (persist(event)) {
133 sender ! PersistOK(ackData)
134 // TODO: Move to some proper place (after ACK?)
135 _calcStateChanges(event)
137 sender ! PersistFailed(ackData)
138 case _ => logger.warn("Unknown message")
141 def exists(event: ResourceEvent): Boolean =
142 _configurator.resourceEventStore.findResourceEventById(event.id).isJust
144 def persist(event: ResourceEvent): Boolean = {
145 _configurator.resourceEventStore.storeResourceEvent(event) match {
148 logger.error("Could not save event: %s".format(event))
154 self.dispatcher = PersisterManager.dispatcher
157 lazy val supervisor = Supervisor(SupervisorConfig(
159 List(classOf[Exception]), //What exceptions will be handled
160 3, // maximum number of restart retries
161 5000 // within time in millis
166 object QueueReaderManager {
167 val numCPUs = Runtime.getRuntime.availableProcessors
168 var actors: List[ActorRef] = _
169 lazy val lb = loadBalancerActor(new CyclicIterator(actors))
171 lazy val dispatcher =
172 Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
173 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
174 .setMaxPoolSize(numCPUs)
176 .setKeepAliveTimeInMillis(60000)
177 .setRejectionPolicy(new CallerRunsPolicy).build
181 for (i <- 0 until numCPUs) yield {
182 val actor = actorOf(new QueueReader)
183 supervisor.link(actor)
190 def stop() = dispatcher.stopAllAttachedActors
193 object PersisterManager {
194 val numCPUs = Runtime.getRuntime.availableProcessors
195 var actors: List[ActorRef] = _
197 lazy val lb = loadBalancerActor(new CyclicIterator(actors))
200 Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
201 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
202 .setMaxPoolSize(numCPUs)
204 .setKeepAliveTimeInMillis(60000)
205 .setRejectionPolicy(new CallerRunsPolicy).build
209 for (i <- 0 until numCPUs) yield {
210 val actor = actorOf(new Persister)
211 supervisor.link(actor)
218 def stop() = dispatcher.stopAllAttachedActors
222 logger.info("Starting resource event processor service")
224 QueueReaderManager.start()
225 PersisterManager.start()
227 consumer("%s.#".format(MessagingNames.RES_EVENT_KEY),
228 MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE,
229 QueueReaderManager.lb, false)
233 QueueReaderManager.stop()
234 PersisterManager.stop()
236 logger.info("Stopping resource event processor service")