a31281ea0dda3646b181fad934108193ee3055f6
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.env.Env
39 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40 import com.ckkloverdos.props.Props
41 import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
42 import java.io.File
43 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
44 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
45 import com.ckkloverdos.convert.Converters
46 import java.util.concurrent.atomic.AtomicBoolean
47 import org.slf4j.{LoggerFactory, Logger}
48 import gr.grnet.aquarium.computation.UserStateComputations
49 import com.ckkloverdos.maybe._
50 import gr.grnet.aquarium.ResourceLocator._
51 import com.ckkloverdos.sys.SysProp
52 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
53 import gr.grnet.aquarium.policy.{PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
54
55 /**
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  */
59
60 final class Aquarium(env: Env) extends Lifecycle with Loggable {
61   import Aquarium.EnvKeys
62
63   private[this] val _isStopping = new AtomicBoolean(false)
64
65   override def toString = "%s/v%s".format(getClass.getName, version)
66
67   def isStopping() = _isStopping.get()
68
69   @inline
70   def getClientLogger(client: AnyRef): Logger = {
71     client match {
72       case null ⇒
73         this.logger
74
75       case _ ⇒
76         LoggerFactory.getLogger(client.getClass)
77     }
78   }
79
80   def debug(client: AnyRef, fmt: String, args: Any*) = {
81     getClientLogger(client).debug(fmt.format(args: _*))
82   }
83
84   def info(client: AnyRef, fmt: String, args: Any*) = {
85     getClientLogger(client).info(fmt.format(args: _*))
86   }
87
88   def warn(client: AnyRef, fmt: String, args: Any*) = {
89     getClientLogger(client).warn(fmt.format(args: _*))
90   }
91
92   @throws(classOf[AquariumInternalError])
93   def apply[T: Manifest](key: TypedKey[T]): T = {
94     try {
95      env.getEx(key)
96     } catch {
97       case e: Exception ⇒
98         throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
99     }
100   }
101
102   private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
103
104   private[this] def startServices(): Unit = {
105     for(service ← _allServices) {
106       logStartingF(service.toString) {
107         service.start()
108       } {}
109     }
110   }
111
112   private[this] def stopServices(): Unit = {
113     val services = _allServices.reverse
114
115     for(service ← services) {
116       logStoppingF(service.toString) {
117         safeUnit(service.stop())
118       } {}
119     }
120   }
121
122   private[this] def showBasicConfiguration(): Unit = {
123     logger.info("Aquarium Home = %s".format(
124       if(Homes.Folders.AquariumHome.isAbsolute)
125         Homes.Folders.AquariumHome
126       else
127         "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
128     ))
129
130     for(folder ← this.eventsStoreFolder) {
131       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
132     }
133     this.eventsStoreFolder.throwMe // on error
134
135     for(prop ← Aquarium.PropsToShow) {
136       logger.info("{} = {}", prop.name, prop.rawValue)
137     }
138
139     logger.info("CONF_HERE =  {}", HERE)
140     logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
141     logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
142     logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
143
144     logger.info("Runtime.getRuntime.availableProcessors() => {}", Runtime.getRuntime.availableProcessors())
145   }
146
147   private[this] def addShutdownHooks(): Unit = {
148     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
149       def run = {
150         if(!_isStopping.get()) {
151           logStoppingF("Aquarium") {
152             stop()
153           } {}
154         }
155       }
156     }))
157   }
158
159   def start(): Unit = {
160     this._isStopping.set(false)
161     showBasicConfiguration()
162     addShutdownHooks()
163     startServices()
164   }
165
166   def stop(): Unit = {
167     this._isStopping.set(true)
168     stopServices()
169   }
170
171   /**
172    * Stops Aquarium after the given millis. Used during testing.
173    */
174   def stopAfterMillis(millis: Long) {
175     Thread sleep millis
176     stop()
177   }
178
179   /**
180    * Reflectively provide a new instance of a class and configure it appropriately.
181    */
182   def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
183     val originalProps = apply(EnvKeys.originalProps)
184
185     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
186     instanceM match {
187       case Just(instance) ⇒
188 //        eventBus.addSubscriber[C](instance)
189         instance match {
190           case aquariumAware: AquariumAware ⇒
191             aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
192
193           case _ ⇒
194         }
195
196         instance match {
197           case configurable: Configurable if (originalProps ne null) ⇒
198             val localProps = configurable.propertyPrefix match {
199               case somePrefix @ Some(prefix) ⇒
200                 if(prefix.length == 0) {
201                   logger.warn(
202                     "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
203                 }
204
205                 originalProps.subsetForKeyPrefix(prefix)
206
207               case None ⇒
208                 originalProps
209             }
210
211             logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
212             MaybeEither(configurable configure localProps) match {
213               case Just(_) ⇒
214                 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
215
216               case Failed(e) ⇒
217                 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
218             }
219
220           case _ ⇒
221         }
222
223         instance
224
225       case Failed(e) ⇒
226         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
227     }
228
229   }
230
231   def currentResourceTypesMap: Map[String, ResourceType] = {
232     // FIXME: Implement
233     Map()
234   }
235
236   def initialUserAgreementForRole(role: String, referenceTimeMillis: Long): UserAgreementModel = {
237     // FIXME: Where is the mapping?
238     StdUserAgreement("", None, Timespan(0L), defaultInitialUserRole, PolicyDefinedFullPriceTableRef)
239   }
240
241   def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
242     // FIXME: Where is the mapping?
243     10000.0
244   }
245
246   def defaultInitialUserRole: String = {
247     // FIXME: Read from properties?
248     "default"
249   }
250
251   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
252
253   def resourceEventStore = apply(EnvKeys.resourceEventStore)
254
255   def imEventStore = apply(EnvKeys.imEventStore)
256
257   def userStateStore = apply(EnvKeys.userStateStore)
258
259   def policyStore = apply(EnvKeys.policyStore)
260
261   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
262
263   def eventBus = apply(EnvKeys.eventBus)
264
265   def userStateComputations = apply(EnvKeys.userStateComputations)
266
267   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
268
269   def adminCookie = apply(EnvKeys.adminCookie)
270
271   def converters = apply(EnvKeys.converters)
272
273   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
274
275   def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
276
277   def timerService = apply(EnvKeys.timerService)
278
279   def restPort = apply(EnvKeys.restPort)
280
281   def akkaService = apply(EnvKeys.akkaService)
282
283   def version = apply(EnvKeys.version)
284 }
285
286 object Aquarium {
287   final val PropsToShow = List(
288     SysProp.JavaVMName,
289     SysProp.JavaVersion,
290     SysProp.JavaHome,
291     SysProp.JavaClassVersion,
292     SysProp.JavaLibraryPath,
293     SysProp.JavaClassPath,
294     SysProp.JavaIOTmpDir,
295     SysProp.UserName,
296     SysProp.UserHome,
297     SysProp.UserDir,
298     SysProp.FileEncoding
299   )
300
301   object HTTP {
302    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
303    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
304  }
305
306   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
307     override def toString = name
308   }
309
310   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
311     EnvKeys.timerService,
312     EnvKeys.akkaService,
313     EnvKeys.eventBus,
314     EnvKeys.restService,
315     EnvKeys.rabbitMQService,
316     EnvKeys.storeWatcherService
317   )
318
319   object EnvKeys {
320     /**
321      * The Aquarium version. Will be reported in any due occasion.
322      */
323     final val version = StringKey("version")
324
325     final val originalProps: TypedKey[Props] =
326       new AquariumEnvKey[Props]("originalProps")
327
328     /**
329      * The fully qualified name of the class that implements the `StoreProvider`.
330      * Will be instantiated reflectively and should have a public default constructor.
331      */
332     final val storeProvider: TypedKey[StoreProvider] =
333       new AquariumEnvKey[StoreProvider]("store.provider.class")
334
335     /**
336      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
337      * saved.
338      *
339      * This is for debugging purposes.
340      */
341     final val eventsStoreFolder: TypedKey[Option[File]] =
342       new AquariumEnvKey[Option[File]]("events.store.folder")
343
344     /**
345      * If this is `true` and `events.store.folder` is defined, then all resource events are
346      * also stored in `events.store.folder`.
347      *
348      * This is for debugging purposes.
349      */
350
351     final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
352
353     /**
354      * If this is `true` and `events.store.folder` is defined, then all IM events are
355      * also stored in `events.store.folder`.
356      *
357      * This is for debugging purposes.
358      */
359     final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
360
361     /**
362      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
363      *
364      * The smaller the value, the more accurate the user credits and other state parts are.
365      *
366      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
367      * the timestamp of the last known balance amount by this value, then a re-computation for
368      * the balance is triggered.
369      */
370     final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
371
372     /**
373      * REST service listening port.
374      */
375     final val restPort = IntKey("rest.port")
376
377     final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
378
379     /**
380      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
381      * an authorised client.
382      */
383     final val adminCookie: TypedKey[Option[String]] =
384       new AquariumEnvKey[Option[String]]("admin.cookie")
385
386     final val resourceEventStore: TypedKey[ResourceEventStore] =
387       new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
388
389     final val imEventStore: TypedKey[IMEventStore] =
390       new AquariumEnvKey[IMEventStore]("im.event.store.class")
391
392     final val userStateStore: TypedKey[UserStateStore] =
393       new AquariumEnvKey[UserStateStore]("user.state.store.class")
394
395     final val policyStore: TypedKey[PolicyStore] =
396       new AquariumEnvKey[PolicyStore]("policy.store.class")
397
398     /**
399      * The class that initializes the REST service
400      */
401     final val restService: TypedKey[Lifecycle] =
402       new AquariumEnvKey[Lifecycle]("rest.service.class")
403
404     final val akkaService: TypedKey[AkkaService] =
405       new AquariumEnvKey[AkkaService]("akka.service")
406
407     final val eventBus: TypedKey[EventBusService] =
408       new AquariumEnvKey[EventBusService]("event.bus.service")
409
410     final val timerService: TypedKey[TimerService] =
411       new AquariumEnvKey[TimerService]("timer.service")
412
413     final val rabbitMQService: TypedKey[RabbitMQService] =
414       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
415
416     final val storeWatcherService: TypedKey[StoreWatcherService] =
417       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
418
419     final val converters: TypedKey[Converters] =
420       new AquariumEnvKey[Converters]("converters")
421
422     final val userStateComputations: TypedKey[UserStateComputations] =
423       new AquariumEnvKey[UserStateComputations]("user.state.computations")
424
425     final val defaultClassLoader: TypedKey[ClassLoader] =
426       new AquariumEnvKey[ClassLoader]("default.class.loader")
427
428   }
429 }