2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium.connector.handler
38 import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
39 import gr.grnet.aquarium.converter.JsonTextFormat
40 import gr.grnet.aquarium.util.{LogHelpers, Loggable, safeUnit, shortInfoOf, shortClassNameOf}
43 * Generic handler of events arriving to Aquarium.
45 * @author Christos KK Loverdos <loverdos@gmail.com>
48 class GenericPayloadHandler[E <: AnyRef: ClassManifest](
50 * Parses payload bytes to a JSON string.
51 * The incoming payload must be in UTF-8.
53 jsonParser: Array[Byte] ⇒ JsonTextFormat,
56 * This is called if no error happens while parsing to JSON.
58 onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit,
61 * This is called if an error happens while parsing to JSON.
63 onJsonParserError: (Array[Byte], Throwable) ⇒ Unit,
66 * Parses JSON into a domain object.
68 eventParser: JsonTextFormat ⇒ E,
71 * This is called if no error happens while parsing to a domain object.
73 onEventParserSuccess: (Array[Byte], E) ⇒ Unit,
76 * This is called if an error happens while parsing to a domain object.
78 onEventParserError: (Array[Byte], Throwable) ⇒ Unit,
81 * This is called with the parsed domain object as a final check before saving to DB.
82 * If the result is `None`, then we proceed with the `saveAction` else the returned
83 * [[gr.grnet.aquarium.connector.handler.HandlerResult]] is communicated back from the
84 * `handlePayload` method.
86 preSaveAction: E ⇒ Option[HandlerResult],
89 * Saves the parsed domain object to DB. Returns the saved domain object.
94 * Forwards the saved domain object for further processing.
96 forwardAction: E ⇒ Unit) extends PayloadHandler with Loggable {
99 * This is the core business logic that Aquarium applies to an incoming event.
100 * The method is marked `final` to indicate that the business logic is fixed
101 * and any parameterization must happen via the constructor parameters.
103 * The implementation is careful to catch any exceptions and return the proper result.
105 final def handlePayload(payload: Array[Byte]): HandlerResult = {
106 // 1. try to parse as json
111 safeUnit(onJsonParserError(payload, e))
113 HandlerResultReject(e.getMessage)
115 case Just(jsonTextFormat) ⇒
116 safeUnit(onJsonParserSuccess(payload, jsonTextFormat))
118 // 2. try to parse as model
120 eventParser(jsonTextFormat)
123 safeUnit(onEventParserError(payload, e))
125 HandlerResultReject(e.getMessage)
128 safeUnit(onEventParserSuccess(payload, event))
130 // 3. See if we are ready to save to DB
135 val errMsg = "While running preSaveAction(%s) from %s".format(
136 shortClassNameOf(event),
137 shortClassNameOf(this))
139 LogHelpers.logChainOfCauses(logger, e, errMsg)
140 logger.error(errMsg, e)
142 // oops. must resend this message due to unexpected result
145 case Just(Some(handlerResult)) ⇒
146 // Nope. Not ready to save.
150 // Yep. Ready to save
151 // 4. try to save to DB
156 val errMsg = "While running saveAction(%s) from %s".format(
157 shortClassNameOf(event),
158 shortClassNameOf(this))
160 LogHelpers.logChainOfCauses(logger, e, errMsg)
161 logger.error(errMsg, e)
163 HandlerResultPanic(shortInfoOf(e))
166 // 4. try forward but it's OK if something bad happens here.