2 * Copyright 2011-2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
40 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
41 import com.ckkloverdos.maybe._
42 import com.ckkloverdos.props.Props
43 import com.ckkloverdos.sys.SysProp
45 import gr.grnet.aquarium.util.{Lifecycle, Loggable, shortNameOfClass, shortClassNameOf}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.service._
48 import gr.grnet.aquarium.converter.StdConverters
49 import java.util.concurrent.atomic.AtomicBoolean
50 import gr.grnet.aquarium.ResourceLocator._
51 import gr.grnet.aquarium.computation.UserStateComputations
52 import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler}
53 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
54 import gr.grnet.aquarium.logic.accounting.Policy
55 import org.slf4j.{Logger, LoggerFactory}
58 * This is the Aquarium entry point.
60 * Responsible for loading all of the application configuration and providing the relevant services.
62 * @author Christos KK Loverdos <loverdos@gmail.com>.
64 final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariumSelf ⇒
// Stop flag for this instance; it is created as `false`.
67 private[this] val _isStopping = new AtomicBoolean(false)
// Returns the current value of the stop flag.
69 def isStopping() = _isStopping.get()
// Returns the SLF4J logger that `LoggerFactory` provides for the runtime class of `client`.
72 def getClientLogger(client: AnyRef): Logger = {
78 LoggerFactory.getLogger(client.getClass)
// Logs at DEBUG level on the logger obtained for `client`.
// The message is built up front with `fmt.format(args: _*)`, so `fmt` takes
// `%s`-style placeholders rather than SLF4J `{}` placeholders.
82 def debug(client: AnyRef, fmt: String, args: Any*) = {
83 getClientLogger(client).debug(fmt.format(args: _*))
// Logs at INFO level on the logger obtained for `client`; `fmt` is a
// `String.format`-style pattern that is expanded with `args` before logging.
86 def info(client: AnyRef, fmt: String, args: Any*) = {
87 getClientLogger(client).info(fmt.format(args: _*))
// Logs at WARN level on the logger obtained for `client`; `fmt` is a
// `String.format`-style pattern that is expanded with `args` before logging.
90 def warn(client: AnyRef, fmt: String, args: Any*) = {
91 getClientLogger(client).warn(fmt.format(args: _*))
95 * Reflectively provide a new instance of a class and configure it appropriately.
97 private[this] def newInstance[C : Manifest](_className: String = ""): C = {
// Decide which class name to load; one branch of this match uses the erased
// runtime class of `C` taken from its Manifest.
98 val className = _className match {
100 manifest[C].erasure.getName
// Load the class through `defaultClassLoader`, call its no-argument `newInstance()`
// and cast the result to `C`, all inside `MaybeEither`.
106 val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
108 case Just(instance) ⇒ instance match {
// A `Configurable` instance is additionally configured with properties.
109 case configurable: Configurable ⇒
110 val localProps = configurable.propertyPrefix match {
// When a prefix is matched, only the keys under that prefix are handed over.
112 props.subsetForKeyPrefix(prefix)
118 logger.debug("Configuring {} with props", configurable.getClass.getName)
119 MaybeEither(configurable configure localProps) match {
121 logger.info("Configured {} with props", configurable.getClass.getName)
// A configuration failure is raised as an internal error that carries `e`.
125 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
// Likewise for a failure to load or instantiate the class.
133 throw new AquariumInternalError("Could not instantiate %s".format(className), e)
// Cost-policy algorithm compiler; fixed to `SimpleCostPolicyAlgorithmCompiler`.
138 private[this] lazy val _algorithmCompiler: CostPolicyAlgorithmCompiler = SimpleCostPolicyAlgorithmCompiler
// User-state computations, constructed with a reference to this Aquarium instance.
140 private[this] lazy val _userStateComputations = new UserStateComputations(aquariumSelf)
// Actor provider, instantiated reflectively from the class named by `Keys.actor_provider_class`.
142 private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
145 * Initializes a store provider, according to the value configured
146 * in the configuration file.
148 private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
// REST service, instantiated reflectively from the class named by `Keys.rest_service_class`
// and typed here only as a `Lifecycle`.
150 private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
// Optional `UserStateStore` override, built only when `Keys.user_state_store_class`
// is present in the properties. `userStateStore` uses the store provider's
// store whenever this does not match a value.
152 private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
153 // If there is a specific `UserStateStore` implementation specified in the
154 // properties, then this implementation overrides the user store given by
156 props.get(Keys.user_state_store_class) map { className ⇒
157 val instance = newInstance[UserStateStore](className)
158 logger.info("Overriding %s provisioning. Implementation given by: %s".format(
159 shortNameOfClass(classOf[UserStateStore]),
// Optional `ResourceEventStore` override, built only when
// `Keys.resource_event_store_class` is present in the properties.
// `resourceEventStore` uses the store provider's store whenever this does not match a value.
165 private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
166 // If there is a specific `ResourceEventStore` implementation specified in the
167 // properties, then this implementation overrides the event store given by
169 props.get(Keys.resource_event_store_class) map { className ⇒
170 val instance = newInstance[ResourceEventStore](className)
171 logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
// Optional `IMEventStore` override, built only when `Keys.user_event_store_class`
// is present in the properties; the chosen implementation class is logged.
176 private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = {
177 props.get(Keys.user_event_store_class) map { className ⇒
178 val instance = newInstance[IMEventStore](className)
179 logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass))
// Optional `PolicyStore` override, built only when `Keys.policy_store_class`
// is present in the properties; the chosen implementation class is logged.
184 private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
185 props.get(Keys.policy_store_class) map {
187 val instance = newInstance[PolicyStore](className)
188 logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
// Optional events-store folder, derived from `Keys.events_store_folder` when that
// property is present.
193 private[this] lazy val _eventsStoreFolder: Maybe[File] = {
194 props.get(Keys.events_store_folder) map {
196 logger.info("{} = {}", Keys.events_store_folder, folderName)
// Canonicalise the configured folder; a relative name is resolved against Aquarium Home.
198 val canonicalFolder = {
199 val folder = new File(folderName)
200 if(folder.isAbsolute) {
201 folder.getCanonicalFile
203 logger.info("{} is not absolute, making it relative to Aquarium Home", Keys.events_store_folder)
204 new File(ResourceLocator.Homes.Folders.AquariumHome, folderName).getCanonicalFile
208 val canonicalPath = canonicalFolder.getCanonicalPath
// An existing path that is not a directory is rejected.
210 if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
211 throw new AquariumInternalError("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
214 // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
215 // we still want to keep the events.
216 val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
// NOTE(review): this is a plain string-prefix test, so a sibling folder whose path merely
// begins with the Aquarium Home path (e.g. `<home>-events`) is rejected too — confirm
// whether a path-component comparison is intended.
217 if(canonicalPath.startsWith(ahCanonicalPath)) {
218 throw new AquariumException(
219 "%s = %s is under Aquarium Home = %s".format(
220 Keys.events_store_folder,
// NOTE(review): the boolean result of `mkdirs()` does not appear to be checked — confirm.
226 canonicalFolder.mkdirs()
// Whether resource events are also saved to the events store folder; `getOr` supplies
// `false` when the boolean property yields no value.
232 private[this] lazy val _events_store_save_rc_events = props.getBoolean(Keys.events_store_save_rc_events).getOr(false)
// Same switch for IM events, again falling back to `false`.
234 private[this] lazy val _events_store_save_im_events = props.getBoolean(Keys.events_store_save_im_events).getOr(false)
// Converters exposed by this instance: `StdConverters.AllConverters`.
236 private[this] lazy val _converters = StdConverters.AllConverters
// The services below are created through `newInstance` with its default (empty)
// class-name argument, i.e. without reading a class name from the properties.
238 private[this] lazy val _timerService: TimerService = newInstance[SimpleTimerService]()
240 private[this] lazy val _akka = newInstance[AkkaService]()
242 private[this] lazy val _eventBus = newInstance[EventBusService]()
244 private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
246 private[this] lazy val _storeWatcherService = newInstance[StoreWatcherService]()
// Services iterated by `startServices()` in list order and by `stopServices()` in reverse.
248 private[this] lazy val _allServices = List(
// Looks up `key` through `props.getOr`, passing `default` (the empty string unless
// the caller supplies one) as the fallback.
258 def get(key: String, default: String = ""): String = props.getOr(key, default)
// Class loader used by `newInstance`: the current thread's context class loader.
260 def defaultClassLoader = Thread.currentThread().getContextClassLoader
263 * FIXME: This must be ditched.
265 * Find a file whose location can be overridden in
266 * the configuration file (e.g. policy.yaml)
268 * @param name Name of the file to search for
269 * @param prop Name of the property that defines the file path
270 * @param default Name to return if no file is found
272 def findConfigFile(name: String, prop: String, default: String): File = {
273 // Check for the configured value first
// `get(prop)` falls back to the empty string, so the `File` is built from "" when
// the property has no value.
274 val configured = new File(get(prop))
275 if (configured.exists)
278 // Look into the configuration context
// Otherwise ask `ResourceLocator` for a resource called `name` and take the path
// component of its URL.
279 ResourceLocator.getResource(name) match {
280 case Just(policyResource) ⇒
281 val path = policyResource.url.getPath
// Walks `_allServices` in list order, handling each service inside a `logStartingF`
// block labelled with the service's `toString`.
288 private[this] def startServices(): Unit = {
289 for(service ← _allServices) {
290 logStartingF(service.toString) {
// Stops the services in the reverse of their `_allServices` order, each inside a
// `logStoppingF` block labelled with the service's `toString`.
296 private[this] def stopServices(): Unit = {
297 val services = _allServices.reverse
299 for(service ← services) {
300 logStoppingF(service.toString) {
// NOTE(review): `safeUnit` presumably keeps one failing `stop()` from aborting the
// remaining shutdowns — confirm against its definition.
301 safeUnit(service.stop())
// Declared with procedure syntax (no `=`), so the result type is `Unit`.
// NOTE(review): `millis` is presumably the delay before stopping — confirm.
306 def stopWithDelay(millis: Long) {
// Logs the effective configuration: Aquarium Home, the events store folder,
// selected system properties and the locations of the main resources.
311 private[this] def configure(): Unit = {
// A non-absolute Aquarium Home is logged together with its canonical path.
312 logger.info("Aquarium Home = %s".format(
313 if(Homes.Folders.AquariumHome.isAbsolute)
314 Homes.Folders.AquariumHome
316 "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
319 for(folder ← this.eventsStoreFolder) {
320 logger.info("{} = {}", Aquarium.Keys.events_store_folder, folder)
322 this.eventsStoreFolder.throwMe // on error
// Name and raw value of every entry in `Aquarium.PropsToShow`.
324 for(prop ← Aquarium.PropsToShow) {
325 logger.info("{} = {}", prop.name, prop.rawValue)
328 logger.info("CONF_HERE = {}", HERE)
329 logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
330 logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
331 logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
// Registers a JVM shutdown hook. The hook acts only when `_isStopping` is not
// already set, and does its work inside a `logStoppingF("Aquarium")` block.
334 private[this] def addShutdownHooks(): Unit = {
335 Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
337 if(!_isStopping.get()) {
338 logStoppingF("Aquarium") {
347 this._isStopping.set(false)
354 this._isStopping.set(true)
// Public accessors for the lazily created private members of the same name
// (without the leading underscore).
358 def algorithmCompiler = _algorithmCompiler
360 def userStateComputations = _userStateComputations
362 def converters = _converters
364 def actorProvider = _actorProvider
366 def eventBus = _eventBus
368 def timerService = _timerService
// Selects the user state store by matching on `_userStateStoreM`; the catch-all
// case uses the store provider's `userStateStore`.
370 def userStateStore = {
371 _userStateStoreM match {
373 case _ ⇒ storeProvider.userStateStore
// Selects the resource event store by matching on `_resourceEventStoreM`; the
// catch-all case uses the store provider's `resourceEventStore`.
377 def resourceEventStore = {
378 _resourceEventStoreM match {
380 case _ ⇒ storeProvider.resourceEventStore
385 _imEventStoreM match {
387 case _ ⇒ storeProvider.imEventStore
392 _policyStoreM match {
394 case _ ⇒ storeProvider.policyStore
// The reflectively instantiated store provider (see `Keys.store_provider_class`).
398 def storeProvider = _storeProvider
// Returns the resources map of the globally held `Policy.policy`.
400 def currentResourcesMap: DSLResourcesMap = {
401 // FIXME: Get rid of this singleton stuff
402 Policy.policy.resourcesMap
// Returns, as a String, the initial agreement for `role`.
// NOTE(review): how `referenceTimeMillis` influences the result should be confirmed.
405 def initialAgreementForRole(role: String, referenceTimeMillis: Long): String = {
406 // FIXME: Where is the mapping?
// Returns, as a Double, the initial balance for `role`.
// NOTE(review): how `referenceTimeMillis` influences the result should be confirmed.
410 def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
411 // FIXME: Where is the mapping?
// Returns the role name used as the default initial user role.
415 def defaultInitialUserRole: String = {
416 // FIXME: Read from properties?
// Builds a new `Aquarium` from this instance's properties with
// `Keys.store_provider_class` set to the name of `spc`.
420 def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = {
421 val map = this.props.map
// `updated` yields the map with the store-provider entry replaced or added.
422 val newMap = map.updated(Keys.store_provider_class, spc.getName)
423 val newProps = new Props(newMap)
424 new Aquarium(newProps)
// The optional events store folder computed by `_eventsStoreFolder`.
427 def eventsStoreFolder = _eventsStoreFolder
// Value of `Keys.events_store_save_rc_events` (falls back to `false`).
429 def saveResourceEventsToEventsStoreFolder = _events_store_save_rc_events
// Value of `Keys.events_store_save_im_events` (falls back to `false`).
431 def saveIMEventsToEventsStoreFolder = _events_store_save_im_events
// Reads `Aquarium.Keys.admin_cookie` from the properties; a `Just` value is
// returned unchanged.
433 def adminCookie: MaybeOption[String] = props.get(Aquarium.Keys.admin_cookie) match {
434 case just @ Just(_) ⇒ just
// System properties whose name and raw value `configure()` writes to the log.
440 final val PropsToShow = List(
444 SysProp.JavaClassVersion,
445 SysProp.JavaLibraryPath,
446 SysProp.JavaClassPath,
447 SysProp.JavaIOTmpDir,
// Implicit converters: the `Converters.DefaultConverters` object, imported here
// under the alias `TheDefaultConverters`.
454 implicit val DefaultConverters = TheDefaultConverters
// Resource name of the policy file.
456 final val PolicyConfName = ResourceLocator.ResourceNames.POLICY_YAML
// Resource name of the role-agreements map.
458 final val RolesAgreementsName = ResourceLocator.ResourceNames.ROLE_AGREEMENTS_MAP
// Lazily resolved reference to the Aquarium properties resource.
460 final lazy val AquariumPropertiesResource = ResourceLocator.Resources.AquariumPropertiesResource
// Master properties, loaded lazily from `AquariumPropertiesResource`.
// Both visible failure branches raise `AquariumInternalError` naming the resource;
// the second one passes a further argument after the message (presumably the cause).
462 final lazy val AquariumProperties = {
463 val maybeProps = Props(AquariumPropertiesResource)
469 throw new AquariumInternalError(
470 "Could not load %s from %s".format(
471 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
472 AquariumPropertiesResource))
476 throw new AquariumInternalError(
477 "Could not load %s from %s".format(
478 ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
479 AquariumPropertiesResource),
485 * The main [[gr.grnet.aquarium.Aquarium]] instance.
487 final lazy val Instance = {
// Construction from `AquariumProperties` is wrapped in `Maybe` and the outcome matched on.
488 Maybe(new Aquarium(AquariumProperties)) match {
489 case Just(masterConf) ⇒
// Both visible failure branches raise `AquariumInternalError` naming the properties
// resource; the second one passes a further argument after the message (presumably the cause).
493 throw new AquariumInternalError(
494 "Could not create Aquarium configuration from %s".format(
495 AquariumPropertiesResource))
498 throw new AquariumInternalError(
499 "Could not create Aquarium configuration from %s".format(
500 AquariumPropertiesResource),
506 * Defines the names of all the known keys inside the master properties file.
511 * The Aquarium version. Will be reported in any due occasion.
513 final val version = "version"
516 * The fully qualified name of the class that implements the `RoleableActorProviderService`.
517 * Will be instantiated reflectively and should have a public default constructor.
519 final val actor_provider_class = "actor.provider.class" // read when `_actorProvider` is instantiated
522 * The class that initializes the REST service
524 final val rest_service_class = "rest.service.class" // read when `_restService` is instantiated
527 * The fully qualified name of the class that implements the `StoreProvider`.
528 * Will be instantiated reflectively and should have a public default constructor.
530 final val store_provider_class = "store.provider.class" // also replaced by `withStoreProviderClass`
533 * The class that implements the User store
535 final val user_state_store_class = "user.state.store.class" // optional override, see `_userStateStoreM`
538 * The class that implements the resource event store
540 final val resource_event_store_class = "resource.event.store.class" // optional override, see `_resourceEventStoreM`
543 * The class that implements the IM event store
545 final val user_event_store_class = "user.event.store.class" // optional override, see `_imEventStoreM`
548 * The class that implements the policy store
550 final val policy_store_class = "policy.store.class" // optional override, see `_policyStoreM`
553 /** The lower mark for the UserActors' LRU.
555 * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
558 final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
561 * The upper mark for the UserActors' LRU.
563 * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
565 final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
568 * REST service listening port.
572 final val rest_port = "rest.port"
575 * Location of the Aquarium accounting policy config file
577 final val aquarium_policy = "aquarium.policy"
580 * Location of the role-agreement mapping file
582 final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
585 * A time period in milliseconds for which we can tolerate stale parts regarding user state.
587 * The smaller the value, the more accurate the user credits and other state parts are.
589 * If a request for user state (e.g. balance) is received and the request timestamp exceeds
590 * the timestamp of the last known balance amount by this value, then a re-computation for
591 * the balance is triggered.
593 final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
596 * The time unit is the lowest billable time period.
597 * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
598 * seconds, then the user will be billed for ten seconds.
600 * This is an overall constant. We use it as a property in order to prepare ourselves for
601 * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
604 final val time_unit_in_millis = "time.unit.in.seconds" // NOTE(review): the identifier says millis but the key says seconds — confirm which unit readers of this property expect
607 * If a value is given to this property, then it represents a folder where all events coming to aquarium are
610 final val events_store_folder = "events.store.folder" // resolved and validated by `_eventsStoreFolder`
613 * If this is `true` and `events.store.folder` is defined, then all resource events are
614 * also stored in `events.store.folder`.
616 * This is for debugging purposes.
618 final val events_store_save_rc_events = "events.store.save.rc.events" // read with a `false` fallback
621 * If this is `true` and `events.store.folder` is defined, then all IM events are
622 * also stored in `events.store.folder`.
624 * This is for debugging purposes.
626 final val events_store_save_im_events = "events.store.save.im.events" // read with a `false` fallback
629 * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
630 * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
632 final val save_unparsed_event_im = "save.unparsed.event.im"
635 * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
636 * an authorised client.
638 final val admin_cookie = "admin.cookie" // read by `adminCookie`
// Name of the admin-cookie header.
// NOTE(review): presumably the HTTP header that carries the `admin.cookie` value on
// administrative REST calls — confirm against the REST layer.
642 final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
// Lower-cased form of the same name, computed once with the no-argument `toLowerCase`.
643 final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase