2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
38 import com.ckkloverdos.env.Env
39 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40 import com.ckkloverdos.props.Props
41 import gr.grnet.aquarium.store.{PolicyStore, UserStateStore, IMEventStore, ResourceEventStore, StoreProvider}
43 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
44 import gr.grnet.aquarium.service.{RoleableActorProviderService, StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
45 import com.ckkloverdos.convert.Converters
46 import java.util.concurrent.atomic.AtomicBoolean
47 import org.slf4j.{LoggerFactory, Logger}
48 import gr.grnet.aquarium.logic.accounting.algorithm.CostPolicyAlgorithmCompiler
49 import gr.grnet.aquarium.computation.UserStateComputations
50 import com.ckkloverdos.maybe._
51 import gr.grnet.aquarium.ResourceLocator._
52 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
53 import gr.grnet.aquarium.logic.accounting.Policy
54 import com.ckkloverdos.sys.SysProp
58 * @author Christos KK Loverdos <loverdos@gmail.com>
61 final class Aquarium(env: Env) extends Lifecycle with Loggable {
62 import Aquarium.EnvKeys
64 private[this] val _isStopping = new AtomicBoolean(false)
66 override def toString = "%s/v%s".format(getClass.getName, version)
68 def isStopping() = _isStopping.get()
71 def getClientLogger(client: AnyRef): Logger = {
77 LoggerFactory.getLogger(client.getClass)
81 def debug(client: AnyRef, fmt: String, args: Any*) = {
82 getClientLogger(client).debug(fmt.format(args: _*))
85 def info(client: AnyRef, fmt: String, args: Any*) = {
86 getClientLogger(client).info(fmt.format(args: _*))
89 def warn(client: AnyRef, fmt: String, args: Any*) = {
90 getClientLogger(client).warn(fmt.format(args: _*))
93 @throws(classOf[AquariumInternalError])
94 def apply[T: Manifest](key: TypedKey[T]): T = {
99 throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
103 private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this(_))
105 private[this] def startServices(): Unit = {
106 for(service ← _allServices) {
107 logStartingF(service.toString) {
113 private[this] def stopServices(): Unit = {
114 val services = _allServices.reverse
116 for(service ← services) {
117 logStoppingF(service.toString) {
118 safeUnit(service.stop())
123 private[this] def showBasicConfiguration(): Unit = {
124 logger.info("Aquarium Home = %s".format(
125 if(Homes.Folders.AquariumHome.isAbsolute)
126 Homes.Folders.AquariumHome
128 "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
131 for(folder ← this.eventsStoreFolder) {
132 logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
134 this.eventsStoreFolder.throwMe // on error
136 for(prop ← Aquarium.PropsToShow) {
137 logger.info("{} = {}", prop.name, prop.rawValue)
140 logger.info("CONF_HERE = {}", HERE)
141 logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
142 logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
143 logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
145 logger.info("Runtime.getRuntime.availableProcessors() => {}", Runtime.getRuntime.availableProcessors())
148 private[this] def addShutdownHooks(): Unit = {
149 Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
151 if(!_isStopping.get()) {
152 logStoppingF("Aquarium") {
160 def start(): Unit = {
161 this._isStopping.set(false)
162 showBasicConfiguration()
168 this._isStopping.set(true)
173 * Stops Aquarium after the given millis. Used during testing.
175 def stopAfterMillis(millis: Long) {
181 * Reflectively provide a new instance of a class and configure it appropriately.
183 def newInstance[C <: AnyRef](_class: Class[C], className: String): C = {
184 val originalProps = apply(EnvKeys.originalProps)
186 val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
188 case Just(instance) ⇒
189 eventBus.addSubscriber[C](instance)
192 case configurable: Configurable if (originalProps ne null) ⇒
193 val localProps = configurable.propertyPrefix match {
194 case somePrefix @ Some(prefix) ⇒
195 if(prefix.length == 0) {
197 "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
200 originalProps.subsetForKeyPrefix(prefix)
206 logger.debug("Configuring {} with props", configurable.getClass.getName)
207 MaybeEither(configurable configure localProps) match {
209 logger.info("Configured {} with props", configurable.getClass.getName)
213 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
221 throw new AquariumInternalError("Could not instantiate %s".format(className), e)
226 def currentResourcesMap: DSLResourcesMap = {
227 // FIXME: Get rid of this singleton stuff
228 Policy.policy.resourcesMap
231 def initialAgreementForRole(role: String, referenceTimeMillis: Long): String = {
232 // FIXME: Where is the mapping?
236 def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
237 // FIXME: Where is the mapping?
241 def defaultInitialUserRole: String = {
242 // FIXME: Read from properties?
246 def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
248 def resourceEventStore = apply(EnvKeys.resourceEventStore)
250 def imEventStore = apply(EnvKeys.imEventStore)
252 def userStateStore = apply(EnvKeys.userStateStore)
254 def policyStore = apply(EnvKeys.policyStore)
256 def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
258 def algorithmCompiler = apply(EnvKeys.algorithmCompiler)
260 def eventBus = apply(EnvKeys.eventBus)
262 def userStateComputations = apply(EnvKeys.userStateComputations)
264 def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
266 def adminCookie = apply(EnvKeys.adminCookie)
268 def converters = apply(EnvKeys.converters)
270 // def actorProvider = apply(EnvKeys.actorProvider)
272 def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
274 def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
276 def timerService = apply(EnvKeys.timerService)
278 def restPort = apply(EnvKeys.restPort)
280 def akkaService = apply(EnvKeys.akkaService)
282 def version = apply(EnvKeys.version)
286 final val PropsToShow = List(
290 SysProp.JavaClassVersion,
291 SysProp.JavaLibraryPath,
292 SysProp.JavaClassPath,
293 SysProp.JavaIOTmpDir,
301 final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
302 final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
305 final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
306 override def toString = name
309 final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
310 EnvKeys.timerService,
314 EnvKeys.rabbitMQService,
315 EnvKeys.storeWatcherService
320 * The Aquarium version. Will be reported in any due occasion.
322 final val version = StringKey("version")
324 final val originalProps: TypedKey[Props] =
325 new AquariumEnvKey[Props]("originalProps")
328 * The fully qualified name of the class that implements the `StoreProvider`.
329 * Will be instantiated reflectively and should have a public default constructor.
331 final val storeProvider: TypedKey[StoreProvider] =
332 new AquariumEnvKey[StoreProvider]("store.provider.class")
335 * If a value is given to this property, then it represents a folder where all events coming to aquarium are
338 * This is for debugging purposes.
340 final val eventsStoreFolder: TypedKey[Option[File]] =
341 new AquariumEnvKey[Option[File]]("events.store.folder")
344 * If this is `true` and `events.store.folder` is defined, then all resource events are
345 * also stored in `events.store.folder`.
347 * This is for debugging purposes.
350 final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
353 * If this is `true` and `events.store.folder` is defined, then all IM events are
354 * also stored in `events.store.folder`.
356 * This is for debugging purposes.
358 final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
361 * A time period in milliseconds for which we can tolerate stale parts regarding user state.
363 * The smaller the value, the more accurate the user credits and other state parts are.
365 * If a request for user state (e.g. balance) is received and the request timestamp exceeds
366 * the timestamp of the last known balance amount by this value, then a re-computation for
367 * the balance is triggered.
369 final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
372 * REST service listening port.
376 final val restPort = IntKey("rest.port")
378 final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
381 * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
382 * an authorised client.
384 final val adminCookie: TypedKey[Option[String]] =
385 new AquariumEnvKey[Option[String]]("admin.cookie")
387 final val resourceEventStore: TypedKey[ResourceEventStore] =
388 new AquariumEnvKey[ResourceEventStore]("resource.event.store.class")
390 final val imEventStore: TypedKey[IMEventStore] =
391 new AquariumEnvKey[IMEventStore]("im.event.store.class")
393 final val userStateStore: TypedKey[UserStateStore] =
394 new AquariumEnvKey[UserStateStore]("user.state.store.class")
396 final val policyStore: TypedKey[PolicyStore] =
397 new AquariumEnvKey[PolicyStore]("policy.store.class")
400 * The class that initializes the REST service
402 final val restService: TypedKey[Lifecycle] =
403 new AquariumEnvKey[Lifecycle]("rest.service.class")
406 * The fully qualified name of the class that implements the `RoleableActorProviderService`.
407 * Will be instantiated reflectively and should have a public default constructor.
409 // final val actorProvider: TypedKey[RoleableActorProviderService] =
410 // new AquariumEnvKey[RoleableActorProviderService]("actor.provider.class")
412 final val akkaService: TypedKey[AkkaService] =
413 new AquariumEnvKey[AkkaService]("akka.service")
415 final val eventBus: TypedKey[EventBusService] =
416 new AquariumEnvKey[EventBusService]("event.bus.service")
418 final val timerService: TypedKey[TimerService] =
419 new AquariumEnvKey[TimerService]("timer.service")
421 final val rabbitMQService: TypedKey[RabbitMQService] =
422 new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
424 final val storeWatcherService: TypedKey[StoreWatcherService] =
425 new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
427 final val converters: TypedKey[Converters] =
428 new AquariumEnvKey[Converters]("converters")
430 final val algorithmCompiler: TypedKey[CostPolicyAlgorithmCompiler] =
431 new AquariumEnvKey[CostPolicyAlgorithmCompiler]("algorithm.compiler")
433 final val userStateComputations: TypedKey[UserStateComputations] =
434 new AquariumEnvKey[UserStateComputations]("user.state.computations")
436 final val defaultClassLoader: TypedKey[ClassLoader] =
437 new AquariumEnvKey[ClassLoader]("default.class.loader")