2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
38 import com.ckkloverdos.env.Env
39 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40 import com.ckkloverdos.props.Props
41 import connector.rabbitmq.RabbitMQProducer
42 import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
44 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
45 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
46 import com.ckkloverdos.convert.Converters
47 import java.util.concurrent.atomic.AtomicBoolean
48 import org.slf4j.{LoggerFactory, Logger}
49 import com.ckkloverdos.maybe._
50 import com.ckkloverdos.sys.SysProp
51 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
52 import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
53 import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
54 import gr.grnet.aquarium.util.date.TimeHelpers
58 * @author Christos KK Loverdos <loverdos@gmail.com>
// Main object of an Aquarium instance, constructed around an `Env`.
// The members below obtain configured values and services by resolving typed keys via `apply`.
// NOTE(review): how `env` is consulted is not visible in this view - confirm in the full file.
61 final class Aquarium(env: Env) extends Lifecycle with Loggable {
// Make the typed environment keys available as `EnvKeys.xyz` inside the class.
62 import Aquarium.EnvKeys
// Charging behavior instances keyed by class name; looked up and replaced in `chargingBehaviorOf`.
64 @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
// Policy store handed out by `policyStore`. Built on first use from the default policy model
// and from the policy store of the configured store provider.
66 private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
67 apply(EnvKeys.defaultPolicyModel),
68 apply(EnvKeys.storeProvider).policyStore
// Stopping flag: read by `isStopping()` and by the shutdown hook, cleared in `start()`.
71 private[this] val _isStopping = new AtomicBoolean(false)
/** Renders this instance as its runtime class name, followed by `/v` and the configured version. */
override def toString: String = {
  val runtimeClassName = getClass.getName
  "%s/v%s".format(runtimeClassName, version)
}
/** Reports the current value of the stopping flag. */
def isStopping(): Boolean = {
  _isStopping.get()
}
/**
 * Provides the slf4j logger used when logging on behalf of `client`.
 *
 * @param client the object whose runtime class names the logger
 * @return the logger obtained from `LoggerFactory` for that class
 */
78 def getClientLogger(client: AnyRef): Logger = {
// NOTE(review): lines between the signature and this call are not visible here, so there may be
// additional cases (for example a null client) - confirm against the full file.
84 LoggerFactory.getLogger(client.getClass)
/**
 * Logs a message at debug level through the logger obtained for `client`.
 *
 * @param client the object on whose behalf the message is logged
 * @param fmt    the format string, applied to `args` via `String.format`
 * @param args   the values substituted into `fmt`
 */
88 def debug(client: AnyRef, fmt: String, args: Any*) = {
// The message is formatted eagerly, before the logger checks whether the level is enabled.
89 getClientLogger(client).debug(fmt.format(args: _*))
/**
 * Logs a message at info level through the logger obtained for `client`.
 *
 * @param client the object on whose behalf the message is logged
 * @param fmt    the format string, applied to `args` via `String.format`
 * @param args   the values substituted into `fmt`
 */
92 def info(client: AnyRef, fmt: String, args: Any*) = {
// The message is formatted eagerly, before the logger checks whether the level is enabled.
93 getClientLogger(client).info(fmt.format(args: _*))
/**
 * Logs a message at warn level through the logger obtained for `client`.
 *
 * @param client the object on whose behalf the message is logged
 * @param fmt    the format string, applied to `args` via `String.format`
 * @param args   the values substituted into `fmt`
 */
96 def warn(client: AnyRef, fmt: String, args: Any*) = {
// The message is formatted eagerly, before the logger checks whether the level is enabled.
97 getClientLogger(client).warn(fmt.format(args: _*))
/**
 * Resolves the value registered for a typed key.
 *
 * @param key the typed key to resolve
 * @tparam T  the type of the value bound to the key
 * @return the value found for `key`
 * @throws AquariumInternalError when the key cannot be located
 */
100 @throws(classOf[AquariumInternalError])
101 def apply[T: Manifest](key: TypedKey[T]): T = {
// NOTE(review): the lookup itself is not visible in this view; presumably it consults `env`
// and reaches the throw below only when nothing is found - confirm against the full file.
106 throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
// Services resolved from the keys in Aquarium.ServiceKeys, in key order; computed on first use.
private[this] lazy val _allServices = for(serviceKey ← Aquarium.ServiceKeys) yield this.apply(serviceKey)
/**
 * Walks over `_allServices` in list order, handling each service inside a `logStartingF`
 * block labelled with the `toString` of the service.
 * NOTE(review): the statements inside that block are not visible in this view - presumably
 * the service is started there.
 */
112 private[this] def startServices(): Unit = {
113 for(service ← _allServices) {
114 logStartingF(service.toString) {
/**
 * Stops the services of `_allServices`, visiting them in reverse list order.
 * Each stop call runs inside a `logStoppingF` block labelled with the `toString` of the service.
 */
120 private[this] def stopServices(): Unit = {
// Reverse order relative to the order used by `startServices`.
121 val services = _allServices.reverse
123 for(service ← services) {
124 logStoppingF(service.toString) {
// NOTE(review): `safeUnit` is defined elsewhere; presumably it keeps a failing stop from
// aborting the loop - confirm.
125 safeUnit(service.stop())
/**
 * Logs basic configuration at info level: the events store folder, when a value is present,
 * and the default policy model rendered as JSON.
 */
130 private[this] def showBasicConfiguration(): Unit = {
// Runs only when `eventsStoreFolder` holds a value.
131 for(folder ← this.eventsStoreFolder) {
132 logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
// Per the inline remark, this rethrows when obtaining the folder ended in an error.
134 this.eventsStoreFolder.throwMe // on error
136 logger.info("default policy = {}", defaultPolicyModel.toJsonString)
/**
 * Registers a JVM shutdown hook. When the hook runs and the stopping flag is not already set,
 * it enters a `logStoppingF` block labelled Aquarium.
 * NOTE(review): the body of that block is not visible in this view - presumably it stops
 * Aquarium.
 */
139 private[this] def addShutdownHooks(): Unit = {
140 Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
// Skip the work when a stop is already in progress.
142 if(!_isStopping.get()) {
143 logStoppingF("Aquarium") {
/**
 * Starts Aquarium: clears the stopping flag and logs the basic configuration.
 * NOTE(review): any further start-up steps are not visible in this view.
 */
151 def start(): Unit = {
152 this._isStopping.set(false)
153 showBasicConfiguration()
159 this._isStopping.set(true)
164 * Stops Aquarium after the given millis. Used during testing.
// `millis` is the delay, in milliseconds, after which Aquarium is stopped.
// NOTE(review): this declaration uses procedure syntax (no `=` and no result type), which newer
// Scala versions deprecate; `: Unit =` is the current spelling. The body is not visible here.
166 def stopAfterMillis(millis: Long) {
172 * Reflectively provide a new instance of a class and configure it appropriately.
// Convenience overload: forwards the name of `_class` to the String-based `newInstance`.
174 def newInstance[C <: AnyRef](_class: Class[C]): C = {
175 newInstance(_class.getName)
179 * Reflectively provide a new instance of a class and configure it appropriately.
// Loads `className` through the default class loader, instantiates it reflectively and wires it
// up: AquariumAware instances are told about this Aquarium, Configurable instances receive their
// properties. Failures surface as AquariumInternalError (see the messages below).
// NOTE(review): the cast to C is unchecked, and Class.newInstance() is deprecated since Java 9.
181 def newInstance[C <: AnyRef](className: String): C = {
182 val originalProps = apply(EnvKeys.originalProps)
// The outcome is wrapped in MaybeEither and matched on below - presumably so that a failure is
// captured here instead of being thrown directly.
184 val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
186 case Just(instance) ⇒
187 // eventBus.addSubscriber[C](instance)
189 case aquariumAware: AquariumAware ⇒
// Hand this Aquarium to the new instance.
190 aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
// Configuration is attempted only when the original props are available.
196 case configurable: Configurable if (originalProps ne null) ⇒
197 val localProps = configurable.propertyPrefix match {
198 case somePrefix @ Some(prefix) ⇒
// An empty prefix is flagged with a hint to use None.
199 if(prefix.length == 0) {
201 "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
// Presumably narrows the props to the keys that start with the prefix - confirm in Props.
204 originalProps.subsetForKeyPrefix(prefix)
210 logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
211 MaybeEither(configurable configure localProps) match {
213 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
216 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
225 throw new AquariumInternalError("Could not instantiate %s".format(className), e)
/**
 * Provides the resource types map of the policy that is valid at the current time.
 *
 * @return the resource types of that policy, keyed by String
 * @throws AquariumInternalError when no valid policy is found for the current time
 */
230 def currentResourceTypesMap: Map[String, ResourceType] = {
// Ask the policy store for the policy valid right now.
231 val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
232 if(policyOpt.isEmpty) {
233 throw new AquariumInternalError("Not even the default policy found")
// Safe at this point: the empty case has thrown above.
236 policyOpt.get.resourceTypesMap
/**
 * Provides the policy that is valid at the given time and fails when there is none.
 *
 * @param referenceTimeMillis the time of interest, in milliseconds
 * @return the policy model valid at that time
 * @throws AquariumInternalError when no policy is found for that time
 */
239 def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
240 policyStore.loadValidPolicyAt(referenceTimeMillis) match {
242 throw new AquariumInternalError(
// The reference time is rendered through TimeHelpers for the error message.
243 "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
/**
 * Provides the full price table that the policy valid at the given time maps to `role`.
 *
 * @param role                the role to look up in the role mapping of the policy
 * @param referenceTimeMillis the time of interest, in milliseconds
 * @return the full price table mapped to the role
 * @throws AquariumInternalError when no policy is valid at that time or the role is not mapped
 */
251 def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
// Throws when no policy is valid at the reference time.
252 val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
253 policyAtReferenceTime.roleMapping.get(role) match {
255 throw new AquariumInternalError("Unknown price table for role %s at %s".format(
257 TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
260 case Some(fullPriceTable) ⇒
266 * Computes the initial user agreement for the given role and reference time. Also,
267 * records the ID from a potential related IMEvent.
269 * @param role The role in the agreement
270 * @param referenceTimeMillis The reference time to consider for the agreement
// `relatedIMEventID` is the ID of a related IMEvent, when there is one.
272 def initialUserAgreement(
274 referenceTimeMillis: Long,
275 relatedIMEventID: Option[String]
276 ): UserAgreementModel = {
// The lookup throws when no price table is known for the role at the reference time.
// NOTE(review): Predef.assert is elidable, so with assertions disabled at compile time the
// lookup is not performed at all - confirm that this check may be skipped.
279 assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
282 "<StandardUserAgreement>",
// Presumably refers to the price table defined by the policy instead of embedding one - confirm.
287 PolicyDefinedFullPriceTableRef()
/**
 * Provides the initial balance for a user with the given role at the given time.
 * NOTE(review): the computation is not visible in this view; the FIXME below suggests that a
 * role-based mapping is still missing - confirm what is actually returned.
 *
 * @param role                the role of the user
 * @param referenceTimeMillis the time of interest, in milliseconds
 */
291 def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
292 // FIXME: Where is the mapping?
/**
 * Provides the charging behavior of a resource type, instantiating it when it is not yet known.
 * Instances are kept in `_chargingBehaviorMap` under the class name given by the resource type.
 *
 * @param resourceType the resource type whose charging behavior is needed
 * @return the charging behavior instance
 * @throws AquariumInternalError when the charging behavior cannot be loaded
 */
296 def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
297 // A resource type never changes charging behavior. By definition.
298 val className = resourceType.chargingBehavior
299 _chargingBehaviorMap.get(className) match {
300 case Some(chargingBehavior) ⇒
// NOTE(review): the monitor used here is the current map value, and the lines below replace
// that value with a new map. Threads arriving after a replacement lock a different object, so
// this gives no mutual exclusion, and the map is not looked up again inside the block. A fixed
// lock object plus a second lookup would avoid duplicate instantiation - confirm if it matters.
305 _chargingBehaviorMap synchronized {
306 val chargingBehavior = newInstance[ChargingBehavior](className)
307 _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
313 throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
// Accessors that resolve environment keys through `apply`; each one throws
// AquariumInternalError when its key cannot be located.

// The default policy model.
318 def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
// The class loader used by `newInstance` to load classes by name.
320 def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
// The resource event store of the configured store provider.
322 def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
// The IM event store of the configured store provider.
324 def imEventStore = apply(EnvKeys.storeProvider).imEventStore
// The user state store of the configured store provider.
326 def userStateStore = apply(EnvKeys.storeProvider).userStateStore
// The policy store; this is the lazily built `cachingPolicyStore`, not a key lookup.
328 def policyStore = this.cachingPolicyStore
// The optional folder for storing incoming events.
330 def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
// The event bus service.
332 def eventBus = apply(EnvKeys.eventBus)
// The charging service.
334 def chargingService = apply(EnvKeys.chargingService)
// The tolerated staleness of user state, in milliseconds.
336 def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
// The optional cookie expected on administrative REST calls.
338 def adminCookie = apply(EnvKeys.adminCookie)
// The converters instance.
340 def converters = apply(EnvKeys.converters)
// The flag for also saving resource events to the events store folder.
342 def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
// The flag for also saving IM events to the events store folder.
344 def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
// The timer service.
346 def timerService = apply(EnvKeys.timerService)
// The REST service listening port.
348 def restPort = apply(EnvKeys.restPort)
// The Akka service.
350 def akkaService = apply(EnvKeys.akkaService)
// The Aquarium version.
352 def version = apply(EnvKeys.version)
/**
 * A list of Java system properties.
 * NOTE(review): judging by the name these are the properties selected for display; some list
 * entries and the place where the list is consumed are not visible in this view.
 */
356 final val PropsToShow = List(
360 SysProp.JavaClassVersion,
361 SysProp.JavaLibraryPath,
362 SysProp.JavaClassPath,
363 SysProp.JavaIOTmpDir,
/** Name of the header that carries the admin cookie. */
final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"

/**
 * Lower-case form of `RESTAdminHeaderName`.
 * The locale is spelled out explicitly; for this ASCII-only name the result is the same as with
 * the default locale.
 */
final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase(java.util.Locale.ROOT)
/**
 * Typed key used for values of the Aquarium environment.
 *
 * @param name the name of the key
 * @tparam T   the type of the value bound to the key; a manifest is required for it
 */
375 final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
// Rendered as the manifest of T followed by the key name in parentheses.
376 override def toString = "%s(%s)".format(manifest[T], name)
/**
 * Keys of the lifecycle-managed services. The Aquarium class resolves them into `_allServices`,
 * walks them in this order when starting and in reverse order when stopping.
 * NOTE(review): some entries of the list are not visible in this view.
 */
379 final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
380 EnvKeys.timerService,
384 EnvKeys.rabbitMQService,
385 EnvKeys.storeWatcherService
390 * The Aquarium version. Will be reported in any due occasion.
392 final val version = StringKey("version")
/** Key for the original `Props`; `newInstance` reads it to configure `Configurable` instances. */
394 final val originalProps: TypedKey[Props] =
395 new AquariumEnvKey[Props]("originalProps")
398 * The fully qualified name of the class that implements the `StoreProvider`.
399 * Will be instantiated reflectively and should have a public default constructor.
401 final val storeProvider: TypedKey[StoreProvider] =
402 new AquariumEnvKey[StoreProvider]("store.provider.class")
405 * If a value is given to this property, then it represents a folder where all events coming to aquarium are
408 * This is for debugging purposes.
410 final val eventsStoreFolder: TypedKey[Option[File]] =
411 new AquariumEnvKey[Option[File]]("events.store.folder")
414 * If this is `true` and `events.store.folder` is defined, then all resource events are
415 * also stored in `events.store.folder`.
417 * This is for debugging purposes.
420 final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
423 * If this is `true` and `events.store.folder` is defined, then all IM events are
424 * also stored in `events.store.folder`.
426 * This is for debugging purposes.
428 final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
431 * A time period in milliseconds for which we can tolerate stale parts regarding user state.
433 * The smaller the value, the more accurate the user credits and other state parts are.
435 * If a request for user state (e.g. balance) is received and the request timestamp exceeds
436 * the timestamp of the last known balance amount by this value, then a re-computation for
437 * the balance is triggered.
439 final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
442 * REST service listening port.
444 final val restPort = IntKey("rest.port")
/**
 * Key for a timeout, in milliseconds, related to shutting down the REST service.
 * NOTE(review): inferred from the key name; the consumer of this key is not visible in this view.
 */
446 final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
449 * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
450 * an authorised client.
452 final val adminCookie: TypedKey[Option[String]] =
453 new AquariumEnvKey[Option[String]]("admin.cookie")
456 * The class that initializes the REST service
458 final val restService: TypedKey[Lifecycle] =
459 new AquariumEnvKey[Lifecycle]("rest.service.class")
/** Key for the `AkkaService` instance. */
461 final val akkaService: TypedKey[AkkaService] =
462 new AquariumEnvKey[AkkaService]("akka.service")
/** Key for the `EventBusService` instance. */
464 final val eventBus: TypedKey[EventBusService] =
465 new AquariumEnvKey[EventBusService]("event.bus.service")
/** Key for the `TimerService` instance; listed in `ServiceKeys`. */
467 final val timerService: TypedKey[TimerService] =
468 new AquariumEnvKey[TimerService]("timer.service")
/** Key for the `RabbitMQService` instance; listed in `ServiceKeys`. */
470 final val rabbitMQService: TypedKey[RabbitMQService] =
471 new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
/** Key for the `RabbitMQProducer` instance. */
473 final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
474 new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
/** Key for the `StoreWatcherService` instance; listed in `ServiceKeys`. */
476 final val storeWatcherService: TypedKey[StoreWatcherService] =
477 new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
/** Key for the `Converters` instance. */
479 final val converters: TypedKey[Converters] =
480 new AquariumEnvKey[Converters]("converters")
/** Key for the `ChargingService` instance. */
482 final val chargingService: TypedKey[ChargingService] =
483 new AquariumEnvKey[ChargingService]("charging.service")
/** Key for the `ClassLoader` used by `newInstance` for reflective class loading. */
485 final val defaultClassLoader: TypedKey[ClassLoader] =
486 new AquariumEnvKey[ClassLoader]("default.class.loader")
/** Key for the default `PolicyModel`; also used to build the policy store of Aquarium. */
488 final val defaultPolicyModel: TypedKey[PolicyModel] =
489 new AquariumEnvKey[PolicyModel]("default.policy.model")