Read configuration values from the cmd-line
[aquarium] / src / main / scala / gr / grnet / aquarium / Configurator.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import actor.{ActorProvider}
39 import com.ckkloverdos.resource._
40 import com.ckkloverdos.sys.SysProp
41 import com.ckkloverdos.props.Props
42 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
43 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
44 import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService}
45 import store._
46 import util.{Lifecycle, Loggable}
47 import java.net.URL
48 import collection.mutable.Buffer
49 import java.io.{ByteArrayInputStream, InputStream, Reader, BufferedReader}
50
51 /**
52  * The master configurator. Responsible to load all of application configuration and provide the relevant services.
53  *
54  * @author Christos KK Loverdos <loverdos@gmail.com>.
55  */
56 class Configurator(val props: Props) extends Loggable {
57   import Configurator.Keys
58
59   /**
60    * Reflectively provide a new instance of a class and configure it appropriately.
61    */
62   private[this] def newInstance[C : Manifest](className: String): C = {
63     val instanceM = Maybe(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
64     instanceM match {
65       case Just(instance) ⇒ instance match {
66         case configurable: Configurable ⇒
67           Maybe(configurable configure props) match {
68             case Just(_) ⇒
69               instance
70             case Failed(e, _) ⇒
71               throw new Exception("Could not configure instance of %s".format(className), e)
72             case NoVal ⇒
73               throw new Exception("Could not configure instance of %s".format(className))
74           }
75         case _ ⇒
76           instance
77       }
78       case Failed(e, _) ⇒
79         throw new Exception("Could not instantiate %s".format(className), e)
80       case NoVal ⇒
81         throw new Exception("Could not instantiate %s".format(className))
82     }
83
84   }
85
86   private[this] lazy val _actorProvider: ActorProvider = {
87     val instance = newInstance[ActorProvider](props.getEx(Keys.actor_provider_class))
88     logger.info("Loaded ActorProvider: %s".format(instance.getClass))
89     instance
90   }
91
92   /**
93    * Initializes a store provider, according to the value configured
94    * in the configuration file. The
95    */
96   private[this] lazy val _storeProvider: StoreProvider = {
97     val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
98     logger.info("Loaded StoreProvider: %s".format(instance.getClass))
99     instance
100   }
101   
102   private[this] lazy val _restService: Lifecycle = {
103     val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
104     logger.info("Loaded RESTService: %s".format(instance.getClass))
105     instance
106   }
107
108   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
109     // If there is a specific `UserStateStore` implementation specified in the
110     // properties, then this implementation overrides the user store given by
111     // `StoreProvider`.
112     props.get(Keys.user_state_store_class) map { className ⇒
113       val instance = newInstance[UserStateStore](className)
114       logger.info("Overriding UserStateStore provisioning. Implementation given by: %s".format(instance.getClass))
115       instance
116     }
117   }
118
119   private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
120     // If there is a specific `EventStore` implementation specified in the
121     // properties, then this implementation overrides the event store given by
122     // `StoreProvider`.
123     props.get(Keys.resource_event_store_class) map { className ⇒
124       val instance = newInstance[ResourceEventStore](className)
125       logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
126       instance
127     }
128   }
129
130   private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
131     props.get(Keys.user_event_store_class) map { className ⇒
132       val instance = newInstance[UserEventStore](className)
133       logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
134       instance
135     }
136   }
137
138   private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
139     // If there is a specific `IMStore` implementation specified in the
140     // properties, then this implementation overrides the event store given by
141     // `IMProvider`.
142     props.get(Keys.wallet_entry_store_class) map {
143       className ⇒
144         val instance = newInstance[WalletEntryStore](className)
145         logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
146         instance
147     }
148   }
149
150   private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
151     props.get(Keys.policy_store_class) map {
152       className ⇒
153         val instance = newInstance[PolicyStore](className)
154         logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
155         instance
156     }
157   }
158
159   private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
160
161   private[this] lazy val _imEventProc: UserEventProcessorService = new UserEventProcessorService
162
163   def get(key: String, default: String = ""): String = props.getOr(key, default)
164
165   def defaultClassLoader = Thread.currentThread().getContextClassLoader
166
167   def startServices(): Unit = {
168     _restService.start()
169     _actorProvider.start()
170     _resEventProc.start()
171     _imEventProc.start()
172   }
173
174   def stopServices(): Unit = {
175     _imEventProc.stop()
176     _resEventProc.stop()
177     _restService.stop()
178     _actorProvider.stop()
179
180 //    akka.actor.Actor.registry.shutdownAll()
181   }
182
183   def stopServicesWithDelay(millis: Long) {
184     Thread sleep millis
185     stopServices()
186   }
187   
188   def actorProvider = _actorProvider
189
190   def userStateStore = {
191     _userStateStoreM match {
192       case Just(us) ⇒ us
193       case _        ⇒ storeProvider.userStateStore
194     }
195   }
196
197   def resourceEventStore = {
198     _resourceEventStoreM match {
199       case Just(es) ⇒ es
200       case _        ⇒ storeProvider.resourceEventStore
201     }
202   }
203
204   def walletStore = {
205     _WalletEventStoreM match {
206       case Just(es) ⇒ es
207       case _        ⇒ storeProvider.walletEntryStore
208     }
209   }
210
211   def userEventStore = {
212     _userEventStoreM match {
213       case Just(es) ⇒ es
214       case _        ⇒ storeProvider.userEventStore
215     }
216   }
217
218   def policyStore = {
219     _policyStoreM match {
220       case Just(es) ⇒ es
221       case _        ⇒ storeProvider.policyStore
222     }
223   }
224
225   def storeProvider = _storeProvider
226   
227   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Configurator = {
228     val map = this.props.map
229     val newMap = map.updated(Keys.store_provider_class, spc.getName)
230     val newProps = new Props(newMap)
231     new Configurator(newProps)
232   }
233 }
234
235 object Configurator {
236   implicit val DefaultConverters = TheDefaultConverters
237
238   val MasterConfName = "aquarium.properties"
239
240   val PolicyConfName = "policy.yaml"
241
242   /**
243    * Current directory resource context.
244    * Normally this should be the application installation directory.
245    *
246    * It takes priority over `ClasspathBaseResourceContext`.
247    */
248   val AppBaseResourceContext = new FileStreamResourceContext(".")
249
250   /**
251    * The venerable /etc resource context
252    */
253   val SlashEtcResourceContext = new FileStreamResourceContext("/etc/aquarium")
254
255   /**
256    * Class loader resource context.
257    * This has the lowest priority.
258    */
259   val ClasspathBaseResourceContext = new ClassLoaderStreamResourceContext(Thread.currentThread().getContextClassLoader)
260
261   /**
262    * Last resort: read properties from the command line
263    */
264   val CmdLineResourceContext = new SysPropResourceContext
265
266   /**
267    * Use this property to override the place where aquarium configuration resides.
268    *
269    * The value of this property is a folder that defines the highest-priority resource context.
270    */
271   val ConfBaseFolderSysProp = SysProp("aquarium.conf.base.folder")
272
273   val BasicResourceContext = new CompositeStreamResourceContext(
274     NoVal,
275     SlashEtcResourceContext,
276     AppBaseResourceContext,
277     ClasspathBaseResourceContext,
278     CmdLineResourceContext)
279
280   /**
281    * The resource context used in the application.
282    */
283   lazy val MasterResourceContext = {
284     ConfBaseFolderSysProp.value match {
285       case Just(value) ⇒
286         // We have a system override for the configuration location
287         new CompositeStreamResourceContext(Just(BasicResourceContext), new FileStreamResourceContext(value))
288       case NoVal ⇒
289         BasicResourceContext
290       case Failed(e, m) ⇒
291         throw new RuntimeException(m , e)
292     }
293   }
294
295   lazy val MasterConfResource = {
296     val maybeMCResource = MasterResourceContext getResource MasterConfName
297     maybeMCResource match {
298       case Just(masterConfResource) ⇒
299         masterConfResource
300       case NoVal ⇒
301         throw new RuntimeException("Could not find master configuration file: %s".format(MasterConfName))
302       case Failed(e, m) ⇒
303         throw new RuntimeException(m, e)
304     }
305   }
306
307   lazy val MasterConfProps = {
308     val maybeProps = Props apply MasterConfResource
309     maybeProps match {
310       case Just(props) ⇒
311         props
312       case NoVal ⇒
313         throw new RuntimeException("Could not load master configuration file: %s".format(MasterConfName))
314       case Failed(e, m) ⇒
315         throw new RuntimeException(m, e)
316     }
317   }
318
319   lazy val MasterConfigurator = {
320     Maybe(new Configurator(MasterConfProps)) match {
321       case Just(masterConf) ⇒
322         masterConf
323       case NoVal ⇒
324         throw new RuntimeException("Could not initialize master configuration file: %s".format(MasterConfName))
325       case Failed(e, m) ⇒
326         throw new RuntimeException(m, e)
327     }
328   }
329
330   /**
331    * Defines the names of all the known keys inside the master properties file.
332    */
333   final object Keys {
334
335     final val allKeys = List(version, actor_provider_class, rest_service_class,
336       store_provider_class, user_state_store_class, resource_event_store_class,
337       user_event_store_class, wallet_entry_store_class, policy_store_class,
338       user_actors_lru_lower_mark, user_actors_lru_upper_mark, amqp_servers,
339       amqp_port, amqp_username, amqp_password, amqp_vhost, amqp_exchanges,
340       rest_port, persistence_provider, persistence_host, persistence_username,
341       persistence_password, persistence_port, persistence_db,
342       mongo_connection_pool_size, aquarium_policy, user_state_timestamp_threshold,
343       time_unit_in_millis)
344
345     /**
346      * The Aquarium version. Will be reported in any due occasion.
347      */
348     final val version = "version"
349
350     /**
351      * The fully qualified name of the class that implements the `ActorProvider`.
352      * Will be instantiated reflectively and should have a public default constructor.
353      */
354     final val actor_provider_class = "actor.provider.class"
355
356     /**
357      * The class that initializes the REST service
358      */
359     final val rest_service_class = "rest.service.class"
360
361     /**
362      * The fully qualified name of the class that implements the `StoreProvider`.
363      * Will be instantiated reflectively and should have a public default constructor.
364      */
365     final val store_provider_class = "store.provider.class"
366
367     /**
368      * The class that implements the User store
369      */
370     final val user_state_store_class = "user.state.store.class"
371
372     /**
373      * The class that implements the resource event store
374      */
375     final val resource_event_store_class = "resource.event.store.class"
376
377     /**
378      * The class that implements the IM event store
379      */
380     final val user_event_store_class = "user.event.store.class"
381
382     /**
383      * The class that implements the wallet entries store
384      */
385     final val wallet_entry_store_class = "wallet.entry.store.class"
386
387     /**
388      * The class that implements the wallet entries store
389      */
390     final val policy_store_class = "policy.store.class"
391
392
393     /** The lower mark for the UserActors' LRU, managed by UserActorManager.
394      *
395      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
396      *
397      */
398     final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
399
400     /**
401      * The upper mark for the UserActors' LRU, managed by UserActorManager.
402      *
403      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
404      */
405     final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
406
407     /**
408      * Comma separated list of amqp servers running in active-active
409      * configuration.
410      */
411     final val amqp_servers = "amqp.servers"
412
413     /**
414      * Comma separated list of amqp servers running in active-active
415      * configuration.
416      */
417     final val amqp_port = "amqp.port"
418
419     /**
420      * User name for connecting with the AMQP server
421      */
422     final val amqp_username = "amqp.username"
423
424     /**
425      * Passwd for connecting with the AMQP server
426      */
427     final val amqp_password = "amqp.passwd"
428
429     /**
430      * Virtual host on the AMQP server
431      */
432     final val amqp_vhost = "amqp.vhost"
433
434     /**
435      * Comma separated list of exchanges known to aquarium
436      */
437     final val amqp_exchanges = "amqp.exchanges"
438
439     /**
440      * REST service listening port.
441      *
442      * Default is 8080.
443      */
444     final val rest_port = "rest.port"
445
446     /*
447      * Provider for persistence services
448      */
449     final val persistence_provider = "persistence.provider"
450
451     /**
452      * Hostname for the persistence service
453      */
454     final val persistence_host = "persistence.host"
455
456     /**
457      * Username for connecting to the persistence service
458      */
459     final val persistence_username = "persistence.username"
460
461     /**
462      *  Password for connecting to the persistence service
463      */
464     final val persistence_password = "persistence.password"
465
466     /**
467      *  Password for connecting to the persistence service
468      */
469     final val persistence_port = "persistence.port"
470
471     /**
472      *  The DB schema to use
473      */
474     final val persistence_db = "persistence.db"
475
476     /**
477      * Maximum number of open connections to MongoDB
478      */
479     final val mongo_connection_pool_size = "mongo.connection.pool.size"
480
481     /**
482      * Location of the Aquarium accounting policy config file
483      */
484     final val aquarium_policy = "aquarium.policy"
485
486     /**
487      * A time period in milliseconds for which we can tolerate stale data regarding user state.
488      *
489      * The smaller the value, the more accurate the user credits and other state data are.
490      *
491      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
492      * the timestamp of the last known balance amount by this value, then a re-computation for
493      * the balance is triggered.
494      */
495     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
496
497     /**
498      * The time unit is the lowest billable time period.
499      * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
500      * seconds, then the user will be billed for ten seconds.
501      *
502      * This is an overall constant. We use it as a property in order to prepare ourselves for
503      * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
504      * infrastructures.
505      */
506     final val time_unit_in_millis = "time.unit.in.seconds"
507   }
508 }
509
510 /**
511  * Reads configuration keys from system properties.
512  */
513 class SysPropResource extends StreamResourceSkeleton {
514
515   def exists = true
516   def url = new URL("cmd-line://")
517   def name = "cmdline"
518   def path = "cmdline"
519   def canonicalPath = "cmdline"
520
521   override def _inputStream: InputStream = {
522     val props = Configurator.Keys.allKeys.foldLeft(new StringBuffer()){
523       (b, k) =>
524         SysProp(k).value match {
525           case Just(x) =>
526             b.append(k).append("=").append(x).append("\n")
527           case _ => b
528         }
529     }
530
531     new ByteArrayInputStream(props.toString.getBytes("UTF-8"))
532   }
533 }
534
535 class SysPropResourceContext extends StreamResourceContextSkeleton(NoVal) {
536   def /(child: String) = null
537
538   override def getResource(path: String, normalized: Boolean) = Just(new SysPropResource)
539
540   def getLocalResource(path: String, normalized: Boolean) = Just(new SysPropResource)
541
542   override def toString = "SysPropResourceContext"
543 }