3c406782a17045cd0d7d10b382861670b14d609d
[aquarium] / src / main / scala / gr / grnet / aquarium / service / EventProcessorService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util.{Lifecycle, Loggable, makeString}
39
40 import akka.actor._
41 import akka.actor.Actor._
42 import akka.routing.CyclicIterator
43 import akka.routing.Routing._
44 import akka.dispatch.Dispatchers
45 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
46 import akka.config.Supervision.SupervisorConfig
47 import akka.config.Supervision.OneForOneStrategy
48 import gr.grnet.aquarium.messaging.AkkaAMQP
49 import akka.amqp._
50 import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
51 import gr.grnet.aquarium.logic.events.AquariumEvent
52 import gr.grnet.aquarium.Configurator
53 import com.ckkloverdos.maybe._
54
55 /**
56  * An abstract service that retrieves Aquarium events from a queue,
57  * stores them persistently and forwards them for further processing.
58  * The processing happens among two load-balanced actor clusters
59  * asynchronously. The number of employed actors is always equal to
60  * the number of processors. The number of threads per load-balanced
61  * cluster is configurable by subclasses.
62  *
63  * @author Georgios Gousios <gousiosg@gmail.com>
64  */
65 abstract class EventProcessorService[E <: AquariumEvent] extends AkkaAMQP with Loggable with Lifecycle {
66
67   /* Messages exchanged between the persister and the queuereader */
68   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
69
70   case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
71
72   case class PersistOK(ackData: AckData)
73
74   case class PersistFailed(ackData: AckData)
75
76   case class Duplicate(ackData: AckData)
77
78   /* Short term storage for delivery tags to work around AMQP
79    * limitation with redelivering rejected messages to same host.
80    */
81   private val redeliveries = new ConcurrentSkipListSet[String]()
82
83   /* Temporarily keeps track of messages while being processed */
84   private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
85
86   /* Supervisor actor for each event processing operation */
87   private lazy val supervisor = Supervisor(SupervisorConfig(
88     OneForOneStrategy(
89       List(classOf[Exception]), //What exceptions will be handled
90       5, // maximum number of restart retries
91       5000 // within time in millis
92     ), Nil
93   ))
94
95   protected def _configurator: Configurator = Configurator.MasterConfigurator
96
97   protected def decode(data: Array[Byte]): E
98
99   protected def forward(event: E): Unit
100
101   protected def exists(event: E): Boolean
102
103   protected def persist(event: E, initialPayload: Array[Byte]): Boolean
104
105   protected def persistUnparsed(initialPayload: Array[Byte], exception: Throwable): Unit
106
107   protected def queueReaderThreads: Int
108
109   protected def persisterThreads: Int
110
111   protected def numQueueActors: Int
112
113   protected def numPersisterActors: Int
114
115   protected def name: String
116
117   protected def persisterManager: PersisterManager
118
119   protected def queueReaderManager: QueueReaderManager
120
121   protected val numCPUs = Runtime.getRuntime.availableProcessors
122
123   def start(): Unit
124
125   def stop(): Unit
126
127   protected def declareQueues(conf: String) = {
128     val decl = _configurator.get(conf)
129     decl.split(";").foreach {
130       q =>
131         val i = q.split(":")
132
133         if(i.size < 3)
134           throw new Exception("Queue declaration \"%s\" not correct".format(q))
135
136         val exchange = i(0)
137         val route = i(1)
138         val qname = i(2)
139         logger.info("Declaring queue %s (%s -> %s)".format(qname, exchange, route))
140         consumer(route, qname, exchange, queueReaderManager.lb, false)
141     }
142   }
143
144   class QueueReader extends Actor {
145
146     def receive = {
147       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
148         val eventM = MaybeEither {
149           decode(payload)
150         } // either decoded or error
151         eventM match {
152           case Just(event) ⇒
153             inFlightEvents.put(deliveryTag, event)
154
155             if(isRedeliver) {
156               //Message could not be processed 3 times, just ignore it
157               if(redeliveries.contains(event.id)) {
158                 logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
159                 queue ! Reject(deliveryTag, false)
160                 redeliveries.remove(event.id)
161                 inFlightEvents.remove(deliveryTag)
162               } else {
163                 //Redeliver, but keep track of the message
164                 redeliveries.add(event.id)
165                 persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
166               }
167             } else {
168               val eventWithReceivedMillis = event.copyWithReceivedMillis(System.currentTimeMillis()).asInstanceOf[E]
169               persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
170             }
171
172           case failed@Failed(e) ⇒
173             logger.error("While decoding payload", e)
174             logger.error("Offensive payload = \n{}", makeString(payload))
175
176             // If we could not create an object from the incoming json, then we just store the message
177             // and then ignore it.
178             // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
179             MaybeEither {
180               persistUnparsed(payload, e)
181             } match {
182               case Just(_) ⇒
183                 logger.debug("Sending Acknowledge(deliveryTag) = {}", Acknowledge(deliveryTag))
184                 queue ! Acknowledge(deliveryTag)
185               case Failed(e) ⇒
186                 logger.error("While persisting unparsed event", e)
187                 logger.debug("Sending Reject(deliveryTag, true) = {}", Reject(deliveryTag, true))
188                 queue ! Reject(deliveryTag, true)
189             }
190         }
191
192       case PersistOK(ackData) =>
193         logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
194         ackData.queue ! Acknowledge(ackData.deliveryTag)
195
196       case PersistFailed(ackData) =>
197         //Give the message a chance to be processed by other processors
198         logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
199         inFlightEvents.remove(ackData.deliveryTag)
200         ackData.queue ! Reject(ackData.deliveryTag, true)
201
202       case Duplicate(ackData) =>
203         logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
204         inFlightEvents.remove(ackData.deliveryTag)
205         ackData.queue ! Reject(ackData.deliveryTag, false)
206
207       case Acknowledged(deliveryTag) =>
208         logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
209         forward(inFlightEvents.remove(deliveryTag))
210
211       case Rejected(deliveryTag) =>
212         logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
213
214       case _ => logger.warn("Unknown message")
215     }
216
217     override def preStart = {
218       logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
219       super.preStart
220     }
221
222     self.dispatcher = queueReaderManager.dispatcher
223   }
224
225   class Persister extends Actor {
226
227     def receive = {
228       case Persist(event, initialPayload, sender, ackData) ⇒
229         logger.debug("Persister-%s attempting store".format(self.getUuid()))
230         //val time = System.currentTimeMillis()
231         if(exists(event))
232           sender ! Duplicate(ackData)
233         else if(persist(event, initialPayload)) {
234           //logger.debug("Persist time: %d ms".format(System.currentTimeMillis() - time))
235           sender ! PersistOK(ackData)
236         } else
237           sender ! PersistFailed(ackData)
238       case _ => logger.warn("Unknown message")
239     }
240
241     override def preStart = {
242       logger.debug("Starting actor Persister-%s".format(self.getUuid()))
243       super.preStart
244     }
245
246     self.dispatcher = persisterManager.dispatcher
247   }
248
249   class QueueReaderManager {
250     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
251
252     lazy val dispatcher =
253       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
254         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
255         .setMaxPoolSize(2 * numCPUs)
256         .setCorePoolSize(queueReaderThreads)
257         .setKeepAliveTimeInMillis(60000)
258         .setRejectionPolicy(new CallerRunsPolicy).build
259
260     lazy val actors =
261       for(i <- 0 until numQueueActors) yield {
262         val actor = actorOf(new QueueReader)
263         supervisor.link(actor)
264         actor.start()
265         actor
266       }
267
268     def stop() = dispatcher.stopAllAttachedActors
269   }
270
271   class PersisterManager {
272     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
273
274     val dispatcher =
275       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
276         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
277         .setMaxPoolSize(2 * numCPUs)
278         .setCorePoolSize(persisterThreads)
279         .setKeepAliveTimeInMillis(60000)
280         .setRejectionPolicy(new CallerRunsPolicy).build
281
282     lazy val actors =
283       for(i <- 0 until numPersisterActors) yield {
284         val actor = actorOf(new Persister)
285         supervisor.link(actor)
286         actor.start()
287         actor
288       }
289
290     def stop() = dispatcher.stopAllAttachedActors
291   }
292
293 }