40eb921e29c9d6c433f0cecf197b10fa361bd6bb
[aquarium] / src / main / scala / gr / grnet / aquarium / service / IMEventProcessorService.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.service
37
38 import com.ckkloverdos.maybe.{Maybe, NoVal, Failed, Just}
39
40 import gr.grnet.aquarium.actor.DispatcherRole
41 import gr.grnet.aquarium.Configurator.Keys
42 import gr.grnet.aquarium.store.LocalFSEventStore
43 import gr.grnet.aquarium.Configurator
44 import gr.grnet.aquarium.actor.message.service.dispatcher.ProcessUserEvent
45 import gr.grnet.aquarium.events.IMEvent
46 import gr.grnet.aquarium.util.date.TimeHelpers
47 import gr.grnet.aquarium.util.{LogHelpers, makeString}
48
49 /**
50  * An event processor service for user events coming from the IM system
51  *
52  * @author Georgios Gousios <gousiosg@gmail.com>
53  */
54 class IMEventProcessorService extends EventProcessorService[IMEvent] {
55
56   override def decode(data: Array[Byte]) = IMEvent.fromBytes(data)
57
58   override def forward(event: IMEvent) = {
59     if(event ne null) {
60       _configurator.actorProvider.actorForRole(DispatcherRole) ! ProcessUserEvent(event)
61     }
62   }
63
64   override def exists(event: IMEvent) =
65     _configurator.userEventStore.findIMEventById(event.id).isJust
66
67   override def persist(event: IMEvent, initialPayload: Array[Byte]) = {
68     Maybe {
69       LocalFSEventStore.storeIMEvent(_configurator, event, initialPayload)
70     } match {
71       case Just(_) ⇒
72         _configurator.userEventStore.storeIMEvent(event) match {
73           case Just(x) => true
74           case x: Failed =>
75             logger.error("Could not save user event: %s".format(event))
76             false
77           case NoVal => false
78         }
79
80       case failed@Failed(e) ⇒
81         logger.error("While LocalFSEventStore.storeUserEvent", e)
82         false
83
84       case _ ⇒
85         false
86     }
87   }
88
89   protected def persistUnparsed(initialPayload: Array[Byte], exception: Throwable): Unit = {
90     val json = makeString(initialPayload)
91
92     LocalFSEventStore.storeUnparsedIMEvent(_configurator, initialPayload, exception)
93
94     _configurator.props.getBoolean(Configurator.Keys.save_unparsed_event_im) match {
95       case Just(true) ⇒
96         val recordIDM = _configurator.userEventStore.storeUnparsed(json)
97         logger.info("Saved unparsed {}", recordIDM)
98       case _ ⇒
99     }
100   }
101
102   override def queueReaderThreads: Int = 1
103
104   override def persisterThreads: Int = numCPUs
105
106   protected def numQueueActors = 2 * queueReaderThreads
107
108   protected def numPersisterActors = 2 * persisterThreads
109
110   override def name = "usrevtproc"
111
112   lazy val persister = new PersisterManager
113   lazy val queueReader = new QueueReaderManager
114
115   override def persisterManager = persister
116
117   override def queueReaderManager = queueReader
118
119   def start() {
120     logStarting()
121     val (ms0, ms1, _) = TimeHelpers.timed {
122       declareQueues(Keys.amqp_userevents_queues)
123     }
124     logStarted(ms0, ms1)
125   }
126
127   def stop() {
128     logStopping()
129     val (ms0, ms1, _) = TimeHelpers.timed {
130       queueReaderManager.stop()
131       persisterManager.stop()
132     }
133     logStopped(ms0, ms1)
134   }
135 }