e8f5c6c38470c722f227447ebad98f1444c2b68c
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / handler / ResourceEventPayloadHandler.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.connector.handler
37
38 import gr.grnet.aquarium.Aquarium
39 import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
40 import gr.grnet.aquarium.converter.JsonTextFormat
41 import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
42 import gr.grnet.aquarium.store.LocalFSEventStore
43 import gr.grnet.aquarium.util._
44 import org.slf4j.Logger
45
46 /**
47  * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
48  * [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
49  *
50  * @author Christos KK Loverdos <loverdos@gmail.com>
51  */
52
53 class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger)
54   extends GenericPayloadHandler[ResourceEventModel](
55       // jsonParser: Array[Byte] ⇒ JsonTextFormat
56       payload ⇒ {
57         aquarium.converters.convertEx[JsonTextFormat](payload)
58       },
59
60       // onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit
61       (payload, jsonTextFormat) ⇒ {
62       },
63
64       // onJsonParserError: (Array[Byte], Throwable) ⇒ Unit
65       (payload, error) ⇒ {
66         val errMsg = "Error creating JSON from %s payload".format(Tags.ResourceEventTag)
67         LogHelpers.logChainOfCauses(logger, error, errMsg)
68         logger.error(errMsg, error)
69
70         LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
71       },
72
73       // eventParser: JsonTextFormat ⇒ E
74       jsonTextFormat ⇒ {
75         StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
76       },
77
78       // onEventParserSuccess: (Array[Byte], E) ⇒ Unit
79       (payload, event) ⇒ {
80         LocalFSEventStore.storeResourceEvent(aquarium, event, payload)
81       },
82
83       // onEventParserError: (Array[Byte], Throwable) ⇒ Unit
84       (payload, error) ⇒ {
85         val errMsg = "Error creating object model from %s payload".format(Tags.ResourceEventTag)
86         LogHelpers.logChainOfCauses(logger, error, errMsg)
87         logger.error(errMsg, error)
88
89         LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
90       },
91
92       // preSaveAction: E ⇒ Option[HandlerResult]
93       rcEvent ⇒ {
94         val className = shortClassNameOf(rcEvent)
95         val id = rcEvent.id
96
97         // Let's decide if it is OK to store the event
98         // Remember that OK == None as the returning result
99         //
100         // NOTE: If anything goes wrong with this function, then the handler
101         //       (handlePayload in GenericPayloadHandler) will issue a Resend,
102         //       so do not bother to catch exceptions here.
103
104         // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
105         //    It is a requirement that this ID is unique.
106         val store = aquarium.resourceEventStore
107         store.findResourceEventByID(id) match {
108           case Some(_) ⇒
109             // Reject the duplicate
110             Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
111
112           case None ⇒
113             None
114         }
115       },
116
117       // saveAction: E ⇒ S
118       rcEvent ⇒ {
119         aquarium.resourceEventStore.insertResourceEvent(rcEvent)
120       },
121
122       // forwardAction: S ⇒ Unit
123       rcEvent ⇒ {
124         aquarium.akkaService.getOrCreateUserActor(rcEvent.userID) ! ProcessResourceEvent(rcEvent)
125       }
126     )