Fix initialization order error for DefaultContext
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.convert.Converters
39 import com.ckkloverdos.env.Env
40 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
41 import com.ckkloverdos.maybe._
42 import com.ckkloverdos.props.Props
43 import com.ckkloverdos.sys.SysProp
44 import connector.rabbitmq.RabbitMQProducer
45 import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
46 import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg}
47 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers}
48 import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType}
49 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
50 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
51 import gr.grnet.aquarium.store.StoreProvider
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
54 import java.io.File
55 import java.util.concurrent.atomic.AtomicBoolean
56 import org.slf4j.{LoggerFactory, Logger}
57 import java.util.{Map ⇒ JMap}
58 import java.util.{HashMap ⇒ JHashMap}
59
60 /**
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>
63  */
64
65 final class Aquarium(env: Env) extends Lifecycle with Loggable {
66
67   import Aquarium.EnvKeys
68
69   @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
70
71   // Caching value for the latest resource mapping
72   @volatile private[this] var _resourceMapping = apply(EnvKeys.defaultPolicyMsg).getResourceMapping
73
74   private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
75     apply(EnvKeys.defaultPolicyMsg),
76     apply(EnvKeys.storeProvider).policyStore
77   )
78
79   private[this] val _isStopping = new AtomicBoolean(false)
80
81   override def toString = "%s/v%s".format(getClass.getName, version)
82
83   def isStopping() = _isStopping.get()
84
85   @inline
86   def getClientLogger(client: AnyRef): Logger = {
87     client match {
88       case null ⇒
89         this.logger
90
91       case _ ⇒
92         LoggerFactory.getLogger(client.getClass)
93     }
94   }
95
96   def debug(client: AnyRef, fmt: String, args: Any*) = {
97     getClientLogger(client).debug(fmt.format(args: _*))
98   }
99
100   def info(client: AnyRef, fmt: String, args: Any*) = {
101     getClientLogger(client).info(fmt.format(args: _*))
102   }
103
104   def warn(client: AnyRef, fmt: String, args: Any*) = {
105     getClientLogger(client).warn(fmt.format(args: _*))
106   }
107
108   @throws(classOf[AquariumInternalError])
109   def apply[T: Manifest](key: TypedKey[T]): T = {
110     try {
111      env.getEx(key)
112     } catch {
113       case e: Exception ⇒
114         throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
115     }
116   }
117
118   private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_))
119
120   private[this] def startServices(): Unit = {
121     for(service ← _allServices) {
122       logStartingF(service.toString) {
123         service.start()
124       } {}
125     }
126   }
127
128   private[this] def stopServices(): Unit = {
129     val services = _allServices.reverse
130
131     for(service ← services) {
132       logStoppingF(service.toString) {
133         safeUnit(service.stop())
134       } {}
135     }
136   }
137
138   private[this] def showBasicConfiguration(): Unit = {
139     for(folder ← this.eventsStoreFolder) {
140       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
141     }
142     this.eventsStoreFolder.throwMe // on error
143
144     logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg))
145   }
146
147   private[this] def addShutdownHooks(): Unit = {
148     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
149       def run = {
150         if(!_isStopping.get()) {
151           logStoppingF("Aquarium") {
152             stop()
153           } {}
154         }
155       }
156     }))
157   }
158
159   def start(): Unit = {
160     this._isStopping.set(false)
161     showBasicConfiguration()
162     addShutdownHooks()
163     startServices()
164   }
165
166   def stop(): Unit = {
167     this._isStopping.set(true)
168     stopServices()
169   }
170
171   /**
172    * Stops Aquarium after the given millis. Used during testing.
173    */
174   def stopAfterMillis(millis: Long) {
175     Thread sleep millis
176     stop()
177   }
178
179   /**
180    * Reflectively provide a new instance of a class and configure it appropriately.
181    */
182   def newInstance[C <: AnyRef](_class: Class[C]): C = {
183     newInstance(_class.getName)
184   }
185
186   /**
187    * Reflectively provide a new instance of a class and configure it appropriately.
188    */
189   def newInstance[C <: AnyRef](className: String): C = {
190     val originalProps = apply(EnvKeys.originalProps)
191
192     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
193     instanceM match {
194       case Just(instance) ⇒
195 //        eventBus.addSubscriber[C](instance)
196         instance match {
197           case aquariumAware: AquariumAware ⇒
198             aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
199
200           case _ ⇒
201         }
202
203         instance match {
204           case configurable: Configurable if (originalProps ne null) ⇒
205             val localProps = configurable.propertyPrefix match {
206               case somePrefix @ Some(prefix) ⇒
207                 if(prefix.length == 0) {
208                   logger.warn(
209                     "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
210                 }
211
212                 originalProps.subsetForKeyPrefix(prefix)
213
214               case None ⇒
215                 originalProps
216             }
217
218             logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
219             MaybeEither(configurable configure localProps) match {
220               case Just(_) ⇒
221                 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
222
223               case Failed(e) ⇒
224                 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
225             }
226
227           case _ ⇒
228         }
229
230         instance
231
232       case Failed(e) ⇒
233         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
234     }
235
236   }
237
238   /**
239    * @deprecated Use `currentResourceMapping` instead
240    */
241   def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = {
242     val policyMspOpt = policyStore.loadPolicyAt(millis)
243     if(policyMspOpt.isEmpty) {
244       throw new AquariumInternalError(
245         "Cannot get resource mapping. Not even the default policy found for time %s",
246         TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
247       )
248     }
249
250     val policyMsg = policyMspOpt.get
251     policyMsg.getResourceMapping
252   }
253
254   /**
255    * Provides the current resource mapping. This value is cached.
256    *
257    * NOTE: The assumption is that the resource mapping is always updated with new keys,
258    *       that is we allow only the addition of new resource types.
259    */
260   def currentResourceMapping = {
261     this._resourceMapping synchronized this._resourceMapping
262   }
263
264   //  def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = {
265 //    val policyMspOpt = policyStore.loadPolicyAt(millis)
266 //    if(policyMspOpt.isEmpty) {
267 //      throw new AquariumInternalError(
268 //        "Cannot get resource types map. Not even the default policy found for time %s",
269 //        TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
270 //      )
271 //    }
272 //
273 //    val policyMsg = policyMspOpt.get
274 //    // TODO optimize
275 //    ModelFactory.newPolicyModel(policyMsg).resourceTypesMap
276 //  }
277 //
278 //  def currentResourceTypesMap: Map[String, ResourceType] = {
279 //    resourceTypesMapAtMillis(TimeHelpers.nowMillis())
280 //  }
281
282   def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = {
283     policyStore.loadPolicyAt(referenceTimeMillis) match {
284       case None ⇒
285         throw new AquariumInternalError(
286           "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
287         )
288
289       case Some(policyMsg) ⇒
290         ModelFactory.newPolicyModel(policyMsg)
291     }
292   }
293
294   def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg = {
295     unsafeValidPolicyModelAt(referenceTimeMillis).msg
296   }
297
298   def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = {
299     val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis)
300
301     policyModelAtReferenceTime.roleMapping.get(role) match {
302       case None ⇒
303         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
304           role,
305           TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
306         ))
307
308       case Some(fullPriceTable) ⇒
309         fullPriceTable
310     }
311   }
312
313   def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = {
314     val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
315     policyAtReferenceTime.getRoleMapping.get(role) match {
316       case null ⇒
317         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
318           role,
319           TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
320         ))
321
322       case fullPriceTable ⇒
323         fullPriceTable
324     }
325   }
326
327   def unsafeFullPriceTableModelForAgreement(
328       userAgreementModel: UserAgreementModel,
329       knownPolicyModel: PolicyModel
330   ): FullPriceTableModel = {
331     val policyModel = knownPolicyModel match {
332       case null ⇒
333         unsafeValidPolicyModelAt(userAgreementModel.validFromMillis)
334
335       case policyModel ⇒
336         policyModel
337     }
338
339     userAgreementModel.fullPriceTableRef match {
340       case PolicyDefinedFullPriceTableRef ⇒
341         val role = userAgreementModel.role
342         policyModel.roleMapping.get(role) match {
343           case None ⇒
344             throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
345               role,
346               userAgreementModel.userID,
347               TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis)
348             )
349
350           case Some(fullPriceTable) ⇒
351             fullPriceTable
352         }
353
354       case AdHocFullPriceTableRef(fullPriceTable) ⇒
355         fullPriceTable
356     }
357   }
358
359   def unsafeFullPriceTableForAgreement(
360       userAgreement: UserAgreementMsg,
361       knownPolicyModel: PolicyModel
362   ): FullPriceTableMsg = {
363
364     val policyModel = knownPolicyModel match {
365       case null ⇒
366         unsafeValidPolicyModelAt(userAgreement.getValidFromMillis)
367
368       case policyModel ⇒
369         policyModel
370     }
371
372     unsafeFullPriceTableForAgreement(userAgreement, policyModel.msg)
373   }
374
375   def unsafeFullPriceTableForAgreement(
376      userAgreement: UserAgreementMsg,
377      knownPolicy: PolicyMsg
378   ): FullPriceTableMsg = {
379     val policy = knownPolicy match {
380       case null ⇒
381         unsafeValidPolicyAt(userAgreement.getValidFromMillis)
382
383       case policy ⇒
384         policy
385     }
386
387     val role = userAgreement.getRole
388     userAgreement.getFullPriceTableRef match {
389       case null ⇒
390         policy.getRoleMapping.get(role) match {
391           case null ⇒
392             throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
393               role,
394               userAgreement.getUserID,
395               TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis)
396             )
397
398           case fullPriceTable ⇒
399             fullPriceTable
400         }
401
402       case fullPriceTable ⇒
403         fullPriceTable
404     }
405  }
406
407   /**
408    * Computes the initial user agreement for the given role and reference time. Also,
409    * records the ID from a potential related IMEvent.
410    *
411    * @param imEvent       The IMEvent that creates the user
412    */
413   def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
414     require(MessageHelpers.isIMEventCreate(imEvent))
415
416     val role = imEvent.getRole
417     val referenceTimeMillis = imEvent.getOccurredMillis
418
419     // Just checking
420     assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis))
421
422     ModelFactory.newUserAgreementModelFromIMEvent(imEvent)
423   }
424
425   def initialUserBalance(role: String, referenceTimeMillis: Long): Real = {
426     // FIXME: Where is the mapping?
427     Real.Zero
428   }
429
430   def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = {
431     // A resource type never changes charging behavior. By definition.
432     val className = resourceType.getChargingBehaviorClass
433     _chargingBehaviorMap.get(className) match {
434       case Some(chargingBehavior) ⇒
435         chargingBehavior
436
437       case _ ⇒
438         try {
439           _chargingBehaviorMap synchronized {
440             val chargingBehavior = newInstance[ChargingBehavior](className)
441             _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
442             chargingBehavior
443           }
444         }
445         catch {
446           case e: Exception ⇒
447             throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
448         }
449     }
450   }
451
452   def defaultPolicyMsg = apply(EnvKeys.defaultPolicyMsg)
453
454   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
455
456   def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
457
458   def imEventStore = apply(EnvKeys.storeProvider).imEventStore
459
460   def userStateStore = apply(EnvKeys.storeProvider).userStateStore
461
462   def policyStore = this.cachingPolicyStore
463
464   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
465
466   def eventBus = apply(EnvKeys.eventBus)
467
468   def chargingService = apply(EnvKeys.chargingService)
469
470   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
471
472   def adminCookie = apply(EnvKeys.adminCookie)
473
474   def converters = apply(EnvKeys.converters)
475
476   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
477
478   def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
479
480   def timerService = apply(EnvKeys.timerService)
481
482   def restPort = apply(EnvKeys.restPort)
483
484   def akkaService = apply(EnvKeys.akkaService)
485
486   def version = apply(EnvKeys.version)
487 }
488
489 object Aquarium {
490   final val PropsToShow = List(
491     SysProp.JavaVMName,
492     SysProp.JavaVersion,
493     SysProp.JavaHome,
494     SysProp.JavaClassVersion,
495     SysProp.JavaLibraryPath,
496     SysProp.JavaClassPath,
497     SysProp.JavaIOTmpDir,
498     SysProp.UserName,
499     SysProp.UserHome,
500     SysProp.UserDir,
501     SysProp.FileEncoding
502   )
503
504   object HTTP {
505    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
506    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
507  }
508
509   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
510     override def toString = "%s(%s)".format(manifest[T], name)
511   }
512
513   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
514     EnvKeys.timerService,
515     EnvKeys.akkaService,
516     EnvKeys.eventBus,
517     EnvKeys.restService,
518     EnvKeys.rabbitMQService,
519     EnvKeys.storeWatcherService,
520     EnvKeys.rabbitMQProducer
521   )
522
523   object EnvKeys {
524     /**
525      * The Aquarium version. Will be reported in any due occasion.
526      */
527     final val version = StringKey("version")
528
529     final val originalProps: TypedKey[Props] =
530       new AquariumEnvKey[Props]("originalProps")
531
532     /**
533      * The fully qualified name of the class that implements the `StoreProvider`.
534      * Will be instantiated reflectively and should have a public default constructor.
535      */
536     final val storeProvider: TypedKey[StoreProvider] =
537       new AquariumEnvKey[StoreProvider]("store.provider.class")
538
539     /**
540      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
541      * saved.
542      *
543      * This is for debugging purposes.
544      */
545     final val eventsStoreFolder: TypedKey[Option[File]] =
546       new AquariumEnvKey[Option[File]]("events.store.folder")
547
548     /**
549      * If this is `true` and `events.store.folder` is defined, then all resource events are
550      * also stored in `events.store.folder`.
551      *
552      * This is for debugging purposes.
553      */
554
555     final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
556
557     /**
558      * If this is `true` and `events.store.folder` is defined, then all IM events are
559      * also stored in `events.store.folder`.
560      *
561      * This is for debugging purposes.
562      */
563     final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
564
565     /**
566      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
567      *
568      * The smaller the value, the more accurate the user credits and other state parts are.
569      *
570      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
571      * the timestamp of the last known balance amount by this value, then a re-computation for
572      * the balance is triggered.
573      */
574     final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
575
576     /**
577      * REST service listening port.
578      */
579     final val restPort = IntKey("rest.port")
580
581     final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
582
583     /**
584      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
585      * an authorised client.
586      */
587     final val adminCookie: TypedKey[Option[String]] =
588       new AquariumEnvKey[Option[String]]("admin.cookie")
589
590     /**
591      * The class that initializes the REST service
592      */
593     final val restService: TypedKey[Lifecycle] =
594       new AquariumEnvKey[Lifecycle]("rest.service.class")
595
596     final val akkaService: TypedKey[AkkaService] =
597       new AquariumEnvKey[AkkaService]("akka.service")
598
599     final val eventBus: TypedKey[EventBusService] =
600       new AquariumEnvKey[EventBusService]("event.bus.service")
601
602     final val timerService: TypedKey[TimerService] =
603       new AquariumEnvKey[TimerService]("timer.service")
604
605     final val rabbitMQService: TypedKey[RabbitMQService] =
606       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
607
608     final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
609       new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
610
611     final val storeWatcherService: TypedKey[StoreWatcherService] =
612       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
613
614     final val converters: TypedKey[Converters] =
615       new AquariumEnvKey[Converters]("converters")
616
617     final val chargingService: TypedKey[ChargingService] =
618       new AquariumEnvKey[ChargingService]("charging.service")
619
620     final val defaultClassLoader: TypedKey[ClassLoader] =
621       new AquariumEnvKey[ClassLoader]("default.class.loader")
622
623     final val defaultPolicyMsg: TypedKey[PolicyMsg] =
624       new AquariumEnvKey[PolicyMsg]("default.policy.msg")
625   }
626 }