Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala @ ad8d8db2

History | View | Annotate | Download (2 kB)

1
package gr.grnet.aquarium.processor.actor
2

    
3
import com.ckkloverdos.maybe.{Just, Failed, NoVal}
4
import gr.grnet.aquarium.messaging.MessagingNames
5
import gr.grnet.aquarium.logic.events.{AquariumEvent, ResourceEvent}
6
import gr.grnet.aquarium.actor.DispatcherRole
7

    
8

    
9
/**
10
 * An event processor service for resource events
11
 *
12
 * @author Georgios Gousios <gousiosg@gmail.com>
13
 */
14
final class ResourceEventProcessorService extends EventProcessorService {
15

    
16
  override def decode(data: Array[Byte]) : AquariumEvent = ResourceEvent.fromBytes(data)
17

    
18
  override def forward(evt: AquariumEvent): Unit = {
19
    val resourceEvent = evt.asInstanceOf[ResourceEvent]
20
    val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole)
21
    businessLogicDispacther ! ProcessResourceEvent(resourceEvent)
22
  }
23

    
24
  override def exists(event: AquariumEvent): Boolean =
25
    _configurator.resourceEventStore.findResourceEventById(event.id).isJust
26

    
27
  override def persist(evt: AquariumEvent): Boolean = {
28
    val event = evt.asInstanceOf[ResourceEvent]
29
    _configurator.resourceEventStore.storeResourceEvent(event) match {
30
      case Just(x) => true
31
      case x: Failed =>
32
        logger.error("Could not save event: %s. Reason:".format(event, x.toString))
33
        false
34
      case NoVal => false
35
    }
36
  }
37

    
38
  override def queueReaderThreads: Int = 4
39
  override def persisterThreads: Int = numCPUs
40
  override def name = "resevtproc"
41

    
42
  lazy val persister = new PersisterManager
43
  lazy val queueReader = new QueueReaderManager
44

    
45
  override def persisterManager   = persister
46
  override def queueReaderManager = queueReader
47

    
48
  def start() {
49
    logger.info("Starting resource event processor service")
50

    
51
    consumer("%s.#".format(MessagingNames.RES_EVENT_KEY),
52
      MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE,
53
      queueReaderManager.lb, false)
54
  }
55

    
56
  def stop() {
57
    queueReaderManager.stop()
58
    persisterManager.stop()
59

    
60
    logger.info("Stopping resource event processor service")
61
  }
62
}