Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala @ 7958f348

History | View | Annotate | Download (7.7 kB)

1
/*
2
 * Copyright 2011 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.processor.actor
37

    
38
import gr.grnet.aquarium.logic.events.ResourceEvent
39
import com.ckkloverdos.maybe.{NoVal, Failed, Just}
40
import gr.grnet.aquarium.{Configurator}
41
import gr.grnet.aquarium.util.{Lifecycle, Loggable}
42

    
43
import akka.actor._
44
import akka.actor.Actor._
45
import akka.routing.CyclicIterator
46
import akka.routing.Routing._
47
import akka.dispatch.Dispatchers
48
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
49
import akka.config.Supervision.SupervisorConfig
50
import akka.config.Supervision.OneForOneStrategy
51
import java.util.concurrent.ConcurrentSkipListSet
52
import gr.grnet.aquarium.messaging.{MessagingNames, AkkaAMQP}
53
import akka.amqp._
54

    
55
/**
56
 * An actor that gets events from the queue, stores them persistently
57
 * and forwards them for further processing.
58
 *
59
 * @author Georgios Gousios <gousiosg@gmail.com>
60
 */
61
class ResourceEventProcessorService extends AkkaAMQP with Loggable
62
with Lifecycle {
63

    
64
  case class AckData(msgId: String, deliveryTag: Long, queue: ActorRef)
65
  case class Persist(event: ResourceEvent, sender: ActorRef, ackData: AckData)
66
  case class PersistOK(ackData: AckData)
67
  case class PersistFailed(ackData: AckData)
68
  case class Duplicate(ackData: AckData)
69

    
70
  val redeliveries = new ConcurrentSkipListSet[String]()
71

    
72
  class QueueReader extends Actor {
73

    
74
    def receive = {
75
      case Delivery(payload, _, deliveryTag, isRedeliver, _, queue) =>
76
        val event = ResourceEvent.fromBytes(payload)
77
        if (isRedeliver) {
78
          //Message could not be processed 3 times, just
79
          if (redeliveries.contains(event.id)) {
80
            logger.warn("Event[%s] msg[%d] redelivered >2 times. Rejecting".format(event, deliveryTag))
81
            queue ! Reject(deliveryTag, false)
82
            redeliveries.remove(event.id)
83
          } else {
84
            //Redeliver, but keep track of the message
85
            redeliveries.add(event.id)
86
            PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
87
          }
88
        } else
89
          PersisterManager.lb ! Persist(event, QueueReaderManager.lb, AckData(event.id, deliveryTag, queue.get))
90

    
91
      case PersistOK(ackData) =>
92
        logger.debug("Stored event[%s] msg[%d]".format(ackData.msgId, ackData.deliveryTag))
93
        ackData.queue ! Acknowledge(ackData.deliveryTag)
94

    
95
      case PersistFailed(ackData) =>
96
        //Give the message a chance to be processed by other processors
97
        logger.debug("Storing event[%s] msg[%d] failed".format(ackData.msgId, ackData.deliveryTag))
98
        ackData.queue ! Reject(ackData.deliveryTag, true)
99

    
100
      case Duplicate(ackData) =>
101
        logger.debug("Event[%s] msg[%d] is duplicate".format(ackData.msgId, ackData.deliveryTag))
102
        ackData.queue ! Reject(ackData.deliveryTag, false)
103

    
104
      case Acknowledged(deliveryTag) =>
105
        logger.debug("Msg with tag [%d] acked".format(deliveryTag))
106
      //TODO: Forward to the dispatcher
107

    
108
      case Rejected(deliveryTag) =>
109
        logger.debug("Msg with tag [%d] rejected".format(deliveryTag))
110

    
111
      case _ => logger.warn("Unknown message")
112
    }
113

    
114
    self.dispatcher = QueueReaderManager.dispatcher
115
  }
116

    
117
  class Persister extends Actor {
118

    
119
    def receive = {
120
      case Persist(event, sender, ackData) =>
121
        if (exists(event))
122
          sender ! Duplicate(ackData)
123
        else if (persist(event))
124
          sender ! PersistOK(ackData)
125
        else
126
          sender ! PersistFailed(ackData)
127
      case _ => logger.warn("Unknown message")
128
    }
129

    
130
    def exists(event: ResourceEvent): Boolean =
131
      Configurator.MasterConfigurator.resourceEventStore.findResourceEventById(event.id).isJust
132

    
133
    def persist(event: ResourceEvent): Boolean = {
134
      Configurator.MasterConfigurator.resourceEventStore.storeResourceEvent(event) match {
135
        case Just(x) => true
136
        case x: Failed =>
137
          logger.error("Could not save event: %s".format(event))
138
          false
139
        case NoVal => false
140
      }
141
    }
142

    
143
    self.dispatcher = PersisterManager.dispatcher
144
  }
145

    
146
  lazy val supervisor = Supervisor(SupervisorConfig(
147
    OneForOneStrategy(
148
      List(classOf[Exception]), //What exceptions will be handled
149
      3, // maximum number of restart retries
150
      5000 // within time in millis
151
    ), Nil
152

    
153
  ))
154

    
155
  object QueueReaderManager {
156
    val numCPUs = Runtime.getRuntime.availableProcessors
157
    var actors: List[ActorRef] = _
158
    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
159

    
160
    lazy val dispatcher =
161
      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("QueueReaderDispatcher")
162
        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
163
        .setMaxPoolSize(numCPUs)
164
        .setCorePoolSize(2)
165
        .setKeepAliveTimeInMillis(60000)
166
        .setRejectionPolicy(new CallerRunsPolicy).build
167

    
168
    def start() = {
169
      actors = {
170
        for (i <- 0 until numCPUs) yield {
171
          val actor = actorOf(new QueueReader)
172
          supervisor.link(actor)
173
          actor.start()
174
          actor
175
        }
176
      }.toList
177
    }
178

    
179
    def stop() = dispatcher.stopAllAttachedActors
180
  }
181

    
182
  object PersisterManager {
183
    val numCPUs = Runtime.getRuntime.availableProcessors
184
    var actors: List[ActorRef] = _
185

    
186
    lazy val lb = loadBalancerActor(new CyclicIterator(actors))
187

    
188
    val dispatcher =
189
      Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("PersisterDispatcher")
190
        .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
191
        .setMaxPoolSize(numCPUs)
192
        .setCorePoolSize(2)
193
        .setKeepAliveTimeInMillis(60000)
194
        .setRejectionPolicy(new CallerRunsPolicy).build
195

    
196
    def start() = {
197
      actors = {
198
        for (i <- 0 until numCPUs) yield {
199
          val actor = actorOf(new Persister)
200
          supervisor.link(actor)
201
          actor.start()
202
          actor
203
        }
204
      }.toList
205
    }
206

    
207
    def stop() = dispatcher.stopAllAttachedActors
208
  }
209

    
210
  def start() {
211
    logger.info("Starting resource event processor service")
212

    
213
    QueueReaderManager.start()
214
    PersisterManager.start()
215

    
216
    consumer("%s.#".format(MessagingNames.RES_EVENT_KEY),
217
      MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE,
218
      QueueReaderManager.lb, false)
219
  }
220

    
221
  def stop() {
222
    QueueReaderManager.stop()
223
    PersisterManager.stop()
224

    
225
    logger.info("Stopping resource event processor service")
226
  }
227
}