root / src / main / scala / gr / grnet / aquarium / processor / actor / ResourceEventProcessorService.scala @ ad8d8db2
History | View | Annotate | Download (2 kB)
1 |
package gr.grnet.aquarium.processor.actor |
---|---|
2 |
|
3 |
import com.ckkloverdos.maybe.{Just, Failed, NoVal} |
4 |
import gr.grnet.aquarium.messaging.MessagingNames |
5 |
import gr.grnet.aquarium.logic.events.{AquariumEvent, ResourceEvent} |
6 |
import gr.grnet.aquarium.actor.DispatcherRole |
7 |
|
8 |
|
9 |
/** |
10 |
* An event processor service for resource events |
11 |
* |
12 |
* @author Georgios Gousios <gousiosg@gmail.com> |
13 |
*/ |
14 |
final class ResourceEventProcessorService extends EventProcessorService { |
15 |
|
16 |
override def decode(data: Array[Byte]) : AquariumEvent = ResourceEvent.fromBytes(data) |
17 |
|
18 |
override def forward(evt: AquariumEvent): Unit = { |
19 |
val resourceEvent = evt.asInstanceOf[ResourceEvent] |
20 |
val businessLogicDispacther = _configurator.actorProvider.actorForRole(DispatcherRole) |
21 |
businessLogicDispacther ! ProcessResourceEvent(resourceEvent) |
22 |
} |
23 |
|
24 |
override def exists(event: AquariumEvent): Boolean = |
25 |
_configurator.resourceEventStore.findResourceEventById(event.id).isJust |
26 |
|
27 |
override def persist(evt: AquariumEvent): Boolean = { |
28 |
val event = evt.asInstanceOf[ResourceEvent] |
29 |
_configurator.resourceEventStore.storeResourceEvent(event) match { |
30 |
case Just(x) => true |
31 |
case x: Failed => |
32 |
logger.error("Could not save event: %s. Reason:".format(event, x.toString)) |
33 |
false |
34 |
case NoVal => false |
35 |
} |
36 |
} |
37 |
|
38 |
override def queueReaderThreads: Int = 4 |
39 |
override def persisterThreads: Int = numCPUs |
40 |
override def name = "resevtproc" |
41 |
|
42 |
lazy val persister = new PersisterManager |
43 |
lazy val queueReader = new QueueReaderManager |
44 |
|
45 |
override def persisterManager = persister |
46 |
override def queueReaderManager = queueReader |
47 |
|
48 |
def start() { |
49 |
logger.info("Starting resource event processor service") |
50 |
|
51 |
consumer("%s.#".format(MessagingNames.RES_EVENT_KEY), |
52 |
MessagingNames.RESOURCE_EVENT_QUEUE, MessagingNames.AQUARIUM_EXCHANGE, |
53 |
queueReaderManager.lb, false) |
54 |
} |
55 |
|
56 |
def stop() { |
57 |
queueReaderManager.stop() |
58 |
persisterManager.stop() |
59 |
|
60 |
logger.info("Stopping resource event processor service") |
61 |
} |
62 |
} |