Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / connector / handler / IMEventPayloadHandler.scala @ 33e4a30e

History | View | Annotate | Download (7.5 kB)

1
/*
2
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium.connector.handler
37

    
38
import gr.grnet.aquarium.Aquarium
39
import org.slf4j.Logger
40
import gr.grnet.aquarium.converter.JsonTextFormat
41
import gr.grnet.aquarium.actor.RouterRole
42
import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore}
43
import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
44
import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
45
import gr.grnet.aquarium.util.date.MutableDateCalc
46
import gr.grnet.aquarium.util.{LogHelpers, Tags, shortClassNameOf}
47

    
48
/**
49
 * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
50
 * [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
51
 *
52
 * @author Christos KK Loverdos <loverdos@gmail.com>
53
 */
54

    
55
class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
56
  extends GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
57
      // jsonParser: Array[Byte] ⇒ JsonTextFormat
58
      payload ⇒ {
59
        aquarium.converters.convertEx[JsonTextFormat](payload)
60
      },
61

    
62
      // onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit
63
      (payload, jsonTextFormat) ⇒ {
64
      },
65

    
66
      // onJsonParserError: (Array[Byte], Throwable) ⇒ Unit
67
      (payload, error) ⇒ {
68
        val errMsg = "Error creating JSON from %s payload".format(Tags.IMEventTag)
69
        LogHelpers.logChainOfCauses(logger, error, errMsg)
70
        logger.error(errMsg, error)
71

    
72
        LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
73
      },
74

    
75
      // eventParser: JsonTextFormat ⇒ E
76
      jsonTextFormat ⇒ {
77
        StdIMEvent.fromJsonTextFormat(jsonTextFormat)
78
      },
79

    
80
      // onEventParserSuccess: (Array[Byte], E) ⇒ Unit
81
      (payload, event) ⇒ {
82
        LocalFSEventStore.storeIMEvent(aquarium, event, payload)
83
      },
84

    
85
      // onEventParserError: (Array[Byte], Throwable) ⇒ Unit
86
      (payload, error) ⇒ {
87
        val errMsg = "Error creating object model from %s payload".format(Tags.IMEventTag)
88
        LogHelpers.logChainOfCauses(logger, error, errMsg)
89
        logger.error(errMsg, error)
90

    
91
        LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
92
      },
93

    
94
      // preSaveAction: E ⇒ Option[HandlerResult]
95
      imEvent ⇒ {
96
        val id = imEvent.id
97
        val acceptMessage = None: Option[HandlerResult]
98

    
99
        // Let's decide if it is OK to store the event
100
        // Remember that OK == None as the returning result
101
        //
102
        // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
103
        //       do not bother to catch exceptions here.
104

    
105
        // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
106
        //    It is a requirement that this ID is unique.
107
        val store = aquarium.imEventStore
108

    
109
        val imEventDebugString = imEvent.toDebugString
110

    
111
        store.findIMEventById(id) match {
112
          case Some(_) ⇒
113
           // Reject the duplicate
114
           logger.debug("Rejecting duplicate ID for %s".format(imEventDebugString))
115
           Some(HandlerResultReject("Duplicate ID for %s".format(imEventDebugString)))
116

    
117
          case None ⇒
118
            // 2. Check that the new event is not older than our most recent event in DB.
119
            //    Sorry. We cannot tolerate out-of-order events here, since they really mess with the
120
            //    agreements selection and thus with the charging procedure.
121
            //
122
            // 2.1 The only exception is the very first activation ever. We allow late arrival, since
123
            //     the rest of Aquarium does nothing (but accumulate events) if the user has never
124
            //     been activated.
125
            //
126
            // TODO: We really need to store these bad events anyway but somewhere else (BadEventsStore?)
127
            val userID = imEvent.userID
128

    
129
            store.findLatestIMEventByUserID(userID) match {
130
              case Some(latestStoredEvent) ⇒
131

    
132
               val occurredMillis       = imEvent.occurredMillis
133
               val latestOccurredMillis = latestStoredEvent.occurredMillis
134

    
135
               if(occurredMillis < latestOccurredMillis) {
136
                  // OK this is older than our most recent event. Essentially a glimpse in the past.
137
                  def rejectMessage = {
138
                    val occurredDebugString       = new MutableDateCalc(occurredMillis).toYYYYMMDDHHMMSSSSS
139
                    val latestOccurredDebugString = new MutableDateCalc(latestOccurredMillis).toYYYYMMDDHHMMSSSSS
140

    
141
                    val formatter = (x: String) ⇒ x.format(
142
                      imEventDebugString,
143
                      occurredDebugString,
144
                      latestOccurredDebugString
145
                    )
146

    
147
                    logger.debug(formatter("Rejecting newer %s. [%s] < [%s]"))
148

    
149
                    Some(HandlerResultReject(formatter("Newer %s. [%s] < [%s]")))
150
                  }
151

    
152
                  // Has the user been activated before?
153
                  store.findFirstIsActiveIMEventByUserID(userID) match {
154
                    case Some(_) ⇒
155
                      // Yes, so the new event must be rejected
156
                      rejectMessage
157

    
158
                    case None ⇒
159
                      // No. Process the new event only if it is an activation.
160
                      if(imEvent.isActive) {
161
                       logger.info("First activation %s".format(imEventDebugString))
162
                       acceptMessage
163
                      } else {
164
                       rejectMessage
165
                      }
166
                  }
167
               } else {
168
                 // We accept all newer events
169
                 acceptMessage
170
               }
171

    
172
              case None ⇒
173
                // This is the very first event ever
174
                logger.info("First ever %s".format(imEventDebugString))
175
                acceptMessage
176
            }
177
        }
178
      },
179

    
180
      // saveAction: E ⇒ S
181
      imEvent ⇒ {
182
        aquarium.imEventStore.insertIMEvent(imEvent)
183
      },
184

    
185
      // forwardAction: S ⇒ Unit
186
      imEvent ⇒ {
187
        aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent)
188
      }
189
    )