GenericPayloadHandler: one type less
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / handler / IMEventPayloadHandler.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.handler
37
38 import gr.grnet.aquarium.Aquarium
39 import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
40 import gr.grnet.aquarium.converter.JsonTextFormat
41 import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
42 import gr.grnet.aquarium.store.LocalFSEventStore
43 import gr.grnet.aquarium.util.{LogHelpers, Tags}
44 import org.slf4j.Logger
45
46 /**
47  * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
48  * [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
49  *
50  * @author Christos KK Loverdos <loverdos@gmail.com>
51  */
52
53 class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
54   extends GenericPayloadHandler[IMEventModel](
55       // jsonParser: Array[Byte] ⇒ JsonTextFormat
56       payload ⇒ {
57         aquarium.converters.convertEx[JsonTextFormat](payload)
58       },
59
60       // onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit
61       (payload, jsonTextFormat) ⇒ {
62       },
63
64       // onJsonParserError: (Array[Byte], Throwable) ⇒ Unit
65       (payload, error) ⇒ {
66         val errMsg = "Error creating JSON from %s payload".format(Tags.IMEventTag)
67         LogHelpers.logChainOfCausesAndException(logger, error, errMsg)
68
69         LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
70       },
71
72       // eventParser: JsonTextFormat ⇒ E
73       jsonTextFormat ⇒ {
74         StdIMEvent.fromJsonTextFormat(jsonTextFormat)
75       },
76
77       // onEventParserSuccess: (Array[Byte], E) ⇒ Unit
78       (payload, event) ⇒ {
79         LocalFSEventStore.storeIMEvent(aquarium, event, payload)
80       },
81
82       // onEventParserError: (Array[Byte], Throwable) ⇒ Unit
83       (payload, error) ⇒ {
84         val errMsg = "Error creating object model from %s payload".format(Tags.IMEventTag)
85         LogHelpers.logChainOfCausesAndException(logger, error, errMsg)
86
87         LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
88       },
89
90       // preSaveAction: E ⇒ Option[HandlerResult]
91       imEvent ⇒ {
92         val id = imEvent.id
93
94         // Let's decide if it is OK to store the event
95         // Remember that OK == None as the returning result
96         //
97         // NOTE: If anything goes wrong with this function, then the handler
98         //       (handlePayload in GenericPayloadHandler) will issue a Resend,
99         //       so do not bother to catch exceptions here.
100
101         // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
102         //    It is a requirement that this ID is unique.
103         val store = aquarium.imEventStore
104
105         val imEventDebugString = imEvent.toDebugString
106
107         store.findIMEventByID(id) match {
108           case Some(_) ⇒
109            // Reject the duplicate
110            logger.debug("Rejecting duplicate ID for %s".format(imEventDebugString))
111            Some(HandlerResultReject("Duplicate ID for %s".format(imEventDebugString)))
112
113           case None ⇒
114             // No duplicate. Find the CREATE event if any
115             val userID = imEvent.userID
116             val createIMEventOpt = store.findCreateIMEventByUserID(userID)
117             val userHasBeenCreated = createIMEventOpt.isDefined
118             val isCreateUser = imEvent.isCreateUser
119
120             (userHasBeenCreated, isCreateUser) match {
121               case (true, true) ⇒
122                 // (User CREATEd, CREATE event)
123                 val reason = "User %s is already created. Rejecting %s".format(userID, imEventDebugString)
124                 logger.warn(reason)
125                 Some(HandlerResultReject(reason))
126
127               case (true, false) ⇒
128                 // (User CREATEd, MODIFY event)
129                 // Everything BEFORE the CREATE event is rejected
130                 val createIMEvent = createIMEventOpt.get
131                 if(imEvent.occurredMillis < createIMEvent.occurredMillis) {
132                   val reason = "IMEvent(id=%s) is before the creation event (id=%s). Rejecting".format(
133                     imEvent.id,
134                     createIMEvent.id
135                   )
136                   logger.warn(reason)
137                   Some(HandlerResultReject(reason))
138                 }
139                 else {
140                   None
141                 }
142
143               case (false, true) ⇒
144                 // (User not CREATEd, CREATE event)
145                 logger.info("User created by %s".format(imEventDebugString))
146                 None
147
148               case (false, false) ⇒
149                 // (User not CREATEd, MODIFY event)
150                 // We allow any older modification events until the user is created
151                 logger.debug("User not created yet. Processing %s".format(imEventDebugString))
152                 None
153             }
154         }
155       },
156
157       // saveAction: E ⇒ S
158       imEvent ⇒ {
159         aquarium.imEventStore.insertIMEvent(imEvent)
160       },
161
162       // forwardAction: S ⇒ Unit
163       imEvent ⇒ {
164         aquarium.akkaService.getOrCreateUserActor(imEvent.userID) ! ProcessIMEvent(imEvent)
165       }
166     )