61d794f7a2ffa2b8faa44071ae6c9a39e98cf41e
[aquarium] / src / main / scala / gr / grnet / aquarium / service / EventProcessorService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
39
40 import akka.actor._
41 import akka.actor.Actor._
42 import akka.routing.CyclicIterator
43 import akka.routing.Routing._
44 import akka.dispatch.Dispatchers
45 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
46 import akka.config.Supervision.SupervisorConfig
47 import akka.config.Supervision.OneForOneStrategy
48 import gr.grnet.aquarium.messaging.AkkaAMQP
49 import akka.amqp._
50 import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
51 import com.ckkloverdos.maybe._
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import gr.grnet.aquarium.{AquariumException, Configurator}
54 import gr.grnet.aquarium.event.AquariumEventModel
55
56 /**
57  * An abstract service that retrieves Aquarium events from a queue,
58  * stores them persistently and forwards them for further processing.
59  * The processing happens among two load-balanced actor clusters
60  * asynchronously. The number of employed actors is always equal to
61  * the number of processors. The number of threads per load-balanced
62  * cluster is configurable by subclasses.
63  *
64  * @author Georgios Gousios <gousiosg@gmail.com>
65  */
66 abstract class EventProcessorService[E <: AquariumEventModel] extends AkkaAMQP with Loggable with Lifecycle {
67
68   /* Messages exchanged between the persister and the queuereader */
69   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
70
71   case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
72
73   case class PersistOK(ackData: AckData)
74
75   case class PersistFailed(ackData: AckData)
76
77   case class Duplicate(ackData: AckData)
78
79   /**
80    * Short term storage for delivery tags to work around AMQP
81    * limitation with redelivering rejected messages to same host.
82    *
83    * FIXME: Grow unbounded???
84    */
85   private val redeliveries = new ConcurrentSkipListSet[String]()
86
87   /**
88    *  Temporarily keeps track of messages while being processed
89    *
90    *  FIXME: Grow unbounded???
91    */
92   private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
93
94   /* Supervisor actor for each event processing operation */
95   private lazy val supervisor = Supervisor(SupervisorConfig(
96     OneForOneStrategy(
97       List(classOf[Exception]), //What exceptions will be handled
98       5, // maximum number of restart retries
99       5000 // within time in millis
100     ), Nil
101   ))
102
103   protected def _configurator: Configurator = Configurator.MasterConfigurator
104
105   protected def parseJsonBytes(data: Array[Byte]): E
106
107   protected def forward(event: E): Unit
108
109   protected def existsInStore(event: E): Boolean
110
111   protected def storeParsedEvent(event: E, initialPayload: Array[Byte]): Unit
112
113   protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit
114
115   protected def queueReaderThreads: Int
116
117   protected def persisterThreads: Int
118
119   protected def numQueueActors: Int
120
121   protected def numPersisterActors: Int
122
123   protected def name: String
124
125   protected def persisterManager: PersisterManager
126
127   protected def queueReaderManager: QueueReaderManager
128
129   protected val numCPUs = Runtime.getRuntime.availableProcessors
130
131   def start(): Unit
132
133   def stop(): Unit
134
135   protected def declareQueues(conf: String) = {
136     val decl = _configurator.get(conf)
137     decl.split(";").foreach {
138       q =>
139         val i = q.split(":")
140
141         if(i.size < 3)
142           throw new AquariumException("Queue declaration \"%s\" not correct".format(q))
143
144         val exchange = i(0)
145         val route = i(1)
146         val qname = i(2)
147         logger.info("Declaring queue '%s' for exchange '%s' and key '%s'".format(qname, exchange, route))
148         consumer(route, qname, exchange, queueReaderManager.lb, false)
149     }
150   }
151
152   class QueueReader extends Actor {
153
154     def receive = {
155       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
156         try {
157           val event = parseJsonBytes(payload)
158           inFlightEvents.put(deliveryTag, event)
159
160           if(isRedeliver) {
161             //Message could not be processed 3 times, just ignore it
162             if(redeliveries.contains(event.id)) {
163               logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
164               queue ! Reject(deliveryTag, false)
165               redeliveries.remove(event.id)
166               inFlightEvents.remove(deliveryTag)
167             } else {
168               //Redeliver, but keep track of the message
169               redeliveries.add(event.id)
170               persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
171             }
172           } else {
173             val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
174             persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
175           }
176
177         } catch { case e: Exception ⇒
178           logger.error("While parsing incoming json bytes payload", e)
179
180           // If we could not create an object from the incoming json, then we just store the message
181           // and then ignore it.
182           // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
183           try {
184             storeUnparsedEvent(payload, e)
185             queue ! Acknowledge(deliveryTag)
186           } catch { case e: Exception ⇒
187             // Aquarium internal error here...
188             logger.error("Could not store unparsed json bytes payload", e)
189             queue ! Reject(deliveryTag, true)
190           }
191         }
192
193       case PersistOK(ackData) =>
194         logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
195         ackData.queue ! Acknowledge(ackData.deliveryTag)
196
197       case PersistFailed(ackData) =>
198         //Give the message a chance to be processed by other processors
199         logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
200         inFlightEvents.remove(ackData.deliveryTag)
201         ackData.queue ! Reject(ackData.deliveryTag, true)
202
203       case Duplicate(ackData) =>
204         logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
205         inFlightEvents.remove(ackData.deliveryTag)
206         ackData.queue ! Reject(ackData.deliveryTag, false)
207
208       case Acknowledged(deliveryTag) =>
209         logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
210         forward(inFlightEvents.remove(deliveryTag))
211
212       case Rejected(deliveryTag) =>
213         logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
214
215       case _ => logger.warn("Unknown message")
216     }
217
218     override def preStart = {
219       logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
220       super.preStart
221     }
222
223     self.dispatcher = queueReaderManager.dispatcher
224   }
225
226   class Persister extends Actor with Loggable {
227
228     def receive = {
229       case Persist(event, initialPayload, sender, ackData) ⇒
230         if(existsInStore(event))
231           sender ! Duplicate(ackData)
232         else MaybeEither {
233           storeParsedEvent(event, initialPayload)
234         } forJust { just ⇒
235           sender ! PersistOK(ackData)
236         } forFailed { case Failed(e) ⇒
237           sender ! PersistFailed(ackData)
238           logger.error("While persisting event", e)
239         }
240     }
241
242     self.dispatcher = persisterManager.dispatcher
243   }
244
245   class QueueReaderManager {
246     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
247
248     lazy val dispatcher =
249       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
250         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
251         .setMaxPoolSize(2 * numCPUs)
252         .setCorePoolSize(queueReaderThreads)
253         .setKeepAliveTimeInMillis(60000)
254         .setRejectionPolicy(new CallerRunsPolicy).build
255
256     lazy val actors =
257       for(i <- 0 until numQueueActors) yield {
258         val actor = actorOf(new QueueReader)
259         supervisor.link(actor)
260         actor.start()
261         actor
262       }
263
264     def stop() = dispatcher.stopAllAttachedActors
265   }
266
267   class PersisterManager {
268     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
269
270     val dispatcher =
271       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
272         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
273         .setMaxPoolSize(2 * numCPUs)
274         .setCorePoolSize(persisterThreads)
275         .setKeepAliveTimeInMillis(60000)
276         .setRejectionPolicy(new CallerRunsPolicy).build
277
278     lazy val actors =
279       for(i <- 0 until numPersisterActors) yield {
280         val actor = actorOf(new Persister)
281         supervisor.link(actor)
282         actor.start()
283         actor
284       }
285
286     def stop() = dispatcher.stopAllAttachedActors
287   }
288
289 }