Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / service / EventProcessorService.scala @ 3bf7c95c

History | View | Annotate | Download (10.6 kB)

1
/*
2
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.service
37

    
38
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
39

    
40
import akka.actor._
41
import akka.actor.Actor._
42
import akka.routing.CyclicIterator
43
import akka.routing.Routing._
44
import akka.dispatch.Dispatchers
45
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
46
import akka.config.Supervision.SupervisorConfig
47
import akka.config.Supervision.OneForOneStrategy
48
import gr.grnet.aquarium.messaging.AkkaAMQP
49
import akka.amqp._
50
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
51
import com.ckkloverdos.maybe._
52
import gr.grnet.aquarium.util.date.TimeHelpers
53
import gr.grnet.aquarium.{AquariumException, Configurator}
54
import gr.grnet.aquarium.event.ExternalEventModel
55
import gr.grnet.aquarium.actor.ReflectiveActor
56

    
57
/**
58
 * An abstract service that retrieves Aquarium events from a queue,
59
 * stores them persistently and forwards them for further processing.
60
 * The processing happens among two load-balanced actor clusters
61
 * asynchronously. The number of employed actors is always equal to
62
 * the number of processors. The number of threads per load-balanced
63
 * cluster is configurable by subclasses.
64
 *
65
 * @author Georgios Gousios <gousiosg@gmail.com>
66
 */
