4c335f27dc2a65e2c6762b0fb64ca69763745dc4
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.env.Env
39 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40 import com.ckkloverdos.props.Props
41 import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
42 import java.io.File
43 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
44 import gr.grnet.aquarium.service.{RoleableActorProviderService, StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
45 import com.ckkloverdos.convert.Converters
46 import java.util.concurrent.atomic.AtomicBoolean
47 import org.slf4j.{LoggerFactory, Logger}
48 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
49 import gr.grnet.aquarium.computation.UserStateComputations
50 import com.ckkloverdos.maybe._
51 import gr.grnet.aquarium.ResourceLocator._
52 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
53 import gr.grnet.aquarium.logic.accounting.Policy
54 import com.ckkloverdos.sys.SysProp
55
56 /**
57  *
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  */
60
61 final class Aquarium(env: Env) extends Lifecycle with Loggable {
62   import Aquarium.EnvKeys
63
64   private[this] val _isStopping = new AtomicBoolean(false)
65
66   override def toString = "%s/v%s".format(getClass.getName, version)
67
68   def isStopping() = _isStopping.get()
69
70   @inline
71   def getClientLogger(client: AnyRef): Logger = {
72     client match {
73       case null ⇒
74         this.logger
75
76       case _ ⇒
77         LoggerFactory.getLogger(client.getClass)
78     }
79   }
80
81   def debug(client: AnyRef, fmt: String, args: Any*) = {
82     getClientLogger(client).debug(fmt.format(args: _*))
83   }
84
85   def info(client: AnyRef, fmt: String, args: Any*) = {
86     getClientLogger(client).info(fmt.format(args: _*))
87   }
88
89   def warn(client: AnyRef, fmt: String, args: Any*) = {
90     getClientLogger(client).warn(fmt.format(args: _*))
91   }
92
93   @throws(classOf[AquariumInternalError])
94   def apply[T: Manifest](key: TypedKey[T]): T = {
95     try {
96      env.getEx(key)
97     } catch {
98       case e: Exception ⇒
99         throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
100     }
101   }
102
103   private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
104
105   private[this] def startServices(): Unit = {
106     for(service ← _allServices) {
107       logStartingF(service.toString) {
108         service.start()
109       } {}
110     }
111   }
112
113   private[this] def stopServices(): Unit = {
114     val services = _allServices.reverse
115
116     for(service ← services) {
117       logStoppingF(service.toString) {
118         safeUnit(service.stop())
119       } {}
120     }
121   }
122
123   private[this] def showBasicConfiguration(): Unit = {
124     logger.info("Aquarium Home = %s".format(
125       if(Homes.Folders.AquariumHome.isAbsolute)
126         Homes.Folders.AquariumHome
127       else
128         "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
129     ))
130
131     for(folder ← this.eventsStoreFolder) {
132       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
133     }
134     this.eventsStoreFolder.throwMe // on error
135
136     for(prop ← Aquarium.PropsToShow) {
137       logger.info("{} = {}", prop.name, prop.rawValue)
138     }
139
140     logger.info("CONF_HERE =  {}", HERE)
141     logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
142     logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
143     logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
144
145     logger.info("Runtime.getRuntime.availableProcessors() => {}", Runtime.getRuntime.availableProcessors())
146   }
147
148   private[this] def addShutdownHooks(): Unit = {
149     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
150       def run = {
151         if(!_isStopping.get()) {
152           logStoppingF("Aquarium") {
153             stop()
154           } {}
155         }
156       }
157     }))
158   }
159
160   def start(): Unit = {
161     this._isStopping.set(false)
162     showBasicConfiguration()
163     addShutdownHooks()
164     startServices()
165   }
166
167   def stop(): Unit = {
168     this._isStopping.set(true)
169     stopServices()
170   }
171
172   /**
173    * Stops Aquarium after the given millis. Used during testing.
174    */
175   def stopAfterMillis(millis: Long) {
176     Thread sleep millis
177     stop()
178   }
179
180   /**
181    * Reflectively provide a new instance of a class and configure it appropriately.
182    */
183   def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
184     val originalProps = apply(EnvKeys.originalProps)
185
186     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
187     instanceM match {
188       case Just(instance) ⇒
189         eventBus.addSubscriber[C](instance)
190
191         instance match {
192           case configurable: Configurable if (originalProps ne null) ⇒
193             val localProps = configurable.propertyPrefix match {
194               case somePrefix @ Some(prefix) ⇒
195                 if(prefix.length == 0) {
196                   logger.warn(
197                     "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
198                 }
199
200                 originalProps.subsetForKeyPrefix(prefix)
201
202               case None ⇒
203                 originalProps
204             }
205
206             logger.debug("Configuring {} with props", configurable.getClass.getName)
207             MaybeEither(configurable configure localProps) match {
208               case Just(_) ⇒
209                 logger.info("Configured {} with props", configurable.getClass.getName)
210                 instance
211
212               case Failed(e) ⇒
213                 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
214             }
215
216           case _ ⇒
217             instance
218         }
219
220       case Failed(e) ⇒
221         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
222     }
223
224   }
225
226   def currentResourcesMap: DSLResourcesMap = {
227     // FIXME: Get rid of this singleton stuff
228     Policy.policy.resourcesMap
229   }
230
231   def initialAgreementForRole(role: String, referenceTimeMillis: Long): String = {
232     // FIXME: Where is the mapping?
233     "default"
234   }
235
236   def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
237     // FIXME: Where is the mapping?
238     10000.0
239   }
240
241   def defaultInitialUserRole: String = {
242     // FIXME: Read from properties?
243     "default"
244   }
245
246   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
247
248   def resourceEventStore = apply(EnvKeys.resourceEventStore)
249
250   def imEventStore = apply(EnvKeys.imEventStore)
251
252   def userStateStore = apply(EnvKeys.userStateStore)
253
254   def policyStore = apply(EnvKeys.policyStore)
255
256   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
257
258   def algorithmCompiler = apply(EnvKeys.algorithmCompiler)
259
260   def eventBus = apply(EnvKeys.eventBus)
261
262   def userStateComputations = apply(EnvKeys.userStateComputations)
263
264   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
265
266   def adminCookie = apply(EnvKeys.adminCookie)
267
268   def converters = apply(EnvKeys.converters)
269
270 //  def actorProvider = apply(EnvKeys.actorProvider)
271
272   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
273
274   def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
275
276   def timerService = apply(EnvKeys.timerService)
277
278   def restPort = apply(EnvKeys.restPort)
279
280   def akkaService = apply(EnvKeys.akkaService)
281
282   def version = apply(EnvKeys.version)
283 }
284
285 object Aquarium {
286   final val PropsToShow = List(
287     SysProp.JavaVMName,
288     SysProp.JavaVersion,
289     SysProp.JavaHome,
290     SysProp.JavaClassVersion,
291     SysProp.JavaLibraryPath,
292     SysProp.JavaClassPath,
293     SysProp.JavaIOTmpDir,
294     SysProp.UserName,
295     SysProp.UserHome,
296     SysProp.UserDir,
297     SysProp.FileEncoding
298   )
299
300   object HTTP {
301    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
302    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
303  }
304
305   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
306     override def toString = name
307   }
308
309   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
310     EnvKeys.timerService,
311     EnvKeys.akkaService,
312     EnvKeys.eventBus,
313     EnvKeys.restService,
314     EnvKeys.rabbitMQService,
315     EnvKeys.storeWatcherService
316   )
317
318   object EnvKeys {
319     /**
320      * The Aquarium version. Will be reported in any due occasion.
321      */
322     final val version = StringKey("version")
323
324     final val originalProps: TypedKey[Props] =
325       new AquariumEnvKey[Props]("originalProps")
326
327     /**
328      * The fully qualified name of the class that implements the `StoreProvider`.
329      * Will be instantiated reflectively and should have a public default constructor.
330      */
331     final val storeProvider: TypedKey[StoreProvider] =
332       new AquariumEnvKey[StoreProvider]("store.provider.class")
333
334     /**
335      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
336      * saved.
337      *
338      * This is for debugging purposes.
339      */
340     final val eventsStoreFolder: TypedKey[Option[File]] =
341       new AquariumEnvKey[Option[File]]("events.store.folder")
342
343     /**
344      * If this is `true` and `events.store.folder` is defined, then all resource events are
345      * also stored in `events.store.folder`.
346      *
347      * This is for debugging purposes.
348      */
349
350     final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
351
352     /**
353      * If this is `true` and `events.store.folder` is defined, then all IM events are
354      * also stored in `events.store.folder`.
355      *
356      * This is for debugging purposes.
357      */
358     final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
359
360     /**
361      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
362      *
363      * The smaller the value, the more accurate the user credits and other state parts are.
364      *
365      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
366      * the timestamp of the last known balance amount by this value, then a re-computation for
367      * the balance is triggered.
368      */
369     final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
370
371     /**
372      * REST service listening port.
373      *
374      * Default is 8080.
375      */
376     final val restPort = IntKey("rest.port")
377
378     final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
379
380     /**
381      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
382      * an authorised client.
383      */
384     final val adminCookie: TypedKey[Option[String]] =
385       new AquariumEnvKey[Option[String]]("admin.cookie")
386
387     final val resourceEventStore: TypedKey[ResourceEventStore] =
388       new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
389
390     final val imEventStore: TypedKey[IMEventStore] =
391       new AquariumEnvKey[IMEventStore]("im.event.store.class")
392
393     final val userStateStore: TypedKey[UserStateStore] =
394       new AquariumEnvKey[UserStateStore]("user.state.store.class")
395
396     final val policyStore: TypedKey[PolicyStore] =
397       new AquariumEnvKey[PolicyStore]("policy.store.class")
398
399     /**
400      * The class that initializes the REST service
401      */
402     final val restService: TypedKey[Lifecycle] =
403       new AquariumEnvKey[Lifecycle]("rest.service.class")
404
405     /**
406      * The fully qualified name of the class that implements the `RoleableActorProviderService`.
407      * Will be instantiated reflectively and should have a public default constructor.
408      */
409 //    final val actorProvider: TypedKey[RoleableActorProviderService] =
410 //      new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
411
412     final val akkaService: TypedKey[AkkaService] =
413       new AquariumEnvKey[AkkaService]("akka.service")
414
415     final val eventBus: TypedKey[EventBusService] =
416       new AquariumEnvKey[EventBusService]("event.bus.service")
417
418     final val timerService: TypedKey[TimerService] =
419       new AquariumEnvKey[TimerService]("timer.service")
420
421     final val rabbitMQService: TypedKey[RabbitMQService] =
422       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
423
424     final val storeWatcherService: TypedKey[StoreWatcherService] =
425       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
426
427     final val converters: TypedKey[Converters] =
428       new AquariumEnvKey[Converters]("converters")
429
430     final val algorithmCompiler: TypedKey[CostPolicyAlgorithmCompiler] =
431       new AquariumEnvKey[CostPolicyAlgorithmCompiler]("algorithm.compiler")
432
433     final val userStateComputations: TypedKey[UserStateComputations] =
434       new AquariumEnvKey[UserStateComputations]("user.state.computations")
435
436     final val defaultClassLoader: TypedKey[ClassLoader] =
437       new AquariumEnvKey[ClassLoader]("default.class.loader")
438
439   }
440 }