2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.service
38 import gr.grnet.aquarium.util.{Lifecycle, Loggable, makeString}
41 import akka.actor.Actor._
42 import akka.routing.CyclicIterator
43 import akka.routing.Routing._
44 import akka.dispatch.Dispatchers
45 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
46 import akka.config.Supervision.SupervisorConfig
47 import akka.config.Supervision.OneForOneStrategy
48 import gr.grnet.aquarium.messaging.AkkaAMQP
50 import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
51 import com.ckkloverdos.maybe._
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import gr.grnet.aquarium.{AquariumException, Configurator}
54 import gr.grnet.aquarium.store.RecordID
55 import gr.grnet.aquarium.event.{AquariumEventModel, AquariumEventSkeleton}
58 * An abstract service that retrieves Aquarium events from a queue,
59 * stores them persistently and forwards them for further processing.
60 * Processing happens asynchronously across two load-balanced actor
61 * clusters: queue readers and persisters. The number of actors and
62 * the number of threads in each load-balanced cluster are
63 * configurable by subclasses.
65 * @author Georgios Gousios <gousiosg@gmail.com>
67 abstract class EventProcessorService[E <: AquariumEventModel] extends AkkaAMQP with Loggable with Lifecycle {
69 /* Messages exchanged between the persister and the queuereader */
// Acknowledgement context that travels with an event. QueueReader fills `msgId` with the event id,
// `deliveryTag` with the tag of the delivery and `queue` with the actor the delivery came from.
70 case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
// Request sent by a QueueReader to the persister load balancer: store `event` together with its
// raw `initialPayload`. The Persister answers to `sender` with one of the three replies below.
72 case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
// Reply: the event was stored. QueueReader then sends Acknowledge(deliveryTag) to the queue.
74 case class PersistOK(ackData: AckData)
// Reply: storing the event failed. QueueReader drops the in-flight entry and sends
// Reject(deliveryTag, true) to the queue.
76 case class PersistFailed(ackData: AckData)
// Reply: the event is a duplicate. QueueReader drops the in-flight entry and sends
// Reject(deliveryTag, false) to the queue.
78 case class Duplicate(ackData: AckData)
81 * Short-term storage for the ids of redelivered events, to work around
82 * an AMQP limitation with redelivering rejected messages to the same host.
84 * FIXME: Can this set grow without bound?
// Holds event ids (`event.id`), not delivery tags. QueueReader adds an id when it does not know
// it yet (on the redelivery path, as far as the surrounding comments indicate) and removes it
// again when it rejects that event on a later delivery.
86 private val redeliveries = new ConcurrentSkipListSet[String]()
89 * Temporarily keeps track of events, keyed by delivery tag, while they are being processed.
91 * FIXME: Can this map grow without bound?
// Maps a delivery tag to the parsed event. QueueReader inserts the entry when a delivery is
// parsed and removes it when the delivery is rejected, when persisting fails, when the event is
// a duplicate, or when the queue confirms the acknowledgement (the removed event is then forwarded).
// Constructor arguments: initial capacity 200, load factor 0.9, concurrency level 4.
93 private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
95 /* Supervisor actor for each event processing operation */
// Supervisor that the QueueReader and Persister actors are linked to. Being a lazy val, it is
// created on first use.
// NOTE(review): the restart strategy that receives the three arguments below is presumably
// OneForOneStrategy (this file imports it) - confirm.
96 private lazy val supervisor = Supervisor(SupervisorConfig(
98 List(classOf[Exception]), //What exceptions will be handled
99 5, // maximum number of restart retries
100 5000 // within time in millis
// Configuration source; declareQueues looks up the queue declarations through it.
104 protected def _configurator: Configurator = Configurator.MasterConfigurator
// Decodes the raw payload of a delivery into an event. QueueReader calls it inside MaybeEither,
// so a failure to decode ends up in the Failed branch of the handler.
106 protected def parseJsonBytes(data: Array[Byte]): E
// Called by QueueReader with the in-flight event once the queue has confirmed the acknowledgement.
108 protected def forward(event: E): Unit
// NOTE(review): presumably tells whether the event has already been stored and is what triggers
// the Duplicate reply of the Persister - confirm against the Persister.
110 protected def exists(event: E): Boolean
// Stores a parsed event together with the payload it was decoded from; called by the Persister.
112 protected def persist(event: E, initialPayload: Array[Byte]): Unit
// Stores a payload that could not be decoded, along with the decoding error; called by QueueReader.
114 protected def persistUnparsed(initialPayload: Array[Byte], exception: Throwable): Unit
// Core pool size of the queue reader dispatcher.
116 protected def queueReaderThreads: Int
// Core pool size of the persister dispatcher.
118 protected def persisterThreads: Int
// How many QueueReader actors QueueReaderManager creates.
120 protected def numQueueActors: Int
// How many Persister actors PersisterManager creates.
122 protected def numPersisterActors: Int
// Prefix of the dispatcher names (name + "-queuereader", name + "-persister").
124 protected def name: String
// Provides the persister load balancer and the dispatcher the Persister actors run on.
126 protected def persisterManager: PersisterManager
// Provides the queue reader load balancer and the dispatcher the QueueReader actors run on.
128 protected def queueReaderManager: QueueReaderManager
// Number of processors reported by the JVM; both dispatchers cap their pool at 2 * numCPUs.
130 protected val numCPUs = Runtime.getRuntime.availableProcessors
/**
 * Declares the queues to read from and registers the queue reader load balancer as their consumer.
 *
 * The configuration value stored under `conf` is split on ";", one queue declaration per element.
 * For every accepted declaration the queue name, exchange and routing key are logged and
 * `consumer(...)` is invoked with `queueReaderManager.lb` as the receiving actor.
 *
 * NOTE(review): presumably each declaration encodes exchange, routing key and queue name in a
 * delimiter-separated form - confirm the exact format and the validation rule.
 *
 * @param conf configuration key whose value holds the queue declarations
 * @throws AquariumException if a queue declaration is not correct
 */
136 protected def declareQueues(conf: String) = {
137 val decl = _configurator.get(conf)
138 decl.split(";").foreach {
143 throw new AquariumException("Queue declaration \"%s\" not correct".format(q))
148 logger.info("Declaring queue '%s' for exchange '%s' and key '%s'".format(qname, exchange, route))
149 consumer(route, qname, exchange, queueReaderManager.lb, false)
/**
 * Actor that handles deliveries from the queue. It decodes the payload into an event, hands the
 * event to the persister load balancer and, depending on the reply, acknowledges or rejects the
 * delivery. An event is forwarded once the queue has confirmed the acknowledgement.
 */
153 class QueueReader extends Actor {
// A message delivered from the queue.
156 case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
157 val eventM = MaybeEither {
158 parseJsonBytes(payload)
159 } // either decoded or error
// Decoded successfully: remember the event under its delivery tag while it is being processed.
162 inFlightEvents.put(deliveryTag, event)
165 //Message could not be processed 3 times, just ignore it
166 if(redeliveries.contains(event.id)) {
167 logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
// NOTE(review): the second argument of Reject is presumably the requeue flag - confirm.
168 queue ! Reject(deliveryTag, false)
169 redeliveries.remove(event.id)
170 inFlightEvents.remove(deliveryTag)
172 //Redeliver, but keep track of the message
173 redeliveries.add(event.id)
174 persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
// Stamp the event with the current time as its reception time before handing it to the persisters.
177 val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
178 persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
// The payload could not be decoded into an event.
181 case failed@Failed(e) ⇒
182 logger.error("While decoding payload", e)
183 logger.error("Offensive payload = \n{}", makeString(payload))
185 // If we could not create an object from the incoming json, then we just store the message
186 // and then ignore it.
187 // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
189 persistUnparsed(payload, e)
// The raw payload was stored: acknowledge the delivery.
192 logger.debug("Sending Acknowledge(deliveryTag) = {}", Acknowledge(deliveryTag))
193 queue ! Acknowledge(deliveryTag)
// Storing the raw payload failed as well: reject the delivery.
195 logger.error("Could not persist unparsed event", e)
196 logger.debug("Sending {}", Reject(deliveryTag, true))
197 queue ! Reject(deliveryTag, true)
// The persister stored the event: acknowledge the delivery. The in-flight entry is removed
// later, when the Acknowledged confirmation arrives.
201 case PersistOK(ackData) =>
202 logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
203 ackData.queue ! Acknowledge(ackData.deliveryTag)
205 case PersistFailed(ackData) =>
206 //Give the message a chance to be processed by other processors
207 logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
208 inFlightEvents.remove(ackData.deliveryTag)
209 ackData.queue ! Reject(ackData.deliveryTag, true)
// The persister reported a duplicate: forget the event and reject the delivery.
211 case Duplicate(ackData) =>
212 logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
213 inFlightEvents.remove(ackData.deliveryTag)
214 ackData.queue ! Reject(ackData.deliveryTag, false)
// The queue confirmed the acknowledgement: drop the in-flight entry and forward the event.
// NOTE(review): ConcurrentHashMap.remove returns null when the tag has no entry. Undecodable
// payloads are acknowledged without ever being put into inFlightEvents, so forward may be
// called with null if their confirmation is routed here - confirm and guard if so.
216 case Acknowledged(deliveryTag) =>
217 logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
218 forward(inFlightEvents.remove(deliveryTag))
// The queue confirmed a rejection: only logged.
220 case Rejected(deliveryTag) =>
221 logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
223 case _ => logger.warn("Unknown message")
226 override def preStart = {
227 logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
// Run this actor on the dispatcher owned by the queue reader manager.
231 self.dispatcher = queueReaderManager.dispatcher
/**
 * Actor that stores events on behalf of the queue readers. For every Persist request it replies
 * to the given `sender` with Duplicate, PersistOK or PersistFailed, always echoing the `ackData`
 * it received.
 */
234 class Persister extends Actor with Loggable {
237 case Persist(event, initialPayload, sender, ackData) ⇒
// NOTE(review): the condition guarding this reply is presumably `exists(event)` - confirm.
239 sender ! Duplicate(ackData)
// Not a duplicate: store the event and report success.
241 persist(event, initialPayload)
243 sender ! PersistOK(ackData)
// Storing failed: report the failure to the sender and log the cause.
244 } forFailed { case Failed(e) ⇒
245 sender ! PersistFailed(ackData)
246 logger.error("While persisting event", e)
250 override def preStart = {
252 logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
// Run this actor on the dispatcher owned by the persister manager.
255 self.dispatcher = persisterManager.dispatcher
/**
 * Owns the QueueReader actors, the load balancer in front of them and the dispatcher they run on.
 */
258 class QueueReaderManager {
// Load balancer actor that cycles through `actors` (CyclicIterator); created lazily.
259 lazy val lb = loadBalancerActor(new CyclicIterator(actors))
// Dispatcher named name + "-queuereader": a thread pool with a linked blocking queue of capacity
// 1000, at most 2 * numCPUs threads, queueReaderThreads core threads and a keep-alive time of
// 60000 ms. With CallerRunsPolicy a task the pool rejects is run by the submitting thread.
261 lazy val dispatcher =
262 Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
263 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
264 .setMaxPoolSize(2 * numCPUs)
265 .setCorePoolSize(queueReaderThreads)
266 .setKeepAliveTimeInMillis(60000)
267 .setRejectionPolicy(new CallerRunsPolicy).build
// Creates numQueueActors QueueReader actors and links each of them to the supervisor.
// NOTE(review): presumably this is the body of the `actors` member used by `lb` - confirm.
270 for(i <- 0 until numQueueActors) yield {
271 val actor = actorOf(new QueueReader)
272 supervisor.link(actor)
// Stops every actor attached to the queue reader dispatcher.
277 def stop() = dispatcher.stopAllAttachedActors
/**
 * Owns the Persister actors, the load balancer in front of them and the dispatcher they run on.
 */
280 class PersisterManager {
// Load balancer actor that cycles through `actors` (CyclicIterator); created lazily.
281 lazy val lb = loadBalancerActor(new CyclicIterator(actors))
// Dispatcher named name + "-persister": a thread pool with a linked blocking queue of capacity
// 1000, at most 2 * numCPUs threads, persisterThreads core threads and a keep-alive time of
// 60000 ms. With CallerRunsPolicy a task the pool rejects is run by the submitting thread.
// NOTE(review): presumably this builder chain is the value of the `dispatcher` member that
// `stop()` refers to - confirm.
284 Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
285 .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
286 .setMaxPoolSize(2 * numCPUs)
287 .setCorePoolSize(persisterThreads)
288 .setKeepAliveTimeInMillis(60000)
289 .setRejectionPolicy(new CallerRunsPolicy).build
// Creates numPersisterActors Persister actors and links each of them to the supervisor.
// NOTE(review): presumably this is the body of the `actors` member used by `lb` - confirm.
292 for(i <- 0 until numPersisterActors) yield {
293 val actor = actorOf(new Persister)
294 supervisor.link(actor)
// Stops every actor attached to the persister dispatcher.
299 def stop() = dispatcher.stopAllAttachedActors