Rename package and make-dist with maven offline mode
[aquarium] / src / main / scala / gr / grnet / aquarium / service / EventProcessorService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import gr.grnet.aquarium.util.{Lifecycle, Loggable, makeString}
39
40 import akka.actor._
41 import akka.actor.Actor._
42 import akka.routing.CyclicIterator
43 import akka.routing.Routing._
44 import akka.dispatch.Dispatchers
45 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
46 import akka.config.Supervision.SupervisorConfig
47 import akka.config.Supervision.OneForOneStrategy
48 import gr.grnet.aquarium.messaging.AkkaAMQP
49 import akka.amqp._
50 import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
51 import com.ckkloverdos.maybe._
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import gr.grnet.aquarium.{AquariumException, Configurator}
54 import gr.grnet.aquarium.store.RecordID
55 import gr.grnet.aquarium.event.{AquariumEventModel, AquariumEventSkeleton}
56
57 /**
58  * An abstract service that retrieves Aquarium events from a queue,
59  * stores them persistently and forwards them for further processing.
60  * The processing happens among two load-balanced actor clusters
61  * asynchronously. The number of employed actors is always equal to
62  * the number of processors. The number of threads per load-balanced
63  * cluster is configurable by subclasses.
64  *
65  * @author Georgios Gousios <gousiosg@gmail.com>
66  */
67 abstract class EventProcessorService[E <: AquariumEventModel] extends AkkaAMQP with Loggable with Lifecycle {
68
69   /* Messages exchanged between the persister and the queuereader */
70   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
71
72   case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
73
74   case class PersistOK(ackData: AckData)
75
76   case class PersistFailed(ackData: AckData)
77
78   case class Duplicate(ackData: AckData)
79
80   /**
81    * Short term storage for delivery tags to work around AMQP
82    * limitation with redelivering rejected messages to same host.
83    *
84    * FIXME: Grow unbounded???
85    */
86   private val redeliveries = new ConcurrentSkipListSet[String]()
87
88   /**
89    *  Temporarily keeps track of messages while being processed
90    *
91    *  FIXME: Grow unbounded???
92    */
93   private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
94
95   /* Supervisor actor for each event processing operation */
96   private lazy val supervisor = Supervisor(SupervisorConfig(
97     OneForOneStrategy(
98       List(classOf[Exception]), //What exceptions will be handled
99       5, // maximum number of restart retries
100       5000 // within time in millis
101     ), Nil
102   ))
103
104   protected def _configurator: Configurator = Configurator.MasterConfigurator
105
106   protected def parseJsonBytes(data: Array[Byte]): E
107
108   protected def forward(event: E): Unit
109
110   protected def exists(event: E): Boolean
111
112   protected def persist(event: E, initialPayload: Array[Byte]): Unit
113
114   protected def persistUnparsed(initialPayload: Array[Byte], exception: Throwable): Unit
115
116   protected def queueReaderThreads: Int
117
118   protected def persisterThreads: Int
119
120   protected def numQueueActors: Int
121
122   protected def numPersisterActors: Int
123
124   protected def name: String
125
126   protected def persisterManager: PersisterManager
127
128   protected def queueReaderManager: QueueReaderManager
129
130   protected val numCPUs = Runtime.getRuntime.availableProcessors
131
132   def start(): Unit
133
134   def stop(): Unit
135
136   protected def declareQueues(conf: String) = {
137     val decl = _configurator.get(conf)
138     decl.split(";").foreach {
139       q =>
140         val i = q.split(":")
141
142         if(i.size < 3)
143           throw new AquariumException("Queue declaration \"%s\" not correct".format(q))
144
145         val exchange = i(0)
146         val route = i(1)
147         val qname = i(2)
148         logger.info("Declaring queue '%s' for exchange '%s' and key '%s'".format(qname, exchange, route))
149         consumer(route, qname, exchange, queueReaderManager.lb, false)
150     }
151   }
152
153   class QueueReader extends Actor {
154
155     def receive = {
156       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
157         val eventM = MaybeEither {
158           parseJsonBytes(payload)
159         } // either decoded or error
160         eventM match {
161           case Just(event) ⇒
162             inFlightEvents.put(deliveryTag, event)
163
164             if(isRedeliver) {
165               //Message could not be processed 3 times, just ignore it
166               if(redeliveries.contains(event.id)) {
167                 logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
168                 queue ! Reject(deliveryTag, false)
169                 redeliveries.remove(event.id)
170                 inFlightEvents.remove(deliveryTag)
171               } else {
172                 //Redeliver, but keep track of the message
173                 redeliveries.add(event.id)
174                 persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
175               }
176             } else {
177               val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
178               persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
179             }
180
181           case failed@Failed(e) ⇒
182             logger.error("While decoding payload", e)
183             logger.error("Offensive payload = \n{}", makeString(payload))
184
185             // If we could not create an object from the incoming json, then we just store the message
186             // and then ignore it.
187             // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
188             MaybeEither {
189               persistUnparsed(payload, e)
190             } match {
191               case Just(_) ⇒
192                 logger.debug("Sending Acknowledge(deliveryTag) = {}", Acknowledge(deliveryTag))
193                 queue ! Acknowledge(deliveryTag)
194               case Failed(e) ⇒
195                 logger.error("Could not persist unparsed event", e)
196                 logger.debug("Sending {}", Reject(deliveryTag, true))
197                 queue ! Reject(deliveryTag, true)
198             }
199         }
200
201       case PersistOK(ackData) =>
202         logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
203         ackData.queue ! Acknowledge(ackData.deliveryTag)
204
205       case PersistFailed(ackData) =>
206         //Give the message a chance to be processed by other processors
207         logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
208         inFlightEvents.remove(ackData.deliveryTag)
209         ackData.queue ! Reject(ackData.deliveryTag, true)
210
211       case Duplicate(ackData) =>
212         logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
213         inFlightEvents.remove(ackData.deliveryTag)
214         ackData.queue ! Reject(ackData.deliveryTag, false)
215
216       case Acknowledged(deliveryTag) =>
217         logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
218         forward(inFlightEvents.remove(deliveryTag))
219
220       case Rejected(deliveryTag) =>
221         logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
222
223       case _ => logger.warn("Unknown message")
224     }
225
226     override def preStart = {
227       logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
228       super.preStart
229     }
230
231     self.dispatcher = queueReaderManager.dispatcher
232   }
233
234   class Persister extends Actor with Loggable {
235
236     def receive = {
237       case Persist(event, initialPayload, sender, ackData) ⇒
238         if(exists(event))
239           sender ! Duplicate(ackData)
240         else MaybeEither {
241           persist(event, initialPayload)
242         } forJust { just ⇒
243           sender ! PersistOK(ackData)
244         } forFailed { case Failed(e) ⇒
245           sender ! PersistFailed(ackData)
246           logger.error("While persisting event", e)
247         }
248     }
249
250     override def preStart = {
251       super.preStart
252       logStarted(TimeHelpers.nowMillis(), TimeHelpers.nowMillis())
253     }
254
255     self.dispatcher = persisterManager.dispatcher
256   }
257
258   class QueueReaderManager {
259     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
260
261     lazy val dispatcher =
262       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
263         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
264         .setMaxPoolSize(2 * numCPUs)
265         .setCorePoolSize(queueReaderThreads)
266         .setKeepAliveTimeInMillis(60000)
267         .setRejectionPolicy(new CallerRunsPolicy).build
268
269     lazy val actors =
270       for(i <- 0 until numQueueActors) yield {
271         val actor = actorOf(new QueueReader)
272         supervisor.link(actor)
273         actor.start()
274         actor
275       }
276
277     def stop() = dispatcher.stopAllAttachedActors
278   }
279
280   class PersisterManager {
281     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
282
283     val dispatcher =
284       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
285         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
286         .setMaxPoolSize(2 * numCPUs)
287         .setCorePoolSize(persisterThreads)
288         .setKeepAliveTimeInMillis(60000)
289         .setRejectionPolicy(new CallerRunsPolicy).build
290
291     lazy val actors =
292       for(i <- 0 until numPersisterActors) yield {
293         val actor = actorOf(new Persister)
294         supervisor.link(actor)
295         actor.start()
296         actor
297       }
298
299     def stop() = dispatcher.stopAllAttachedActors
300   }
301
302 }