Now using schema-based messages
[aquarium] / src / main / scala / gr / grnet / aquarium / store / LocalFSEventStore.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium.store
37
38 import gr.grnet.aquarium.Aquarium
39 import gr.grnet.aquarium.message.avro.gen.{IMEventMsg, ResourceEventMsg}
40 import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
41 import gr.grnet.aquarium.util.{Loggable, stringOfStackTrace, makeBytes, UTF_8_Charset}
42 import java.io.{FileOutputStream, File}
43
44 /**
45  * This is used whenever the property `events.store.folder` is setup in aquarium configuration.
46  *
47  * This is mainly a debugging aid. You normally want to disable it in a production environment.
48  *
49  * @author Christos KK Loverdos <loverdos@gmail.com>
50  */
51
52 object LocalFSEventStore extends Loggable {
53   private[this] final val NewLine  = makeBytes("\n", UTF_8_Charset) // super-fluous!
54
55   private[this] def writeToFile(
56       file: File,
57       dataHeader: String,
58       data: Array[Byte],
59       dataFooter: String,
60       appendString: Option[String] = None
61   ): Unit = {
62
63     val out = new FileOutputStream(file)
64
65     out.write(makeBytes(dataHeader, UTF_8_Charset))
66     out.write(data)
67     out.write(makeBytes(dataFooter, UTF_8_Charset))
68
69     appendString match {
70       case Some(s) ⇒
71         out.write(NewLine)
72         out.write(makeBytes(s, UTF_8_Charset))
73       case None ⇒
74     }
75
76     out.flush()
77     out.close()
78
79     logger.debug("Wrote to file {}", file.getCanonicalPath)
80   }
81
82   private[this] def dateTagForFolder(): String = {
83     new MutableDateCalc(TimeHelpers.nowMillis()).toYYYYMMDD
84   }
85
86   private[this] def createResourceEventsFolder(root: File): File = {
87     val folder0 = new File(root, "rc")
88     val folder = new File(folder0, "rc-%s".format(dateTagForFolder()))
89     folder.mkdirs()
90     folder
91   }
92
93   private[this] def createIMEventsFolder(root: File): File = {
94     val folder0 = new File(root, "im")
95     val folder = new File(folder0, "im-%s".format(dateTagForFolder()))
96     folder.mkdirs()
97     folder
98   }
99
100   private[this] def writeJson(
101       tag: String,
102       folder: File,
103       jsonPayload: Array[Byte],
104       occurredString: String,
105       extraName: Option[String],
106       isParsed: Boolean,
107       appendString: Option[String]
108   ): Unit = {
109
110     val file = new File(
111       folder,
112       "%s-%s%s.%s.json".format(
113         tag,
114         occurredString,
115         extraName match {
116           case Some(s) ⇒ "-" + s
117           case None    ⇒ ""
118         },
119         if(isParsed) "p" else "u"
120       ))
121
122     val dataHeader = "// %s bytes of payload\n".format(jsonPayload.length)
123     val dataFooter = "\n" + dataHeader
124
125     writeToFile(
126       file,
127       dataHeader,
128       jsonPayload,
129       dataFooter,
130       appendString)
131   }
132
133   def storeUnparsedResourceEvent(aquarium: Aquarium, initialPayload: Array[Byte], exception: Throwable): Unit = {
134     for(root <- aquarium.eventsStoreFolder) {
135       val occurredMDC = new MutableDateCalc(TimeHelpers.nowMillis())
136       val occurredString = occurredMDC.toFilename_YYYYMMDDHHMMSSSSS
137       val rcEventsFolder = createResourceEventsFolder(root)
138       val trace = stringOfStackTrace(exception)
139
140       writeJson("rc", rcEventsFolder, initialPayload, occurredString, None, false, Some(trace))
141     }
142   }
143
144   def storeResourceEvent(aquarium: Aquarium, event: ResourceEventMsg, initialPayload: Array[Byte]): Unit = {
145     if(!aquarium.saveResourceEventsToEventsStoreFolder) {
146       return
147     }
148
149     require(event ne null, "Resource event must be not null")
150
151     for(root <- aquarium.eventsStoreFolder) {
152       val occurredMDC = new MutableDateCalc(event.getOccurredMillis)
153       val occurredString = occurredMDC.toFilename_YYYYMMDDHHMMSSSSS
154       val rcEventsFolder = createResourceEventsFolder(root)
155
156       // Store parsed file
157       writeJson(
158         "rc",
159         rcEventsFolder,
160         initialPayload,
161         occurredString,
162         Some("[%s]-[%s]-[%s]-[%s]".format(
163           event.getOriginalID,
164           event.getUserID,
165           event.getResource,
166           event.getInstanceID)),
167         true,
168         None
169       )
170     }
171   }
172
173   def storeUnparsedIMEvent(aquarium: Aquarium, initialPayload: Array[Byte], exception: Throwable): Unit = {
174     for(root <- aquarium.eventsStoreFolder) {
175       val occurredMDC = new MutableDateCalc(TimeHelpers.nowMillis())
176       val occurredString = occurredMDC.toFilename_YYYYMMDDHHMMSSSSS
177       val imEventsFolder = createIMEventsFolder(root)
178       val trace = stringOfStackTrace(exception)
179
180       writeJson("im", imEventsFolder, initialPayload, occurredString, None, false, Some(trace))
181     }
182   }
183
184   def storeIMEvent(aquarium: Aquarium, event: IMEventMsg, initialPayload: Array[Byte]): Unit = {
185     if(!aquarium.saveIMEventsToEventsStoreFolder) {
186       return
187     }
188
189     require(event ne null, "IM event must be not null")
190
191     for(root <- aquarium.eventsStoreFolder) {
192       val occurredMDC = new MutableDateCalc(event.getOccurredMillis)
193       val occurredString = occurredMDC.toFilename_YYYYMMDDHHMMSSSSS
194       val imEventsFolder = createIMEventsFolder(root)
195
196       writeJson(
197         "im",
198         imEventsFolder,
199         initialPayload,
200         occurredString,
201         Some("[%s]-[%s]".format(event.getOriginalID, event.getUserID)),
202         true,
203         None
204       )
205     }
206   }
207 }