Calculate user state from resource events (wip).
[aquarium] / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.processor.actor
37
38 import gr.grnet.aquarium.logic.events.ResourceEvent
39 import com.ckkloverdos.maybe.{NoVal, Failed, Just}
40 import gr.grnet.aquarium.{Configurator}
41 import gr.grnet.aquarium.util.{Lifecycle, Loggable}
42
43 import akka.actor._
44 import akka.actor.Actor._
45 import akka.routing.CyclicIterator
46 import akka.routing.Routing._
47 import akka.dispatch.Dispatchers
48 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
49 import akka.config.Supervision.SupervisorConfig
50 import akka.config.Supervision.OneForOneStrategy
51 import java.util.concurrent.ConcurrentSkipListSet
52 import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
53 import akka.amqp._
54 import gr.grnet.aquarium.actor.DispatcherRole
55
56 /**
57  * An actor that gets events from the queue, stores them persistently
58  * and forwards them for further processing.
59  *
60  * @author Georgios Gousios <gousiosg@gmail.com>
61  */
62 class ResourceEventProcessorService extends AkkaAMQP with Loggable
63 with Lifecycle {
64
65   case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
66   case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
67   case class PersistOK(ackData: AckData)
68   case class PersistFailed(ackData: AckData)
69   case class Duplicate(ackData: AckData)
70
71   val redeliveries = new ConcurrentSkipListSet[String]()
72
73   private[this] def _configurator: Configurator = Configurator.MasterConfigurator
74   private[this] def _calcStateChanges(resourceEvent: ResourceEvent): Unit = {
75     val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
76     businessLogicDispacther ! ProcessResourceEvent(resourceEvent) // all state change, credit calc etc will happen there
77   }
78
79   class QueueReader extends Actor {
80
81     def receive = {
82       case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
83         val event = ResourceEvent.fromBytes(payload)
84         if (isRedeliver) {
85           //Message could not be processed 3 times, just
86           if (redeliveries.contains(event.id)) {
87             logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
88             queue ! Reject(deliveryTag, false)
89             redeliveries.remove(event.id)
90           } else {
91             //Redeliver, but keep track of the message
92             redeliveries.add(event.id)
93             PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
94           }
95         } else {
96           val eventWithReceivedMillis = event.copy(receivedMillis = System.currentTimeMillis())
97           PersisterManager.lb ! Persist(eventWithReceivedMillis, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
98         }
99
100       case PersistOK(ackData) =>
101         logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
102         ackData.queue ! Acknowledge(ackData.deliveryTag)
103
104       case PersistFailed(ackData) =>
105         //Give the message a chance to be processed by other processors
106         logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
107         ackData.queue ! Reject(ackData.deliveryTag, true)
108
109       case Duplicate(ackData) =>
110         logger.debug("Event[%s] msg[%d] is duplicate".format(ackData.msgId, ackData.deliveryTag))
111         ackData.queue ! Reject(ackData.deliveryTag, false)
112
113       case Acknowledged(deliveryTag) =>
114         logger.debug("Msg with tag [%d] acked".format(deliveryTag))
115       //TODO: Forward to the dispatcher
116
117       case Rejected(deliveryTag) =>
118         logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
119
120       case _ => logger.warn("Unknown message")
121     }
122
123     self.dispatcher = QueueReaderManager.dispatcher
124   }
125
126   class Persister extends Actor {
127
128     def receive = {
129       case Persist(event, sender, ackData) =>
130         if (exists(event))
131           sender ! Duplicate(ackData)
132         else if (persist(event)) {
133           sender ! PersistOK(ackData)
134           // TODO: Move to some proper place (after ACK?)
135           _calcStateChanges(event)
136         } else
137           sender ! PersistFailed(ackData)
138       case _ => logger.warn("Unknown message")
139     }
140
141     def exists(event: ResourceEvent): Boolean =
142       _configurator.resourceEventStore.findResourceEventById(event.id).isJust
143
144     def persist(event: ResourceEvent): Boolean = {
145       _configurator.resourceEventStore.storeResourceEvent(event) match {
146         case Just(x) => true
147         case x: Failed =>
148           logger.error("Could not save event: %s".format(event))
149           false
150         case NoVal => false
151       }
152     }
153
154     self.dispatcher = PersisterManager.dispatcher
155   }
156
157   lazy val supervisor = Supervisor(SupervisorConfig(
158     OneForOneStrategy(
159       List(classOf[Exception]), //What exceptions will be handled
160       3, // maximum number of restart retries
161       5000 // within time in millis
162     ), Nil
163
164   ))
165
166   object QueueReaderManager {
167     val numCPUs = Runtime.getRuntime.availableProcessors
168     var actors: List[ActorRef] = _
169     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
170
171     lazy val dispatcher =
172       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
173         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
174         .setMaxPoolSize(numCPUs)
175         .setCorePoolSize(2)
176         .setKeepAliveTimeInMillis(60000)
177         .setRejectionPolicy(new CallerRunsPolicy).build
178
179     def start() = {
180       actors = {
181         for (i <- 0 until numCPUs) yield {
182           val actor = actorOf(new QueueReader)
183           supervisor.link(actor)
184           actor.start()
185           actor
186         }
187       }.toList
188     }
189
190     def stop() = dispatcher.stopAllAttachedActors
191   }
192
193   object PersisterManager {
194     val numCPUs = Runtime.getRuntime.availableProcessors
195     var actors: List[ActorRef] = _
196
197     lazy val lb = loadBalancerActor(new CyclicIterator(actors))
198
199     val dispatcher =
200       Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
201         .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
202         .setMaxPoolSize(numCPUs)
203         .setCorePoolSize(2)
204         .setKeepAliveTimeInMillis(60000)
205         .setRejectionPolicy(new CallerRunsPolicy).build
206
207     def start() = {
208       actors = {
209         for (i <- 0 until numCPUs) yield {
210           val actor = actorOf(new Persister)
211           supervisor.link(actor)
212           actor.start()
213           actor
214         }
215       }.toList
216     }
217
218     def stop() = dispatcher.stopAllAttachedActors
219   }
220
221   def start() {
222     logger.info("Starting resource event processor service")
223
224     QueueReaderManager.start()
225     PersisterManager.start()
226
227     consumer("%s.#".format(MessagingNames.RES_EVENT_KEY),
228       MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE,
229       QueueReaderManager.lb, false)
230   }
231
232   def stop() {
233     QueueReaderManager.stop()
234     PersisterManager.stop()
235
236     logger.info("Stopping resource event processor service")
237   }
238 }