2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
38 import com.ckkloverdos.convert.Converters
39 import com.ckkloverdos.env.Env
40 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
41 import com.ckkloverdos.maybe._
42 import com.ckkloverdos.props.Props
43 import com.ckkloverdos.sys.SysProp
44 import connector.rabbitmq.RabbitMQProducer
45 import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
46 import gr.grnet.aquarium.message.avro.gen.{UserAgreementMsg, FullPriceTableMsg, IMEventMsg, ResourceTypeMsg, PolicyMsg}
47 import gr.grnet.aquarium.message.avro.{MessageHelpers, MessageFactory, ModelFactory, AvroHelpers}
48 import gr.grnet.aquarium.policy.{AdHocFullPriceTableRef, FullPriceTableModel, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, UserAgreementModel, ResourceType}
49 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
50 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
51 import gr.grnet.aquarium.store.StoreProvider
52 import gr.grnet.aquarium.util.date.TimeHelpers
53 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
55 import java.util.concurrent.atomic.AtomicBoolean
56 import org.slf4j.{LoggerFactory, Logger}
57 import java.util.{Map ⇒ JMap}
58 import java.util.{HashMap ⇒ JHashMap}
62 * @author Christos KK Loverdos <loverdos@gmail.com>
65 final class Aquarium(env: Env) extends Lifecycle with Loggable {
67 import Aquarium.EnvKeys
69 @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
71 // Caching value for the latest resource mapping
72 @volatile private[this] var _resourceMapping = apply(EnvKeys.defaultPolicyMsg).getResourceMapping
74 private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
75 apply(EnvKeys.defaultPolicyMsg),
76 apply(EnvKeys.storeProvider).policyStore
79 private[this] val _isStopping = new AtomicBoolean(false)
81 override def toString = "%s/v%s".format(getClass.getName, version)
83 def isStopping() = _isStopping.get()
86 def getClientLogger(client: AnyRef): Logger = {
92 LoggerFactory.getLogger(client.getClass)
96 def debug(client: AnyRef, fmt: String, args: Any*) = {
97 getClientLogger(client).debug(fmt.format(args: _*))
100 def info(client: AnyRef, fmt: String, args: Any*) = {
101 getClientLogger(client).info(fmt.format(args: _*))
104 def warn(client: AnyRef, fmt: String, args: Any*) = {
105 getClientLogger(client).warn(fmt.format(args: _*))
108 @throws(classOf[AquariumInternalError])
109 def apply[T: Manifest](key: TypedKey[T]): T = {
114 throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
118 private[this] lazy val _allServices: Seq[_ <: Lifecycle] = Aquarium.ServiceKeys.map(this.apply(_))
120 private[this] def startServices(): Unit = {
121 for(service ← _allServices) {
122 logStartingF(service.toString) {
128 private[this] def stopServices(): Unit = {
129 val services = _allServices.reverse
131 for(service ← services) {
132 logStoppingF(service.toString) {
133 safeUnit(service.stop())
138 private[this] def showBasicConfiguration(): Unit = {
139 for(folder ← this.eventsStoreFolder) {
140 logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
142 this.eventsStoreFolder.throwMe // on error
144 logger.info("default policy = {}", AvroHelpers.jsonStringOfSpecificRecord(defaultPolicyMsg))
147 private[this] def addShutdownHooks(): Unit = {
148 Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
150 if(!_isStopping.get()) {
151 logStoppingF("Aquarium") {
159 def start(): Unit = {
160 this._isStopping.set(false)
161 showBasicConfiguration()
167 this._isStopping.set(true)
172 * Stops Aquarium after the given millis. Used during testing.
174 def stopAfterMillis(millis: Long) {
180 * Reflectively provide a new instance of a class and configure it appropriately.
182 def newInstance[C <: AnyRef](_class: Class[C]): C = {
183 newInstance(_class.getName)
187 * Reflectively provide a new instance of a class and configure it appropriately.
189 def newInstance[C <: AnyRef](className: String): C = {
190 val originalProps = apply(EnvKeys.originalProps)
192 val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
194 case Just(instance) ⇒
195 // eventBus.addSubscriber[C](instance)
197 case aquariumAware: AquariumAware ⇒
198 aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
204 case configurable: Configurable if (originalProps ne null) ⇒
205 val localProps = configurable.propertyPrefix match {
206 case somePrefix @ Some(prefix) ⇒
207 if(prefix.length == 0) {
209 "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
212 originalProps.subsetForKeyPrefix(prefix)
218 logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
219 MaybeEither(configurable configure localProps) match {
221 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
224 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
233 throw new AquariumInternalError("Could not instantiate %s".format(className), e)
239 * @deprecated Use `currentResourceMapping` instead
241 def resourceMappingAtMillis(millis: Long): JMap[String, ResourceTypeMsg] = {
242 val policyMspOpt = policyStore.loadPolicyAt(millis)
243 if(policyMspOpt.isEmpty) {
244 throw new AquariumInternalError(
245 "Cannot get resource mapping. Not even the default policy found for time %s",
246 TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
250 val policyMsg = policyMspOpt.get
251 policyMsg.getResourceMapping
255 * Provides the current resource mapping. This value is cached.
257 * NOTE: The assumption is that the resource mapping is always updated with new keys,
258 * that is we allow only the addition of new resource types.
260 def currentResourceMapping = {
261 this._resourceMapping synchronized this._resourceMapping
264 // def resourceTypesMapAtMillis(millis: Long): Map[String, ResourceType] = {
265 // val policyMspOpt = policyStore.loadPolicyAt(millis)
266 // if(policyMspOpt.isEmpty) {
267 // throw new AquariumInternalError(
268 // "Cannot get resource types map. Not even the default policy found for time %s",
269 // TimeHelpers.toYYYYMMDDHHMMSSSSS(millis)
273 // val policyMsg = policyMspOpt.get
275 // ModelFactory.newPolicyModel(policyMsg).resourceTypesMap
278 // def currentResourceTypesMap: Map[String, ResourceType] = {
279 // resourceTypesMapAtMillis(TimeHelpers.nowMillis())
282 def unsafeValidPolicyModelAt(referenceTimeMillis: Long): PolicyModel = {
283 policyStore.loadPolicyAt(referenceTimeMillis) match {
285 throw new AquariumInternalError(
286 "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
289 case Some(policyMsg) ⇒
290 ModelFactory.newPolicyModel(policyMsg)
294 def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyMsg = {
295 unsafeValidPolicyModelAt(referenceTimeMillis).msg
298 def unsafeFullPriceTableModelForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableModel = {
299 val policyModelAtReferenceTime = unsafeValidPolicyModelAt(referenceTimeMillis)
301 policyModelAtReferenceTime.roleMapping.get(role) match {
303 throw new AquariumInternalError("Unknown price table for role %s at %s".format(
305 TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
308 case Some(fullPriceTable) ⇒
313 def unsafeFullPriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTableMsg = {
314 val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
315 policyAtReferenceTime.getRoleMapping.get(role) match {
317 throw new AquariumInternalError("Unknown price table for role %s at %s".format(
319 TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
322 case fullPriceTable ⇒
327 def unsafeFullPriceTableModelForAgreement(
328 userAgreementModel: UserAgreementModel,
329 knownPolicyModel: PolicyModel
330 ): FullPriceTableModel = {
331 val policyModel = knownPolicyModel match {
333 unsafeValidPolicyModelAt(userAgreementModel.validFromMillis)
339 userAgreementModel.fullPriceTableRef match {
340 case PolicyDefinedFullPriceTableRef ⇒
341 val role = userAgreementModel.role
342 policyModel.roleMapping.get(role) match {
344 throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
346 userAgreementModel.userID,
347 TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreementModel.validFromMillis)
350 case Some(fullPriceTable) ⇒
354 case AdHocFullPriceTableRef(fullPriceTable) ⇒
359 def unsafeFullPriceTableForAgreement(
360 userAgreement: UserAgreementMsg,
361 knownPolicyModel: PolicyModel
362 ): FullPriceTableMsg = {
364 val policyModel = knownPolicyModel match {
366 unsafeValidPolicyModelAt(userAgreement.getValidFromMillis)
372 unsafeFullPriceTableForAgreement(userAgreement, policyModel.msg)
375 def unsafeFullPriceTableForAgreement(
376 userAgreement: UserAgreementMsg,
377 knownPolicy: PolicyMsg
378 ): FullPriceTableMsg = {
379 val policy = knownPolicy match {
381 unsafeValidPolicyAt(userAgreement.getValidFromMillis)
387 val role = userAgreement.getRole
388 userAgreement.getFullPriceTableRef match {
390 policy.getRoleMapping.get(role) match {
392 throw new AquariumInternalError("Unknown role %s while computing full price table for user %s at %s",
394 userAgreement.getUserID,
395 TimeHelpers.toYYYYMMDDHHMMSSSSS(userAgreement.getValidFromMillis)
398 case fullPriceTable ⇒
402 case fullPriceTable ⇒
408 * Computes the initial user agreement for the given role and reference time. Also,
409 * records the ID from a potential related IMEvent.
411 * @param imEvent The IMEvent that creates the user
413 def initialUserAgreement(imEvent: IMEventMsg): UserAgreementModel = {
414 require(MessageHelpers.isIMEventCreate(imEvent))
416 val role = imEvent.getRole
417 val referenceTimeMillis = imEvent.getOccurredMillis
420 assert(null ne unsafeFullPriceTableModelForRoleAt(role, referenceTimeMillis))
422 ModelFactory.newUserAgreementModelFromIMEvent(imEvent)
425 def initialUserBalance(role: String, referenceTimeMillis: Long): Real = {
426 // FIXME: Where is the mapping?
430 def chargingBehaviorOf(resourceType: ResourceTypeMsg): ChargingBehavior = {
431 // A resource type never changes charging behavior. By definition.
432 val className = resourceType.getChargingBehaviorClass
433 _chargingBehaviorMap.get(className) match {
434 case Some(chargingBehavior) ⇒
439 _chargingBehaviorMap synchronized {
440 val chargingBehavior = newInstance[ChargingBehavior](className)
441 _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
447 throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
452 def defaultPolicyMsg = apply(EnvKeys.defaultPolicyMsg)
454 def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
456 def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
458 def imEventStore = apply(EnvKeys.storeProvider).imEventStore
460 def userStateStore = apply(EnvKeys.storeProvider).userStateStore
462 def policyStore = this.cachingPolicyStore
464 def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
466 def eventBus = apply(EnvKeys.eventBus)
468 def chargingService = apply(EnvKeys.chargingService)
470 def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
472 def adminCookie = apply(EnvKeys.adminCookie)
474 def converters = apply(EnvKeys.converters)
476 def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
478 def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
480 def timerService = apply(EnvKeys.timerService)
482 def restPort = apply(EnvKeys.restPort)
484 def akkaService = apply(EnvKeys.akkaService)
486 def version = apply(EnvKeys.version)
490 final val PropsToShow = List(
494 SysProp.JavaClassVersion,
495 SysProp.JavaLibraryPath,
496 SysProp.JavaClassPath,
497 SysProp.JavaIOTmpDir,
505 final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
506 final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
509 final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
510 override def toString = "%s(%s)".format(manifest[T], name)
513 final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
514 EnvKeys.timerService,
518 EnvKeys.rabbitMQService,
519 EnvKeys.storeWatcherService,
520 EnvKeys.rabbitMQProducer
525 * The Aquarium version. Will be reported in any due occasion.
527 final val version = StringKey("version")
529 final val originalProps: TypedKey[Props] =
530 new AquariumEnvKey[Props]("originalProps")
533 * The fully qualified name of the class that implements the `StoreProvider`.
534 * Will be instantiated reflectively and should have a public default constructor.
536 final val storeProvider: TypedKey[StoreProvider] =
537 new AquariumEnvKey[StoreProvider]("store.provider.class")
540 * If a value is given to this property, then it represents a folder where all events coming to aquarium are
543 * This is for debugging purposes.
545 final val eventsStoreFolder: TypedKey[Option[File]] =
546 new AquariumEnvKey[Option[File]]("events.store.folder")
549 * If this is `true` and `events.store.folder` is defined, then all resource events are
550 * also stored in `events.store.folder`.
552 * This is for debugging purposes.
555 final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
558 * If this is `true` and `events.store.folder` is defined, then all IM events are
559 * also stored in `events.store.folder`.
561 * This is for debugging purposes.
563 final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
566 * A time period in milliseconds for which we can tolerate stale parts regarding user state.
568 * The smaller the value, the more accurate the user credits and other state parts are.
570 * If a request for user state (e.g. balance) is received and the request timestamp exceeds
571 * the timestamp of the last known balance amount by this value, then a re-computation for
572 * the balance is triggered.
574 final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
577 * REST service listening port.
579 final val restPort = IntKey("rest.port")
581 final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
584 * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
585 * an authorised client.
587 final val adminCookie: TypedKey[Option[String]] =
588 new AquariumEnvKey[Option[String]]("admin.cookie")
591 * The class that initializes the REST service
593 final val restService: TypedKey[Lifecycle] =
594 new AquariumEnvKey[Lifecycle]("rest.service.class")
596 final val akkaService: TypedKey[AkkaService] =
597 new AquariumEnvKey[AkkaService]("akka.service")
599 final val eventBus: TypedKey[EventBusService] =
600 new AquariumEnvKey[EventBusService]("event.bus.service")
602 final val timerService: TypedKey[TimerService] =
603 new AquariumEnvKey[TimerService]("timer.service")
605 final val rabbitMQService: TypedKey[RabbitMQService] =
606 new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
608 final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
609 new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
611 final val storeWatcherService: TypedKey[StoreWatcherService] =
612 new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
614 final val converters: TypedKey[Converters] =
615 new AquariumEnvKey[Converters]("converters")
617 final val chargingService: TypedKey[ChargingService] =
618 new AquariumEnvKey[ChargingService]("charging.service")
620 final val defaultClassLoader: TypedKey[ClassLoader] =
621 new AquariumEnvKey[ClassLoader]("default.class.loader")
623 final val defaultPolicyMsg: TypedKey[PolicyMsg] =
624 new AquariumEnvKey[PolicyMsg]("default.policy.msg")