Remove dead code before migrating to single project setup
[aquarium] / logic / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.processor.actor
37
38 import gr.grnet.aquarium.messaging.AkkaAMQP
39 import gr.grnet.aquarium.logic.events.ResourceEvent
40 import akka.amqp.{Reject, Acknowledge, Acknowledged, Delivery}
41 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
42 import gr.grnet.aquarium.{MasterConf}
43 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
44
45 import akka.actor._
46 import akka.actor.Actor._
47 import akka.routing.CyclicIterator
48 import akka.routing.Routing._
49 import akka.dispatch.Dispatchers
50 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
51 import akka.config.Supervision.SupervisorConfig
52 import akka.config.Supervision.OneForOneStrategy
53 import java.util.concurrent.ConcurrentSkipListSet
54
55 /**
56  * An actor that gets events from the queue, stores them persistently
57  * and forwards them for further processing.
58  *
59  * @author Georgios Gousios <gousiosg@gmail.com>
60  */
61 class ResourceEventProcessorService extends AkkaAMQP with Loggable
62 with Lifecycle {
63
64   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
65   case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
66   case class PersistOK(ackData: AckData)
67   case class PersistFailed(ackData: AckData)
68   case class Duplicate(ackData: AckData)
69
70   val redeliveries = new ConcurrentSkipListSet[String]()
71
72   class QueueReader extends Actor {
73
74     def receive = {
75       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
76         val event = ResourceEvent.fromBytes(payload)
77         if (isRedeliver) {
78           //Message could not be processed 3 times, just
79           if (redeliveries.contains(event.id)) {
80             logger.warn("Event %s redelivered >2 times. Ignoring".format(event))
81             queue ! Reject(deliveryTag, false)
82             redeliveries.remove(event.id)
83           } else {
84             //Redeliver, but keep track of the message
85             redeliveries.add(event.id)
86             PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
87           }
88         } else
89           PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
90
91       case PersistOK(ackData) =>
92         logger.debug("Stored event:%s".format(ackData.msgId))
93         ackData.queue ! Acknowledge(ackData.deliveryTag)
94
95       case PersistFailed(ackData) =>
96         //Give the message a chance to be processed by other processors
97         logger.debug("Storing event:%s failed".format(ackData.msgId))
98         ackData.queue ! Reject(ackData.deliveryTag, true)
99
100       case Duplicate(ackData) =>
101         logger.debug("Event:%s is duplicate".format(ackData.msgId))
102         ackData.queue ! Reject(ackData.deliveryTag, false)
103
104       case Acknowledged(deliveryTag) =>
105       //Forward to the dispatcher
106
107       case _ => logger.warn("Unknown message")
108     }
109
110     self.dispatcher = QueueReaderManager.dispatcher
111   }
112
113   class Persister extends Actor {
114
115     def receive = {
116       case Persist(event, sender, ackData) =>
117         if (exists(event))
118           sender ! Duplicate(ackData)
119         else if (persist(event))
120           sender ! PersistOK(ackData)
121         else
122           sender ! PersistFailed(ackData)
123       case _ => logger.warn("Unknown message")
124     }
125
126     def exists(event: ResourceEvent): Boolean =
127       !MasterConf.MasterConf.eventStore.findEventById(event.id).isEmpty
128
129     def persist(event: ResourceEvent): Boolean = {
130       MasterConf.MasterConf.eventStore.storeEvent(event) match {
131         case Just(x) => true
132         case x: Failed =>
133           logger.error("Could not save event: %s".format(event))
134           false
135         case NoVal => false
136       }
137     }
138
139     self.dispatcher = PersisterManager.dispatcher
140   }
141
142   lazy val supervisor = Supervisor(SupervisorConfig(
143     OneForOneStrategy(
144       List(classOf[Exception]), //What exceptions will be handled
145       3, // maximum number of restart retries
146       5000 // within time in millis
147     ), Nil
148
149   ))
150
151   object QueueReaderManager {
152     val numCPUs = Runtime.getRuntime.availableProcessors
153     var actors: List[ActorRef] = _
154     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
155
156     lazy val dispatcher =
157       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
158         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
159         .setMaxPoolSize(numCPUs)
160         .setCorePoolSize(2)
161         .setKeepAliveTimeInMillis(60000)
162         .setRejectionPolicy(new CallerRunsPolicy).build
163
164     def start() = {
165       actors = {
166         for (i <- 0 until numCPUs) yield {
167           val actor = actorOf(new QueueReader)
168           supervisor.link(actor)
169           actor.start()
170           actor
171         }
172       }.toList
173     }
174
175     def stop() = dispatcher.stopAllAttachedActors
176   }
177
178   object PersisterManager {
179     val numCPUs = Runtime.getRuntime.availableProcessors
180     var actors: List[ActorRef] = _
181
182     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
183
184     val dispatcher =
185       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
186         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
187         .setMaxPoolSize(numCPUs)
188         .setCorePoolSize(2)
189         .setKeepAliveTimeInMillis(60000)
190         .setRejectionPolicy(new CallerRunsPolicy).build
191
192     def start() = {
193       actors = {
194         for (i <- 0 until numCPUs) yield {
195           val actor = actorOf(new Persister)
196           supervisor.link(actor)
197           actor.start()
198           actor
199         }
200       }.toList
201     }
202
203     def stop() = dispatcher.stopAllAttachedActors
204   }
205
206   def start() {
207     logger.info("Starting resource event processor service")
208
209     QueueReaderManager.start()
210     PersisterManager.start()
211     consumer("event.#", "resource-events", "aquarium", QueueReaderManager.lb, false)
212   }
213
214   def stop() {
215     QueueReaderManager.stop()
216     PersisterManager.stop()
217
218     logger.info("Stopping resource event processor service")
219   }
220 }