2 * Copyright 2011 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
12 * 2. Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following
14 * disclaimer in the documentation and/or other materials
15 * provided with the distribution.
17 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
30 * The views and conclusions contained in the software and
31 * documentation are those of the authors and should not be
32 * interpreted as representing official policies, either expressed
33 * or implied, of GRNET S.A.
36 package gr.grnet.aquarium
38 import actor.{ActorProvider}
39 import com.ckkloverdos.resource._
40 import com.ckkloverdos.sys.SysProp
41 import com.ckkloverdos.props.Props
42 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
43 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
44 import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService}
46 import util.{Lifecycle, Loggable}
48 import collection.mutable.Buffer
49 import java.io.{ByteArrayInputStream, InputStream, Reader, BufferedReader}
52 * The master configurator. Responsible to load all of application configuration and provide the relevant services.
54 * @author Christos KK Loverdos <loverdos@gmail.com>.
56 class Configurator(val props: Props) extends Loggable {
57 import Configurator.Keys
60 * Reflectively provide a new instance of a class and configure it appropriately.
62 private[this] def newInstance[C : Manifest](className: String): C = {
63 val instanceM = Maybe(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
65 case Just(instance) ⇒ instance match {
66 case configurable: Configurable ⇒
67 Maybe(configurable configure props) match {
71 throw new Exception("Could not configure instance of %s".format(className), e)
73 throw new Exception("Could not configure instance of %s".format(className))
79 throw new Exception("Could not instantiate %s".format(className), e)
81 throw new Exception("Could not instantiate %s".format(className))
86 private[this] lazy val _actorProvider: ActorProvider = {
87 val instance = newInstance[ActorProvider](props.getEx(Keys.actor_provider_class))
88 logger.info("Loaded ActorProvider: %s".format(instance.getClass))
93 * Initializes a store provider, according to the value configured
94 * in the configuration file. The
96 private[this] lazy val _storeProvider: StoreProvider = {
97 val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
98 logger.info("Loaded StoreProvider: %s".format(instance.getClass))
102 private[this] lazy val _restService: Lifecycle = {
103 val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
104 logger.info("Loaded RESTService: %s".format(instance.getClass))
108 private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
109 // If there is a specific `UserStateStore` implementation specified in the
110 // properties, then this implementation overrides the user store given by
112 props.get(Keys.user_state_store_class) map { className ⇒
113 val instance = newInstance[UserStateStore](className)
114 logger.info("Overriding UserStateStore provisioning. Implementation given by: %s".format(instance.getClass))
119 private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
120 // If there is a specific `EventStore` implementation specified in the
121 // properties, then this implementation overrides the event store given by
123 props.get(Keys.resource_event_store_class) map { className ⇒
124 val instance = newInstance[ResourceEventStore](className)
125 logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
130 private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
131 props.get(Keys.user_event_store_class) map { className ⇒
132 val instance = newInstance[UserEventStore](className)
133 logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
138 private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
139 // If there is a specific `IMStore` implementation specified in the
140 // properties, then this implementation overrides the event store given by
142 props.get(Keys.wallet_entry_store_class) map {
144 val instance = newInstance[WalletEntryStore](className)
145 logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
150 private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
151 props.get(Keys.policy_store_class) map {
153 val instance = newInstance[PolicyStore](className)
154 logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
159 private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
161 private[this] lazy val _imEventProc: UserEventProcessorService = new UserEventProcessorService
163 def get(key: String, default: String = ""): String = props.getOr(key, default)
165 def defaultClassLoader = Thread.currentThread().getContextClassLoader
167 def startServices(): Unit = {
169 _actorProvider.start()
170 _resEventProc.start()
174 def stopServices(): Unit = {
178 _actorProvider.stop()
180 // akka.actor.Actor.registry.shutdownAll()
183 def stopServicesWithDelay(millis: Long) {
188 def actorProvider = _actorProvider
190 def userStateStore = {
191 _userStateStoreM match {
193 case _ ⇒ storeProvider.userStateStore
197 def resourceEventStore = {
198 _resourceEventStoreM match {
200 case _ ⇒ storeProvider.resourceEventStore
205 _WalletEventStoreM match {
207 case _ ⇒ storeProvider.walletEntryStore
211 def userEventStore = {
212 _userEventStoreM match {
214 case _ ⇒ storeProvider.userEventStore
219 _policyStoreM match {
221 case _ ⇒ storeProvider.policyStore
225 def storeProvider = _storeProvider
227 def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Configurator = {
228 val map = this.props.map
229 val newMap = map.updated(Keys.store_provider_class, spc.getName)
230 val newProps = new Props(newMap)
231 new Configurator(newProps)
235 object Configurator {
236 implicit val DefaultConverters = TheDefaultConverters
238 val MasterConfName = "aquarium.properties"
240 val PolicyConfName = "policy.yaml"
243 * Current directory resource context.
244 * Normally this should be the application installation directory.
246 * It takes priority over `ClasspathBaseResourceContext`.
248 val AppBaseResourceContext = new FileStreamResourceContext(".")
251 * The venerable /etc resource context
253 val SlashEtcResourceContext = new FileStreamResourceContext("/etc/aquarium")
256 * Class loader resource context.
257 * This has the lowest priority.
259 val ClasspathBaseResourceContext = new ClassLoaderStreamResourceContext(Thread.currentThread().getContextClassLoader)
262 * Last resort: read properties from the command line
264 val CmdLineResourceContext = new SysPropResourceContext
267 * Use this property to override the place where aquarium configuration resides.
269 * The value of this property is a folder that defines the highest-priority resource context.
271 val ConfBaseFolderSysProp = SysProp("aquarium.conf.base.folder")
273 val BasicResourceContext = new CompositeStreamResourceContext(
275 SlashEtcResourceContext,
276 AppBaseResourceContext,
277 ClasspathBaseResourceContext,
278 CmdLineResourceContext)
281 * The resource context used in the application.
283 lazy val MasterResourceContext = {
284 ConfBaseFolderSysProp.value match {
286 // We have a system override for the configuration location
287 new CompositeStreamResourceContext(Just(BasicResourceContext), new FileStreamResourceContext(value))
291 throw new RuntimeException(m , e)
295 lazy val MasterConfResource = {
296 val maybeMCResource = MasterResourceContext getResource MasterConfName
297 maybeMCResource match {
298 case Just(masterConfResource) ⇒
301 throw new RuntimeException("Could not find master configuration file: %s".format(MasterConfName))
303 throw new RuntimeException(m, e)
307 lazy val MasterConfProps = {
308 val maybeProps = Props apply MasterConfResource
313 throw new RuntimeException("Could not load master configuration file: %s".format(MasterConfName))
315 throw new RuntimeException(m, e)
319 lazy val MasterConfigurator = {
320 Maybe(new Configurator(MasterConfProps)) match {
321 case Just(masterConf) ⇒
324 throw new RuntimeException("Could not initialize master configuration file: %s".format(MasterConfName))
326 throw new RuntimeException(m, e)
331 * Defines the names of all the known keys inside the master properties file.
335 final val allKeys = List(version, actor_provider_class, rest_service_class,
336 store_provider_class, user_state_store_class, resource_event_store_class,
337 user_event_store_class, wallet_entry_store_class, policy_store_class,
338 user_actors_lru_lower_mark, user_actors_lru_upper_mark, amqp_servers,
339 amqp_port, amqp_username, amqp_password, amqp_vhost, amqp_exchanges,
340 rest_port, persistence_provider, persistence_host, persistence_username,
341 persistence_password, persistence_port, persistence_db,
342 mongo_connection_pool_size, aquarium_policy, user_state_timestamp_threshold,
346 * The Aquarium version. Will be reported in any due occasion.
348 final val version = "version"
351 * The fully qualified name of the class that implements the `ActorProvider`.
352 * Will be instantiated reflectively and should have a public default constructor.
354 final val actor_provider_class = "actor.provider.class"
357 * The class that initializes the REST service
359 final val rest_service_class = "rest.service.class"
362 * The fully qualified name of the class that implements the `StoreProvider`.
363 * Will be instantiated reflectively and should have a public default constructor.
365 final val store_provider_class = "store.provider.class"
368 * The class that implements the User store
370 final val user_state_store_class = "user.state.store.class"
373 * The class that implements the resource event store
375 final val resource_event_store_class = "resource.event.store.class"
378 * The class that implements the IM event store
380 final val user_event_store_class = "user.event.store.class"
383 * The class that implements the wallet entries store
385 final val wallet_entry_store_class = "wallet.entry.store.class"
388 * The class that implements the wallet entries store
390 final val policy_store_class = "policy.store.class"
393 /** The lower mark for the UserActors' LRU, managed by UserActorManager.
395 * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
398 final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
401 * The upper mark for the UserActors' LRU, managed by UserActorManager.
403 * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
405 final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
408 * Comma separated list of amqp servers running in active-active
411 final val amqp_servers = "amqp.servers"
414 * Comma separated list of amqp servers running in active-active
417 final val amqp_port = "amqp.port"
420 * User name for connecting with the AMQP server
422 final val amqp_username = "amqp.username"
425 * Passwd for connecting with the AMQP server
427 final val amqp_password = "amqp.passwd"
430 * Virtual host on the AMQP server
432 final val amqp_vhost = "amqp.vhost"
435 * Comma separated list of exchanges known to aquarium
437 final val amqp_exchanges = "amqp.exchanges"
440 * REST service listening port.
444 final val rest_port = "rest.port"
447 * Provider for persistence services
449 final val persistence_provider = "persistence.provider"
452 * Hostname for the persistence service
454 final val persistence_host = "persistence.host"
457 * Username for connecting to the persistence service
459 final val persistence_username = "persistence.username"
462 * Password for connecting to the persistence service
464 final val persistence_password = "persistence.password"
467 * Password for connecting to the persistence service
469 final val persistence_port = "persistence.port"
472 * The DB schema to use
474 final val persistence_db = "persistence.db"
477 * Maximum number of open connections to MongoDB
479 final val mongo_connection_pool_size = "mongo.connection.pool.size"
482 * Location of the Aquarium accounting policy config file
484 final val aquarium_policy = "aquarium.policy"
487 * A time period in milliseconds for which we can tolerate stale data regarding user state.
489 * The smaller the value, the more accurate the user credits and other state data are.
491 * If a request for user state (e.g. balance) is received and the request timestamp exceeds
492 * the timestamp of the last known balance amount by this value, then a re-computation for
493 * the balance is triggered.
495 final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
498 * The time unit is the lowest billable time period.
499 * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
500 * seconds, then the user will be billed for ten seconds.
502 * This is an overall constant. We use it as a property in order to prepare ourselves for
503 * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
506 final val time_unit_in_millis = "time.unit.in.seconds"
511 * Reads configuration keys from system properties.
513 class SysPropResource extends StreamResourceSkeleton {
516 def url = new URL("cmd-line://")
519 def canonicalPath = "cmdline"
521 override def _inputStream: InputStream = {
522 val props = Configurator.Keys.allKeys.foldLeft(new StringBuffer()){
524 SysProp(k).value match {
526 b.append(k).append("=").append(x).append("\n")
531 new ByteArrayInputStream(props.toString.getBytes("UTF-8"))
535 class SysPropResourceContext extends StreamResourceContextSkeleton(NoVal) {
536 def /(child: String) = null
538 override def getResource(path: String, normalized: Boolean) = Just(new SysPropResource)
540 def getLocalResource(path: String, normalized: Boolean) = Just(new SysPropResource)
542 override def toString = "SysPropResourceContext"