root / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala @ 7958f348
History | View | Annotate | Download (7.7 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.processor.actor |
37 |
|
38 |
import gr.grnet.aquarium.logic.events.ResourceEvent |
39 |
import com.ckkloverdos.maybe.{NoVal, Failed, Just} |
40 |
import gr.grnet.aquarium.{Configurator} |
41 |
import gr.grnet.aquarium.util.{Lifecycle, Loggable} |
42 |
|
43 |
import akka.actor._ |
44 |
import akka.actor.Actor._ |
45 |
import akka.routing.CyclicIterator |
46 |
import akka.routing.Routing._ |
47 |
import akka.dispatch.Dispatchers |
48 |
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy |
49 |
import akka.config.Supervision.SupervisorConfig |
50 |
import akka.config.Supervision.OneForOneStrategy |
51 |
import java.util.concurrent.ConcurrentSkipListSet |
52 |
import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP} |
53 |
import akka.amqp._ |
54 |
|
55 |
/** |
56 |
* An actor that gets events from the queue, stores them persistently |
57 |
* and forwards them for further processing. |
58 |
* |
59 |
* @author Georgios Gousios <gousiosg@gmail.com> |
60 |
*/ |
61 |
class ResourceEventProcessorService extends AkkaAMQP with Loggable |
62 |
with Lifecycle { |
63 |
|
64 |
case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef) |
65 |
case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData) |
66 |
case class PersistOK(ackData: AckData) |
67 |
case class PersistFailed(ackData: AckData) |
68 |
case class Duplicate(ackData: AckData) |
69 |
|
70 |
val redeliveries = new ConcurrentSkipListSet[String]() |
71 |
|
72 |
class QueueReader extends Actor { |
73 |
|
74 |
def receive = { |
75 |
case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) => |
76 |
val event = ResourceEvent.fromBytes(payload) |
77 |
if (isRedeliver) { |
78 |
//Message could not be processed 3 times, just |
79 |
if (redeliveries.contains(event.id)) { |
80 |
logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag)) |
81 |
queue ! Reject(deliveryTag, false) |
82 |
redeliveries.remove(event.id) |
83 |
} else { |
84 |
//Redeliver, but keep track of the message |
85 |
redeliveries.add(event.id) |
86 |
PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get)) |
87 |
} |
88 |
} else |
89 |
PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get)) |
90 |
|
91 |
case PersistOK(ackData) => |
92 |
logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag)) |
93 |
ackData.queue ! Acknowledge(ackData.deliveryTag) |
94 |
|
95 |
case PersistFailed(ackData) => |
96 |
//Give the message a chance to be processed by other processors |
97 |
logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag)) |
98 |
ackData.queue ! Reject(ackData.deliveryTag, true) |
99 |
|
100 |
case Duplicate(ackData) => |
101 |
logger.debug("Event[%s] msg[%d] is duplicate".format(ackData.msgId, ackData.deliveryTag)) |
102 |
ackData.queue ! Reject(ackData.deliveryTag, false) |
103 |
|
104 |
case Acknowledged(deliveryTag) => |
105 |
logger.debug("Msg with tag [%d] acked".format(deliveryTag)) |
106 |
//TODO: Forward to the dispatcher |
107 |
|
108 |
case Rejected(deliveryTag) => |
109 |
logger.debug("Msg with tag [%d] rejected".format(deliveryTag)) |
110 |
|
111 |
case _ => logger.warn("Unknown message") |
112 |
} |
113 |
|
114 |
self.dispatcher = QueueReaderManager.dispatcher |
115 |
} |
116 |
|
117 |
class Persister extends Actor { |
118 |
|
119 |
def receive = { |
120 |
case Persist(event, sender, ackData) => |
121 |
if (exists(event)) |
122 |
sender ! Duplicate(ackData) |
123 |
else if (persist(event)) |
124 |
sender ! PersistOK(ackData) |
125 |
else |
126 |
sender ! PersistFailed(ackData) |
127 |
case _ => logger.warn("Unknown message") |
128 |
} |
129 |
|
130 |
def exists(event: ResourceEvent): Boolean = |
131 |
Configurator.MasterConfigurator.resourceEventStore.findResourceEventById(event.id).isJust |
132 |
|
133 |
def persist(event: ResourceEvent): Boolean = { |
134 |
Configurator.MasterConfigurator.resourceEventStore.storeResourceEvent(event) match { |
135 |
case Just(x) => true |
136 |
case x: Failed => |
137 |
logger.error("Could not save event: %s".format(event)) |
138 |
false |
139 |
case NoVal => false |
140 |
} |
141 |
} |
142 |
|
143 |
self.dispatcher = PersisterManager.dispatcher |
144 |
} |
145 |
|
146 |
lazy val supervisor = Supervisor(SupervisorConfig( |
147 |
OneForOneStrategy( |
148 |
List(classOf[Exception]), //What exceptions will be handled |
149 |
3, // maximum number of restart retries |
150 |
5000 // within time in millis |
151 |
), Nil |
152 |
|
153 |
)) |
154 |
|
155 |
object QueueReaderManager { |
156 |
val numCPUs = Runtime.getRuntime.availableProcessors |
157 |
var actors: List[ActorRef] = _ |
158 |
lazy val lb = loadBalancerActor(new CyclicIterator(actors)) |
159 |
|
160 |
lazy val dispatcher = |
161 |
Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher") |
162 |
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) |
163 |
.setMaxPoolSize(numCPUs) |
164 |
.setCorePoolSize(2) |
165 |
.setKeepAliveTimeInMillis(60000) |
166 |
.setRejectionPolicy(new CallerRunsPolicy).build |
167 |
|
168 |
def start() = { |
169 |
actors = { |
170 |
for (i <- 0 until numCPUs) yield { |
171 |
val actor = actorOf(new QueueReader) |
172 |
supervisor.link(actor) |
173 |
actor.start() |
174 |
actor |
175 |
} |
176 |
}.toList |
177 |
} |
178 |
|
179 |
def stop() = dispatcher.stopAllAttachedActors |
180 |
} |
181 |
|
182 |
object PersisterManager { |
183 |
val numCPUs = Runtime.getRuntime.availableProcessors |
184 |
var actors: List[ActorRef] = _ |
185 |
|
186 |
lazy val lb = loadBalancerActor(new CyclicIterator(actors)) |
187 |
|
188 |
val dispatcher = |
189 |
Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher") |
190 |
.withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) |
191 |
.setMaxPoolSize(numCPUs) |
192 |
.setCorePoolSize(2) |
193 |
.setKeepAliveTimeInMillis(60000) |
194 |
.setRejectionPolicy(new CallerRunsPolicy).build |
195 |
|
196 |
def start() = { |
197 |
actors = { |
198 |
for (i <- 0 until numCPUs) yield { |
199 |
val actor = actorOf(new Persister) |
200 |
supervisor.link(actor) |
201 |
actor.start() |
202 |
actor |
203 |
} |
204 |
}.toList |
205 |
} |
206 |
|
207 |
def stop() = dispatcher.stopAllAttachedActors |
208 |
} |
209 |
|
210 |
def start() { |
211 |
logger.info("Starting resource event processor service") |
212 |
|
213 |
QueueReaderManager.start() |
214 |
PersisterManager.start() |
215 |
|
216 |
consumer("%s.#".format(MessagingNames.RES_EVENT_KEY), |
217 |
MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE, |
218 |
QueueReaderManager.lb, false) |
219 |
} |
220 |
|
221 |
def stop() { |
222 |
QueueReaderManager.stop() |
223 |
PersisterManager.stop() |
224 |
|
225 |
logger.info("Stopping resource event processor service") |
226 |
} |
227 |
} |