root / src / main / scala / gr / grnet / aquarium / connector / handler / IMEventPayloadHandler.scala @ 33e4a30e
History | View | Annotate | Download (7.5 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium.connector.handler |
37 |
|
38 |
import gr.grnet.aquarium.Aquarium |
39 |
import org.slf4j.Logger |
40 |
import gr.grnet.aquarium.converter.JsonTextFormat |
41 |
import gr.grnet.aquarium.actor.RouterRole |
42 |
import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore} |
43 |
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel} |
44 |
import gr.grnet.aquarium.actor.message.event.ProcessIMEvent |
45 |
import gr.grnet.aquarium.util.date.MutableDateCalc |
46 |
import gr.grnet.aquarium.util.{LogHelpers, Tags, shortClassNameOf} |
47 |
|
48 |
/** |
49 |
* A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for |
50 |
* [[gr.grnet.aquarium.event.model.im.IMEventModel]]s. |
51 |
* |
52 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
53 |
*/ |
54 |
|
55 |
class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger) |
56 |
extends GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent]( |
57 |
// jsonParser: Array[Byte] ⇒ JsonTextFormat |
58 |
payload ⇒ { |
59 |
aquarium.converters.convertEx[JsonTextFormat](payload) |
60 |
}, |
61 |
|
62 |
// onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit |
63 |
(payload, jsonTextFormat) ⇒ { |
64 |
}, |
65 |
|
66 |
// onJsonParserError: (Array[Byte], Throwable) ⇒ Unit |
67 |
(payload, error) ⇒ { |
68 |
val errMsg = "Error creating JSON from %s payload".format(Tags.IMEventTag) |
69 |
LogHelpers.logChainOfCauses(logger, error, errMsg) |
70 |
logger.error(errMsg, error) |
71 |
|
72 |
LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error) |
73 |
}, |
74 |
|
75 |
// eventParser: JsonTextFormat ⇒ E |
76 |
jsonTextFormat ⇒ { |
77 |
StdIMEvent.fromJsonTextFormat(jsonTextFormat) |
78 |
}, |
79 |
|
80 |
// onEventParserSuccess: (Array[Byte], E) ⇒ Unit |
81 |
(payload, event) ⇒ { |
82 |
LocalFSEventStore.storeIMEvent(aquarium, event, payload) |
83 |
}, |
84 |
|
85 |
// onEventParserError: (Array[Byte], Throwable) ⇒ Unit |
86 |
(payload, error) ⇒ { |
87 |
val errMsg = "Error creating object model from %s payload".format(Tags.IMEventTag) |
88 |
LogHelpers.logChainOfCauses(logger, error, errMsg) |
89 |
logger.error(errMsg, error) |
90 |
|
91 |
LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error) |
92 |
}, |
93 |
|
94 |
// preSaveAction: E ⇒ Option[HandlerResult] |
95 |
imEvent ⇒ { |
96 |
val id = imEvent.id |
97 |
val acceptMessage = None: Option[HandlerResult] |
98 |
|
99 |
// Let's decide if it is OK to store the event |
100 |
// Remember that OK == None as the returning result |
101 |
// |
102 |
// NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so |
103 |
// do not bother to catch exceptions here. |
104 |
|
105 |
// 1. Check if the same ID exists. Note that we use the ID sent by the event producer. |
106 |
// It is a requirement that this ID is unique. |
107 |
val store = aquarium.imEventStore |
108 |
|
109 |
val imEventDebugString = imEvent.toDebugString |
110 |
|
111 |
store.findIMEventById(id) match { |
112 |
case Some(_) ⇒ |
113 |
// Reject the duplicate |
114 |
logger.debug("Rejecting duplicate ID for %s".format(imEventDebugString)) |
115 |
Some(HandlerResultReject("Duplicate ID for %s".format(imEventDebugString))) |
116 |
|
117 |
case None ⇒ |
118 |
// 2. Check that the new event is not older than our most recent event in DB. |
119 |
// Sorry. We cannot tolerate out-of-order events here, since they really mess with the |
120 |
// agreements selection and thus with the charging procedure. |
121 |
// |
122 |
// 2.1 The only exception is the very first activation ever. We allow late arrival, since |
123 |
// the rest of Aquarium does nothing (but accumulate events) if the user has never |
124 |
// been activated. |
125 |
// |
126 |
// TODO: We really need to store these bad events anyway but somewhere else (BadEventsStore?) |
127 |
val userID = imEvent.userID |
128 |
|
129 |
store.findLatestIMEventByUserID(userID) match { |
130 |
case Some(latestStoredEvent) ⇒ |
131 |
|
132 |
val occurredMillis = imEvent.occurredMillis |
133 |
val latestOccurredMillis = latestStoredEvent.occurredMillis |
134 |
|
135 |
if(occurredMillis < latestOccurredMillis) { |
136 |
// OK this is older than our most recent event. Essentially a glimpse in the past. |
137 |
def rejectMessage = { |
138 |
val occurredDebugString = new MutableDateCalc(occurredMillis).toYYYYMMDDHHMMSSSSS |
139 |
val latestOccurredDebugString = new MutableDateCalc(latestOccurredMillis).toYYYYMMDDHHMMSSSSS |
140 |
|
141 |
val formatter = (x: String) ⇒ x.format( |
142 |
imEventDebugString, |
143 |
occurredDebugString, |
144 |
latestOccurredDebugString |
145 |
) |
146 |
|
147 |
logger.debug(formatter("Rejecting newer %s. [%s] < [%s]")) |
148 |
|
149 |
Some(HandlerResultReject(formatter("Newer %s. [%s] < [%s]"))) |
150 |
} |
151 |
|
152 |
// Has the user been activated before? |
153 |
store.findFirstIsActiveIMEventByUserID(userID) match { |
154 |
case Some(_) ⇒ |
155 |
// Yes, so the new event must be rejected |
156 |
rejectMessage |
157 |
|
158 |
case None ⇒ |
159 |
// No. Process the new event only if it is an activation. |
160 |
if(imEvent.isActive) { |
161 |
logger.info("First activation %s".format(imEventDebugString)) |
162 |
acceptMessage |
163 |
} else { |
164 |
rejectMessage |
165 |
} |
166 |
} |
167 |
} else { |
168 |
// We accept all newer events |
169 |
acceptMessage |
170 |
} |
171 |
|
172 |
case None ⇒ |
173 |
// This is the very first event ever |
174 |
logger.info("First ever %s".format(imEventDebugString)) |
175 |
acceptMessage |
176 |
} |
177 |
} |
178 |
}, |
179 |
|
180 |
// saveAction: E ⇒ S |
181 |
imEvent ⇒ { |
182 |
aquarium.imEventStore.insertIMEvent(imEvent) |
183 |
}, |
184 |
|
185 |
// forwardAction: S ⇒ Unit |
186 |
imEvent ⇒ { |
187 |
aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent) |
188 |
} |
189 |
) |