7c404d4e8fbdea618f42df6240b6b3edab5d9cf0
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.convert.Converters
39 import com.ckkloverdos.env.Env
40 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
41 import com.ckkloverdos.maybe._
42 import com.ckkloverdos.props.Props
43 import com.ckkloverdos.sys.SysProp
44 import connector.rabbitmq.RabbitMQProducer
45 import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
46 import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg}
47 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers}
48 import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType}
49 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
50 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
51 import gr.grnet.aquarium.store.StoreProvider
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
54 import java.io.File
55 import java.util.concurrent.atomic.AtomicBoolean
56 import org.slf4j.{LoggerFactory, Logger}
57 import gr.grnet.aquarium.event.CreditsModel
58 import gr.grnet.aquarium.charging.state.UserStateBootstrap
59 import java.util.{Map ⇒ JMap}
60
61 /**
62  *
63  * @author Christos KK Loverdos <loverdos@gmail.com>
64  */
65
66 final class Aquarium(env: Env) extends Lifecycle with Loggable {
67
68   import Aquarium.EnvKeys
69
70   @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
71
72   private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
73     apply(EnvKeys.defaultPolicyMsg),
74     apply(EnvKeys.storeProvider).policyStore
75   )
76
77   private[this] val _isStopping = new AtomicBoolean(false)
78
79   override def toString = "%s/v%s".format(getClass.getName, version)
80
81   def isStopping() = _isStopping.get()
82
83   @inline
84   def getClientLogger(client: AnyRef): Logger = {
85     client match {
86       case null ⇒
87         this.logger
88
89       case _ ⇒
90         LoggerFactory.getLogger(client.getClass)
91     }
92   }
93
94   def debug(client: AnyRef, fmt: String, args: Any*) = {
95     getClientLogger(client).debug(fmt.format(args: _*))
96   }
97
98   def info(client: AnyRef, fmt: String, args: Any*) = {
99     getClientLogger(client).info(fmt.format(args: _*))
100   }
101
102   def warn(client: AnyRef, fmt: String, args: Any*) = {
103     getClientLogger(client).warn(fmt.format(args: _*))
104   }
105
106   @throws(classOf[AquariumInternalError])
107   def apply[T: Manifest](key: TypedKey[T]): T = {
108     try {
109      env.getEx(key)
110     } catch {
111       case e: Exception ⇒
112         throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
113     }
114   }
115
116   private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_))
117
118   private[this] def startServices(): Unit = {
119     for(service ← _allServices) {
120       logStartingF(service.toString) {
121         service.start()
122       } {}
123     }
124   }
125
126   private[this] def stopServices(): Unit = {
127     val services = _allServices.reverse
128
129     for(service ← services) {
130       logStoppingF(service.toString) {
131         safeUnit(service.stop())
132       } {}
133     }
134   }
135
136   private[this] def showBasicConfiguration(): Unit = {
137     for(folder ← this.eventsStoreFolder) {
138       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
139     }
140     this.eventsStoreFolder.throwMe // on error
141
142     logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg))
143   }
144
145   private[this] def addShutdownHooks(): Unit = {
146     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
147       def run = {
148         if(!_isStopping.get()) {
149           logStoppingF("Aquarium") {
150             stop()
151           } {}
152         }
153       }
154     }))
155   }
156
157   def start(): Unit = {
158     this._isStopping.set(false)
159     showBasicConfiguration()
160     addShutdownHooks()
161     startServices()
162   }
163
164   def stop(): Unit = {
165     this._isStopping.set(true)
166     stopServices()
167   }
168
169   /**
170    * Stops Aquarium after the given millis. Used during testing.
171    */
172   def stopAfterMillis(millis: Long) {
173     Thread sleep millis
174     stop()
175   }
176
177   /**
178    * Reflectively provide a new instance of a class and configure it appropriately.
179    */
180   def newInstance[C <: AnyRef](_class: Class[C]): C = {
181     newInstance(_class.getName)
182   }
183
184   /**
185    * Reflectively provide a new instance of a class and configure it appropriately.
186    */
187   def newInstance[C <: AnyRef](className: String): C = {
188     val originalProps = apply(EnvKeys.originalProps)
189
190     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
191     instanceM match {
192       case Just(instance) ⇒
193 //        eventBus.addSubscriber[C](instance)
194         instance match {
195           case aquariumAware: AquariumAware ⇒
196             aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
197
198           case _ ⇒
199         }
200
201         instance match {
202           case configurable: Configurable if (originalProps ne null) ⇒
203             val localProps = configurable.propertyPrefix match {
204               case somePrefix @ Some(prefix) ⇒
205                 if(prefix.length == 0) {
206                   logger.warn(
207                     "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
208                 }
209
210                 originalProps.subsetForKeyPrefix(prefix)
211
212               case None ⇒
213                 originalProps
214             }
215
216             logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
217             MaybeEither(configurable configure localProps) match {
218               case Just(_) ⇒
219                 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
220
221               case Failed(e) ⇒
222                 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
223             }
224
225           case _ ⇒
226         }
227
228         instance
229
230       case Failed(e) ⇒
231         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
232     }
233
234   }
235
236   def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = {
237     val policyMspOpt = policyStore.loadPolicyAt(millis)
238     if(policyMspOpt.isEmpty) {
239       throw new AquariumInternalError(
240         "Cannot get resource mapping. Not even the default policy found for time %s",
241         TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
242       )
243     }
244
245     val policyMsg = policyMspOpt.get
246     policyMsg.getResourceMapping
247   }
248
249 //  def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = {
250 //    val policyMspOpt = policyStore.loadPolicyAt(millis)
251 //    if(policyMspOpt.isEmpty) {
252 //      throw new AquariumInternalError(
253 //        "Cannot get resource types map. Not even the default policy found for time %s",
254 //        TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
255 //      )
256 //    }
257 //
258 //    val policyMsg = policyMspOpt.get
259 //    // TODO optimize
260 //    ModelFactory.newPolicyModel(policyMsg).resourceTypesMap
261 //  }
262 //
263 //  def currentResourceTypesMap: Map[String, ResourceType] = {
264 //    resourceTypesMapAtMillis(TimeHelpers.nowMillis())
265 //  }
266
267   def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = {
268     policyStore.loadPolicyAt(referenceTimeMillis) match {
269       case None ⇒
270         throw new AquariumInternalError(
271           "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
272         )
273
274       case Some(policyMsg) ⇒
275         ModelFactory.newPolicyModel(policyMsg)
276     }
277   }
278
279   def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg = {
280     unsafeValidPolicyModelAt(referenceTimeMillis).msg
281   }
282
283   def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = {
284     val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis)
285
286     policyModelAtReferenceTime.roleMapping.get(role) match {
287       case None ⇒
288         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
289           role,
290           TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
291         ))
292
293       case Some(fullPriceTable) ⇒
294         fullPriceTable
295     }
296   }
297
298   def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = {
299     val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
300     policyAtReferenceTime.getRoleMapping.get(role) match {
301       case null ⇒
302         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
303           role,
304           TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
305         ))
306
307       case fullPriceTable ⇒
308         fullPriceTable
309     }
310   }
311
312   def unsafeFullPriceTableModelForAgreement(
313       userAgreementModel: UserAgreementModel,
314       knownPolicyModel: PolicyModel
315   ): FullPriceTableModel = {
316     val policyModel = knownPolicyModel match {
317       case null ⇒
318         unsafeValidPolicyModelAt(userAgreementModel.validFromMillis)
319
320       case policyModel ⇒
321         policyModel
322     }
323
324     userAgreementModel.fullPriceTableRef match {
325       case PolicyDefinedFullPriceTableRef ⇒
326         val role = userAgreementModel.role
327         policyModel.roleMapping.get(role) match {
328           case None ⇒
329             throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
330               role,
331               userAgreementModel.userID,
332               TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis)
333             )
334
335           case Some(fullPriceTable) ⇒
336             fullPriceTable
337         }
338
339       case AdHocFullPriceTableRef(fullPriceTable) ⇒
340         fullPriceTable
341     }
342   }
343
344   def unsafeFullPriceTableForAgreement(
345       userAgreement: UserAgreementMsg,
346       knownPolicyModel: PolicyModel
347   ): FullPriceTableMsg = {
348
349     val policyModel = knownPolicyModel match {
350       case null ⇒
351         unsafeValidPolicyModelAt(userAgreement.getValidFromMillis)
352
353       case policyModel ⇒
354         policyModel
355     }
356
357     unsafeFullPriceTableForAgreement(userAgreement, policyModel.msg)
358   }
359
360   def unsafeFullPriceTableForAgreement(
361      userAgreement: UserAgreementMsg,
362      knownPolicy: PolicyMsg
363   ): FullPriceTableMsg = {
364     val policy = knownPolicy match {
365       case null ⇒
366         unsafeValidPolicyAt(userAgreement.getValidFromMillis)
367
368       case policy ⇒
369         policy
370     }
371
372     val role = userAgreement.getRole
373     userAgreement.getFullPriceTableRef match {
374       case null ⇒
375         policy.getRoleMapping.get(role) match {
376           case null ⇒
377             throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
378               role,
379               userAgreement.getUserID,
380               TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis)
381             )
382
383           case fullPriceTable ⇒
384             fullPriceTable
385         }
386
387       case fullPriceTable ⇒
388         fullPriceTable
389     }
390  }
391
392   /**
393    * Computes the initial user agreement for the given role and reference time. Also,
394    * records the ID from a potential related IMEvent.
395    *
396    * @param imEvent       The IMEvent that creates the user
397    */
398   def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
399     require(MessageHelpers.isIMEventCreate(imEvent))
400
401     val role = imEvent.getRole
402     val referenceTimeMillis = imEvent.getOccurredMillis
403
404     // Just checking
405     assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis))
406
407     ModelFactory.newUserAgreementModelFromIMEvent(imEvent)
408   }
409
410   def initialUserBalance(role: String, referenceTimeMillis: Long): CreditsModel.Type = {
411     // FIXME: Where is the mapping?
412     CreditsModel.from(0.0)
413   }
414
415   def getUserStateBootstrap(imEvent: IMEventMsg): UserStateBootstrap = {
416     UserStateBootstrap(
417       this.initialUserAgreement(imEvent),
418       this.initialUserBalance(imEvent.getRole, imEvent.getOccurredMillis)
419     )
420   }
421
422   def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = {
423     // A resource type never changes charging behavior. By definition.
424     val className = resourceType.getChargingBehaviorClass
425     _chargingBehaviorMap.get(className) match {
426       case Some(chargingBehavior) ⇒
427         chargingBehavior
428
429       case _ ⇒
430         try {
431           _chargingBehaviorMap synchronized {
432             val chargingBehavior = newInstance[ChargingBehavior](className)
433             _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
434             chargingBehavior
435           }
436         }
437         catch {
438           case e: Exception ⇒
439             throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
440         }
441     }
442   }
443
444   def defaultPolicyMsg = apply(EnvKeys.defaultPolicyMsg)
445
446   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
447
448   def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
449
450   def imEventStore = apply(EnvKeys.storeProvider).imEventStore
451
452   def userStateStore = apply(EnvKeys.storeProvider).userStateStore
453
454   def policyStore = this.cachingPolicyStore
455
456   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
457
458   def eventBus = apply(EnvKeys.eventBus)
459
460   def chargingService = apply(EnvKeys.chargingService)
461
462   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
463
464   def adminCookie = apply(EnvKeys.adminCookie)
465
466   def converters = apply(EnvKeys.converters)
467
468   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
469
470   def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
471
472   def timerService = apply(EnvKeys.timerService)
473
474   def restPort = apply(EnvKeys.restPort)
475
476   def akkaService = apply(EnvKeys.akkaService)
477
478   def version = apply(EnvKeys.version)
479 }
480
481 object Aquarium {
482   final val PropsToShow = List(
483     SysProp.JavaVMName,
484     SysProp.JavaVersion,
485     SysProp.JavaHome,
486     SysProp.JavaClassVersion,
487     SysProp.JavaLibraryPath,
488     SysProp.JavaClassPath,
489     SysProp.JavaIOTmpDir,
490     SysProp.UserName,
491     SysProp.UserHome,
492     SysProp.UserDir,
493     SysProp.FileEncoding
494   )
495
496   object HTTP {
497    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
498    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
499  }
500
501   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
502     override def toString = "%s(%s)".format(manifest[T], name)
503   }
504
505   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
506     EnvKeys.timerService,
507     EnvKeys.akkaService,
508     EnvKeys.eventBus,
509     EnvKeys.restService,
510     EnvKeys.rabbitMQService,
511     EnvKeys.storeWatcherService,
512     EnvKeys.rabbitMQProducer
513   )
514
515   object EnvKeys {
516     /**
517      * The Aquarium version. Will be reported in any due occasion.
518      */
519     final val version = StringKey("version")
520
521     final val originalProps: TypedKey[Props] =
522       new AquariumEnvKey[Props]("originalProps")
523
524     /**
525      * The fully qualified name of the class that implements the `StoreProvider`.
526      * Will be instantiated reflectively and should have a public default constructor.
527      */
528     final val storeProvider: TypedKey[StoreProvider] =
529       new AquariumEnvKey[StoreProvider]("store.provider.class")
530
531     /**
532      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
533      * saved.
534      *
535      * This is for debugging purposes.
536      */
537     final val eventsStoreFolder: TypedKey[Option[File]] =
538       new AquariumEnvKey[Option[File]]("events.store.folder")
539
540     /**
541      * If this is `true` and `events.store.folder` is defined, then all resource events are
542      * also stored in `events.store.folder`.
543      *
544      * This is for debugging purposes.
545      */
546
547     final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
548
549     /**
550      * If this is `true` and `events.store.folder` is defined, then all IM events are
551      * also stored in `events.store.folder`.
552      *
553      * This is for debugging purposes.
554      */
555     final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
556
557     /**
558      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
559      *
560      * The smaller the value, the more accurate the user credits and other state parts are.
561      *
562      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
563      * the timestamp of the last known balance amount by this value, then a re-computation for
564      * the balance is triggered.
565      */
566     final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
567
568     /**
569      * REST service listening port.
570      */
571     final val restPort = IntKey("rest.port")
572
573     final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
574
575     /**
576      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
577      * an authorised client.
578      */
579     final val adminCookie: TypedKey[Option[String]] =
580       new AquariumEnvKey[Option[String]]("admin.cookie")
581
582     /**
583      * The class that initializes the REST service
584      */
585     final val restService: TypedKey[Lifecycle] =
586       new AquariumEnvKey[Lifecycle]("rest.service.class")
587
588     final val akkaService: TypedKey[AkkaService] =
589       new AquariumEnvKey[AkkaService]("akka.service")
590
591     final val eventBus: TypedKey[EventBusService] =
592       new AquariumEnvKey[EventBusService]("event.bus.service")
593
594     final val timerService: TypedKey[TimerService] =
595       new AquariumEnvKey[TimerService]("timer.service")
596
597     final val rabbitMQService: TypedKey[RabbitMQService] =
598       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
599
600     final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
601       new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
602
603     final val storeWatcherService: TypedKey[StoreWatcherService] =
604       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
605
606     final val converters: TypedKey[Converters] =
607       new AquariumEnvKey[Converters]("converters")
608
609     final val chargingService: TypedKey[ChargingService] =
610       new AquariumEnvKey[ChargingService]("charging.service")
611
612     final val defaultClassLoader: TypedKey[ClassLoader] =
613       new AquariumEnvKey[ClassLoader]("default.class.loader")
614
615     final val defaultPolicyMsg: TypedKey[PolicyMsg] =
616       new AquariumEnvKey[PolicyMsg]("default.policy.msg")
617   }
618 }