e61a7abff0859e7516d7feb42735fc40f1add92f
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.env.Env
39 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40 import com.ckkloverdos.props.Props
41 import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
42 import java.io.File
43 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
44 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
45 import com.ckkloverdos.convert.Converters
46 import java.util.concurrent.atomic.AtomicBoolean
47 import org.slf4j.{LoggerFactory, Logger}
48 import com.ckkloverdos.maybe._
49 import com.ckkloverdos.sys.SysProp
50 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
51 import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
52 import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
53 import gr.grnet.aquarium.util.date.TimeHelpers
54
55 /**
56  *
57  * @author Christos KK Loverdos <loverdos@gmail.com>
58  */
59
60 final class Aquarium(env: Env) extends Lifecycle with Loggable {
61   import Aquarium.EnvKeys
62
63   @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
64
65   private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
66     apply(EnvKeys.defaultPolicyModel),
67     apply(EnvKeys.storeProvider).policyStore
68   )
69
70   private[this] val _isStopping = new AtomicBoolean(false)
71
72   override def toString = "%s/v%s".format(getClass.getName, version)
73
74   def isStopping() = _isStopping.get()
75
76   @inline
77   def getClientLogger(client: AnyRef): Logger = {
78     client match {
79       case null ⇒
80         this.logger
81
82       case _ ⇒
83         LoggerFactory.getLogger(client.getClass)
84     }
85   }
86
87   def debug(client: AnyRef, fmt: String, args: Any*) = {
88     getClientLogger(client).debug(fmt.format(args: _*))
89   }
90
91   def info(client: AnyRef, fmt: String, args: Any*) = {
92     getClientLogger(client).info(fmt.format(args: _*))
93   }
94
95   def warn(client: AnyRef, fmt: String, args: Any*) = {
96     getClientLogger(client).warn(fmt.format(args: _*))
97   }
98
99   @throws(classOf[AquariumInternalError])
100   def apply[T: Manifest](key: TypedKey[T]): T = {
101     try {
102      env.getEx(key)
103     } catch {
104       case e: Exception ⇒
105         throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
106     }
107   }
108
109   private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
110
111   private[this] def startServices(): Unit = {
112     for(service ← _allServices) {
113       logStartingF(service.toString) {
114         service.start()
115       } {}
116     }
117   }
118
119   private[this] def stopServices(): Unit = {
120     val services = _allServices.reverse
121
122     for(service ← services) {
123       logStoppingF(service.toString) {
124         safeUnit(service.stop())
125       } {}
126     }
127   }
128
129   private[this] def showBasicConfiguration(): Unit = {
130     for(folder ← this.eventsStoreFolder) {
131       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
132     }
133     this.eventsStoreFolder.throwMe // on error
134
135     logger.info("default policy = {}", defaultPolicyModel.toJsonString)
136   }
137
138   private[this] def addShutdownHooks(): Unit = {
139     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
140       def run = {
141         if(!_isStopping.get()) {
142           logStoppingF("Aquarium") {
143             stop()
144           } {}
145         }
146       }
147     }))
148   }
149
150   def start(): Unit = {
151     this._isStopping.set(false)
152     showBasicConfiguration()
153     addShutdownHooks()
154     startServices()
155   }
156
157   def stop(): Unit = {
158     this._isStopping.set(true)
159     stopServices()
160   }
161
162   /**
163    * Stops Aquarium after the given millis. Used during testing.
164    */
165   def stopAfterMillis(millis: Long) {
166     Thread sleep millis
167     stop()
168   }
169
170   /**
171    * Reflectively provide a new instance of a class and configure it appropriately.
172    */
173   def newInstance[C <: AnyRef](_class: Class[C]): C = {
174     newInstance(_class.getName)
175   }
176
177   /**
178    * Reflectively provide a new instance of a class and configure it appropriately.
179    */
180   def newInstance[C <: AnyRef](className: String): C = {
181     val originalProps = apply(EnvKeys.originalProps)
182
183     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
184     instanceM match {
185       case Just(instance) ⇒
186 //        eventBus.addSubscriber[C](instance)
187         instance match {
188           case aquariumAware: AquariumAware ⇒
189             aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
190
191           case _ ⇒
192         }
193
194         instance match {
195           case configurable: Configurable if (originalProps ne null) ⇒
196             val localProps = configurable.propertyPrefix match {
197               case somePrefix @ Some(prefix) ⇒
198                 if(prefix.length == 0) {
199                   logger.warn(
200                     "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
201                 }
202
203                 originalProps.subsetForKeyPrefix(prefix)
204
205               case None ⇒
206                 originalProps
207             }
208
209             logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
210             MaybeEither(configurable configure localProps) match {
211               case Just(_) ⇒
212                 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
213
214               case Failed(e) ⇒
215                 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
216             }
217
218           case _ ⇒
219         }
220
221         instance
222
223       case Failed(e) ⇒
224         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
225     }
226
227   }
228
229   def currentResourceTypesMap: Map[String, ResourceType] = {
230     val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
231     if(policyOpt.isEmpty) {
232       throw new AquariumInternalError("Not even the default policy found")
233     }
234
235     policyOpt.get.resourceTypesMap
236   }
237
238   def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
239     policyStore.loadValidPolicyAt(referenceTimeMillis) match {
240       case None ⇒
241         throw new AquariumInternalError(
242           "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
243         )
244
245       case Some(policy) ⇒
246         policy
247     }
248   }
249
250   def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
251     val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
252     policyAtReferenceTime.roleMapping.get(role) match {
253       case None ⇒
254         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
255           role,
256           TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
257         ))
258
259       case Some(fullPriceTable) ⇒
260         fullPriceTable
261     }
262   }
263
264   /**
265    * Computes the initial user agreement for the given role and reference time. Also,
266    * records the ID from a potential related IMEvent.
267    *
268    * @param role                The role in the agreement
269    * @param referenceTimeMillis The reference time to consider for the agreement
270    */
271   def initialUserAgreement(
272       role: String,
273       referenceTimeMillis: Long,
274       relatedIMEventID: Option[String]
275   ): UserAgreementModel = {
276
277     // Just checking
278     assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
279
280     StdUserAgreement(
281       "<StandardUserAgreement>",
282       relatedIMEventID,
283       0,
284       Long.MaxValue,
285       role,
286       PolicyDefinedFullPriceTableRef
287     )
288   }
289
290   def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
291     // FIXME: Where is the mapping?
292     1000.0
293   }
294
295   def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
296     val className = resourceType.chargingBehavior
297     _chargingBehaviorMap.get(className) match {
298       case Some(chargingBehavior) ⇒
299         chargingBehavior
300
301       case _ ⇒
302         // It does not matter if this is entered by multiple threads and more than one instance of the same class
303         // is created. The returned instance is not meant to be cached.
304         try {
305           val chargingBehavior = newInstance[ChargingBehavior](className)
306           _chargingBehaviorMap synchronized {
307             _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
308           }
309
310           chargingBehavior
311         }
312         catch {
313           case e: Exception ⇒
314             throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
315         }
316     }
317   }
318
319   def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
320
321   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
322
323   def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
324
325   def imEventStore = apply(EnvKeys.storeProvider).imEventStore
326
327   def userStateStore = apply(EnvKeys.storeProvider).userStateStore
328
329   def policyStore = this.cachingPolicyStore
330
331   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
332
333   def eventBus = apply(EnvKeys.eventBus)
334
335   def chargingService = apply(EnvKeys.chargingService)
336
337   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
338
339   def adminCookie = apply(EnvKeys.adminCookie)
340
341   def converters = apply(EnvKeys.converters)
342
343   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
344
345   def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
346
347   def timerService = apply(EnvKeys.timerService)
348
349   def restPort = apply(EnvKeys.restPort)
350
351   def akkaService = apply(EnvKeys.akkaService)
352
353   def version = apply(EnvKeys.version)
354 }
355
356 object Aquarium {
357   final val PropsToShow = List(
358     SysProp.JavaVMName,
359     SysProp.JavaVersion,
360     SysProp.JavaHome,
361     SysProp.JavaClassVersion,
362     SysProp.JavaLibraryPath,
363     SysProp.JavaClassPath,
364     SysProp.JavaIOTmpDir,
365     SysProp.UserName,
366     SysProp.UserHome,
367     SysProp.UserDir,
368     SysProp.FileEncoding
369   )
370
371   object HTTP {
372    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
373    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
374  }
375
376   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
377     override def toString = "%s(%s)".format(manifest[T], name)
378   }
379
380   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
381     EnvKeys.timerService,
382     EnvKeys.akkaService,
383     EnvKeys.eventBus,
384     EnvKeys.restService,
385     EnvKeys.rabbitMQService,
386     EnvKeys.storeWatcherService
387   )
388
389   object EnvKeys {
390     /**
391      * The Aquarium version. Will be reported in any due occasion.
392      */
393     final val version = StringKey("version")
394
395     final val originalProps: TypedKey[Props] =
396       new AquariumEnvKey[Props]("originalProps")
397
398     /**
399      * The fully qualified name of the class that implements the `StoreProvider`.
400      * Will be instantiated reflectively and should have a public default constructor.
401      */
402     final val storeProvider: TypedKey[StoreProvider] =
403       new AquariumEnvKey[StoreProvider]("store.provider.class")
404
405     /**
406      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
407      * saved.
408      *
409      * This is for debugging purposes.
410      */
411     final val eventsStoreFolder: TypedKey[Option[File]] =
412       new AquariumEnvKey[Option[File]]("events.store.folder")
413
414     /**
415      * If this is `true` and `events.store.folder` is defined, then all resource events are
416      * also stored in `events.store.folder`.
417      *
418      * This is for debugging purposes.
419      */
420
421     final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
422
423     /**
424      * If this is `true` and `events.store.folder` is defined, then all IM events are
425      * also stored in `events.store.folder`.
426      *
427      * This is for debugging purposes.
428      */
429     final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
430
431     /**
432      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
433      *
434      * The smaller the value, the more accurate the user credits and other state parts are.
435      *
436      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
437      * the timestamp of the last known balance amount by this value, then a re-computation for
438      * the balance is triggered.
439      */
440     final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
441
442     /**
443      * REST service listening port.
444      */
445     final val restPort = IntKey("rest.port")
446
447     final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
448
449     /**
450      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
451      * an authorised client.
452      */
453     final val adminCookie: TypedKey[Option[String]] =
454       new AquariumEnvKey[Option[String]]("admin.cookie")
455
456     /**
457      * The class that initializes the REST service
458      */
459     final val restService: TypedKey[Lifecycle] =
460       new AquariumEnvKey[Lifecycle]("rest.service.class")
461
462     final val akkaService: TypedKey[AkkaService] =
463       new AquariumEnvKey[AkkaService]("akka.service")
464
465     final val eventBus: TypedKey[EventBusService] =
466       new AquariumEnvKey[EventBusService]("event.bus.service")
467
468     final val timerService: TypedKey[TimerService] =
469       new AquariumEnvKey[TimerService]("timer.service")
470
471     final val rabbitMQService: TypedKey[RabbitMQService] =
472       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
473
474     final val storeWatcherService: TypedKey[StoreWatcherService] =
475       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
476
477     final val converters: TypedKey[Converters] =
478       new AquariumEnvKey[Converters]("converters")
479
480     final val chargingService: TypedKey[ChargingService] =
481       new AquariumEnvKey[ChargingService]("charging.service")
482
483     final val defaultClassLoader: TypedKey[ClassLoader] =
484       new AquariumEnvKey[ClassLoader]("default.class.loader")
485
486     final val defaultPolicyModel: TypedKey[PolicyModel] =
487       new AquariumEnvKey[PolicyModel]("default.policy.model")
488   }
489 }