67
abstract class EventProcessorService[E <: ExternalEventModel] extends AkkaAMQP with Loggable with Lifecycle {
68

    
69
  /* Messages exchanged between the persister and the queuereader */
70
  case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
71

    
72
  case class Persist(event: E, initialPayload: Array[Byte], sender: ActorRef, ackData: AckData)
73

    
74
  case class PersistOK(ackData: AckData)
75

    
76
  case class PersistFailed(ackData: AckData)
77

    
78
  case class Duplicate(ackData: AckData)
79

    
80
  /**
81
   * Short term storage for delivery tags to work around AMQP
82
   * limitation with redelivering rejected messages to same host.
83
   *
84
   * FIXME: Grow unbounded???
85
   */
86
  private val redeliveries = new ConcurrentSkipListSet[String]()
87

    
88
  /**
89
   *  Temporarily keeps track of messages while being processed
90
   *
91
   *  FIXME: Grow unbounded???
92
   */
93
  private val inFlightEvents = new ConcurrentHashMap[Long, E](200, 0.9F, 4)
94

    
95
  /* Supervisor actor for each event processing operation */
96
  private lazy val supervisor = Supervisor(SupervisorConfig(
97
    OneForOneStrategy(
98
      List(classOf[Exception]), //What exceptions will be handled
99
      5, // maximum number of restart retries
100
      5000 // within time in millis
101
    ), Nil
102
  ))
103

    
104
  protected def _configurator: Configurator = Configurator.MasterConfigurator
105

    
106
  protected def parseJsonBytes(data: Array[Byte]): E
107

    
108
  protected def forward(event: E): Unit
109

    
110
  protected def existsInStore(event: E): Boolean
111

    
112
  protected def storeParsedEvent(event: E, initialPayload: Array[Byte]): Unit
113

    
114
  protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit
115

    
116
  protected def queueReaderThreads: Int
117

    
118
  protected def persisterThreads: Int
119

    
120
  protected def numQueueActors: Int
121

    
122
  protected def numPersisterActors: Int
123

    
124
  protected def name: String
125

    
126
  protected def persisterManager: PersisterManager
127

    
128
  protected def queueReaderManager: QueueReaderManager
129

    
130
  protected val numCPUs = Runtime.getRuntime.availableProcessors
131

    
132
  def start(): Unit
133

    
134
  def stop(): Unit
135

    
136
  protected def declareQueues(conf: String) = {
137
    val decl = _configurator.get(conf)
138
    decl.split(";").foreach {
139
      q =>
140
        val i = q.split(":")
141

    
142
        if(i.size < 3)
143
          throw new AquariumException("Queue declaration \"%s\" not correct".format(q))
144

    
145
        val exchange = i(0)
146
        val route = i(1)
147
        val qname = i(2)
148
        logger.info("Declaring queue '%s' for exchange '%s' and key '%s'".format(qname, exchange, route))
149
        consumer(route, qname, exchange, queueReaderManager.lb, false)
150
    }
151
  }
152

    
153
  class QueueReader extends Actor {
154

    
155
    def receive = {
156
      case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
157
        try {
158
          val event = parseJsonBytes(payload)
159
          inFlightEvents.put(deliveryTag, event)
160

    
161
          if(isRedeliver) {
162
            //Message could not be processed 3 times, just ignore it
163
            if(redeliveries.contains(event.id)) {
164
              logger.warn("Actor[%s] - Event[%s] msg[%d] redelivered >2 times. Rejecting".format(self.getUuid(), event, deliveryTag))
165
              queue ! Reject(deliveryTag, false)
166
              redeliveries.remove(event.id)
167
              inFlightEvents.remove(deliveryTag)
168
            } else {
169
              //Redeliver, but keep track of the message
170
              redeliveries.add(event.id)
171
              persisterManager.lb ! Persist(event, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
172
            }
173
          } else {
174
            val eventWithReceivedMillis = event.withReceivedMillis(TimeHelpers.nowMillis()).asInstanceOf[E]
175
            persisterManager.lb ! Persist(eventWithReceivedMillis, payload, queueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
176
          }
177

    
178
        } catch { case e: Exception ⇒
179
          logger.error("While parsing incoming json bytes payload", e)
180

    
181
          // If we could not create an object from the incoming json, then we just store the message
182
          // and then ignore it.
183
          // TODO: Possibly the sending site should setup a queue to accept such erroneous messages?
184
          try {
185
            storeUnparsedEvent(payload, e)
186
            queue ! Acknowledge(deliveryTag)
187
          } catch { case e: Exception ⇒
188
            // Aquarium internal error here...
189
            logger.error("Could not store unparsed json bytes payload", e)
190
            queue ! Reject(deliveryTag, true)
191
          }
192
        }
193

    
194
      case PersistOK(ackData) =>
195
        logger.debug("Actor[%s] - Stored event[%s] msg[%d] - %d left".format(self.getUuid(), ackData.msgId, ackData.deliveryTag, inFlightEvents.size))
196
        ackData.queue ! Acknowledge(ackData.deliveryTag)
197

    
198
      case PersistFailed(ackData) =>
199
        //Give the message a chance to be processed by other processors
200
        logger.error("Actor[%s] - Storing event[%s] msg[%d] failed".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
201
        inFlightEvents.remove(ackData.deliveryTag)
202
        ackData.queue ! Reject(ackData.deliveryTag, true)
203

    
204
      case Duplicate(ackData) =>
205
        logger.debug("Actor[%s] - Event[%s] msg[%d] is duplicate".format(self.getUuid(), ackData.msgId, ackData.deliveryTag))
206
        inFlightEvents.remove(ackData.deliveryTag)
207
        ackData.queue ! Reject(ackData.deliveryTag, false)
208

    
209
      case Acknowledged(deliveryTag) =>
210
        logger.debug("Actor[%s] - Msg with tag [%d] acked. Forwarding...".format(self.getUuid(), deliveryTag))
211
        forward(inFlightEvents.remove(deliveryTag))
212

    
213
      case Rejected(deliveryTag) =>
214
        logger.debug("Actor[%s] - Msg with tag [%d] rejected".format(self.getUuid(), deliveryTag))
215

    
216
      case _ => logger.warn("Unknown message")
217
    }
218

    
219
    override def preStart = {
220
      logger.debug("Starting actor QueueReader-%s".format(self.getUuid()))
221
      super.preStart
222
    }
223

    
224
    self.dispatcher = queueReaderManager.dispatcher
225
  }
226

    
227
  class Persister extends ReflectiveActor {
228

    
229
    def knownMessageTypes = Set(classOf[Persist])
230

    
231
    override protected def onThrowable(t: Throwable, servicingMessage: AnyRef) {
232
      logChainOfCauses(t)
233
      servicingMessage match {
234
        case Persist(event, initialPayload, sender, ackData) ⇒
235
          sender ! PersistFailed(ackData)
236
          logger.error("While persisting", t)
237
      }
238
    }
239

    
240
    def onPersist(persist: Persist): Unit = {
241
      persist match {
242
        case Persist(event, initialPayload, sender, ackData) ⇒
243
          if(existsInStore(event)) {
244
            sender ! Duplicate(ackData)
245
          }
246
          else {
247
            storeParsedEvent(event, initialPayload)
248
            sender ! PersistOK(ackData)
249
          }
250
      }
251
    }
252

    
253
    self.dispatcher = persisterManager.dispatcher
254
  }
255

    
256
  class QueueReaderManager {
257
    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
258

    
259
    lazy val dispatcher =
260
      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-queuereader")
261
        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
262
        .setMaxPoolSize(2 * numCPUs)
263
        .setCorePoolSize(queueReaderThreads)
264
        .setKeepAliveTimeInMillis(60000)
265
        .setRejectionPolicy(new CallerRunsPolicy).build
266

    
267
    lazy val actors =
268
      for(i <- 0 until numQueueActors) yield {
269
        val actor = actorOf(new QueueReader)
270
        supervisor.link(actor)
271
        actor.start()
272
        actor
273
      }
274

    
275
    def stop() = dispatcher.stopAllAttachedActors
276
  }
277

    
278
  class PersisterManager {
279
    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
280

    
281
    val dispatcher =
282
      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(name + "-persister")
283
        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(1000)
284
        .setMaxPoolSize(2 * numCPUs)
285
        .setCorePoolSize(persisterThreads)
286
        .setKeepAliveTimeInMillis(60000)
287
        .setRejectionPolicy(new CallerRunsPolicy).build
288

    
289
    lazy val actors =
290
      for(i <- 0 until numPersisterActors) yield {
291
        val actor = actorOf(new Persister)
292
        supervisor.link(actor)
293
        actor.start()
294
        actor
295
      }
296

    
297
    def stop() = dispatcher.stopAllAttachedActors
298
  }
299

    
300
}