Dispatcher -> Router (II)
[aquarium] / src / main / scala / gr / grnet / aquarium / service / IMEventProcessorService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38
39 import gr.grnet.aquarium.actor.RouterRole
40 import gr.grnet.aquarium.Configurator.Keys
41 import gr.grnet.aquarium.store.LocalFSEventStore
42 import gr.grnet.aquarium.actor.message.service.router.ProcessIMEvent
43 import gr.grnet.aquarium.util.date.TimeHelpers
44 import gr.grnet.aquarium.util.makeString
45 import com.ckkloverdos.maybe._
46 import gr.grnet.aquarium.event.im.IMEventModel
47 import gr.grnet.aquarium.store.memory.MemStore
48
49 /**
50  * An event processor service for user events coming from the IM system
51  *
52  * @author Georgios Gousios <gousiosg@gmail.com>
53  */
54 class IMEventProcessorService extends EventProcessorService[IMEventModel] {
55
56   override def parseJsonBytes(data: Array[Byte]) = {
57     MemStore.createIMEventFromJsonBytes(data)
58   }
59
60   override def forward(event: IMEventModel) = {
61     if(event ne null) {
62       _configurator.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(event)
63     }
64   }
65
66   override def existsInStore(event: IMEventModel) =
67     _configurator.imEventStore.findIMEventById(event.id).isJust
68
69   override def storeParsedEvent(event: IMEventModel, initialPayload: Array[Byte]) = {
70     // 1. Store to local FS for debugging purposes.
71     //    BUT be resilient to errors, since this is not critical
72     if(_configurator.eventsStoreFolder.isJust) {
73       Maybe {
74         LocalFSEventStore.storeIMEvent(_configurator, event, initialPayload)
75       }
76     }
77
78     // 2. Store to DB
79     _configurator.imEventStore.insertIMEvent(event)
80   }
81
82   protected def storeUnparsedEvent(initialPayload: Array[Byte], exception: Throwable): Unit = {
83     val json = makeString(initialPayload)
84
85     LocalFSEventStore.storeUnparsedIMEvent(_configurator, initialPayload, exception)
86   }
87
88   override def queueReaderThreads: Int = 1
89
90   override def persisterThreads: Int = numCPUs
91
92   protected def numQueueActors = 2 * queueReaderThreads
93
94   protected def numPersisterActors = 2 * persisterThreads
95
96   override def name = "usrevtproc"
97
98   lazy val persister = new PersisterManager
99   lazy val queueReader = new QueueReaderManager
100
101   override def persisterManager = persister
102
103   override def queueReaderManager = queueReader
104
105   def start() {
106     logStarting()
107     val (ms0, ms1, _) = TimeHelpers.timed {
108       declareQueues(Keys.amqp_userevents_queues)
109     }
110     logStarted(ms0, ms1)
111   }
112
113   def stop() {
114     logStopping()
115     val (ms0, ms1, _) = TimeHelpers.timed {
116       queueReaderManager.stop()
117       persisterManager.stop()
118     }
119     logStopped(ms0, ms1)
120   }
121 }