Now using schema-based messages
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / handler / GenericPayloadHandler.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.handler
37
38 import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
39 import gr.grnet.aquarium.converter.JsonTextFormat
40 import gr.grnet.aquarium.util.{LogHelpers, Loggable, safeUnit, shortInfoOf, shortClassNameOf}
41
42 /**
43  * Generic handler of events arriving to Aquarium.
44  *
45  * @author Christos KK Loverdos <loverdos@gmail.com>
46  */
47
48 class GenericPayloadHandler[E <: AnyRef: ClassManifest](
49     /**
50      * Parses payload bytes to a JSON string.
51      * The incoming payload must be in UTF-8.
52      */
53     jsonParser: Array[Byte] ⇒ JsonTextFormat,
54
55     /**
56      * This is called if no error happens while parsing to JSON.
57      */
58     onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit,
59
60     /**
61      * This is called if an error happens while parsing to JSON.
62      */
63     onJsonParserError: (Array[Byte], Throwable) ⇒ Unit,
64
65     /**
66      * Parses JSON into a domain object.
67      */
68     eventParser: JsonTextFormat ⇒ E,
69
70     /**
71      * This is called if no error happens while parsing to a domain object.
72      */
73     onEventParserSuccess: (Array[Byte], E) ⇒ Unit,
74
75     /**
76      * This is called if an error happens while parsing to a domain object.
77      */
78     onEventParserError: (Array[Byte], Throwable) ⇒ Unit,
79
80     /**
81      * This is called with the parsed domain object as a final check before saving to DB.
82      * If the result is `None`, then we proceed with the `saveAction` else the returned
83      * [[gr.grnet.aquarium.connector.handler.HandlerResult]] is communicated back from the
84      * `handlePayload` method.
85      */
86     preSaveAction: E ⇒ Option[HandlerResult],
87
88     /**
89      * Saves the parsed domain object to DB. Returns the saved domain object.
90      */
91     saveAction: E ⇒ E,
92
93     /**
94      * Forwards the saved domain object for further processing.
95      */
96     forwardAction: E ⇒ Unit) extends PayloadHandler with Loggable {
97
98   /**
99    * This is the core business logic that Aquarium applies to an incoming event.
100    * The method is marked `final` to indicate that the business logic is fixed
101    * and any parameterization must happen via the constructor parameters.
102    *
103    * The implementation is careful to catch any exceptions and return the proper result.
104    */
105   final def handlePayload(payload: Array[Byte]): HandlerResult = {
106     // 1. try to parse as json
107     MaybeEither {
108       jsonParser(payload)
109     } match {
110       case Failed(e) ⇒
111         safeUnit(onJsonParserError(payload, e))
112
113         HandlerResultReject(e.getMessage)
114
115       case Just(jsonTextFormat) ⇒
116         safeUnit(onJsonParserSuccess(payload, jsonTextFormat))
117
118         // 2. try to parse as model
119         MaybeEither {
120           eventParser(jsonTextFormat)
121         } match {
122           case Failed(e) ⇒
123             safeUnit(onEventParserError(payload, e))
124
125             HandlerResultReject(e.getMessage)
126
127           case Just(event) ⇒
128             safeUnit(onEventParserSuccess(payload, event))
129
130             // 3. See if we are ready to save to DB
131             MaybeEither {
132               preSaveAction(event)
133             } match {
134               case Failed(e) ⇒
135                 val errMsg = "While running preSaveAction(%s) from %s".format(
136                   shortClassNameOf(event),
137                   shortClassNameOf(this))
138
139                 LogHelpers.logChainOfCauses(logger, e, errMsg)
140                 logger.error(errMsg, e)
141
142                 // oops. must resend this message due to unexpected result
143                 HandlerResultResend
144
145               case Just(Some(handlerResult)) ⇒
146                 // Nope. Not ready to save.
147                 handlerResult
148
149               case Just(None) ⇒
150                 // Yep. Ready to save
151                 // 4. try to save to DB
152                 MaybeEither {
153                   saveAction(event)
154                 } match {
155                   case Failed(e) ⇒
156                     val errMsg = "While running saveAction(%s) from %s".format(
157                       shortClassNameOf(event),
158                       shortClassNameOf(this))
159
160                     LogHelpers.logChainOfCauses(logger, e, errMsg)
161                     logger.error(errMsg, e)
162
163                     HandlerResultPanic(shortInfoOf(e))
164
165                   case Just(s) ⇒
166                     // 4. try forward but it's OK if something bad happens here.
167                     safeUnit {
168                       forwardAction(s)
169                     }
170
171                     HandlerResultSuccess
172                 }
173             }
174         }
175     }
176   }
177 }