Changed user event format
[aquarium] / src / main / scala / gr / grnet / aquarium / Configurator.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import actor.{ActorProvider}
39 import com.ckkloverdos.resource._
40 import com.ckkloverdos.sys.SysProp
41 import com.ckkloverdos.props.Props
42 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
43 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
44 import processor.actor.{UserEventProcessorService, ResourceEventProcessorService}
45 import store._
46 import util.{Lifecycle, Loggable}
47
48 /**
49  * The master configurator. Responsible to load all of application configuration and provide the relevant services.
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>.
52  */
53 class Configurator(val props: Props) extends Loggable {
54   import Configurator.Keys
55
56   /**
57    * Reflectively provide a new instance of a class and configure it appropriately.
58    */
59   private[this] def newInstance[C : Manifest](className: String): C = {
60     val instanceM = Maybe(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
61     instanceM match {
62       case Just(instance) ⇒ instance match {
63         case configurable: Configurable ⇒
64           Maybe(configurable configure props) match {
65             case Just(_) ⇒
66               instance
67             case Failed(e, _) ⇒
68               throw new Exception("Could not configure instance of %s".format(className), e)
69             case NoVal ⇒
70               throw new Exception("Could not configure instance of %s".format(className))
71           }
72         case _ ⇒
73           instance
74       }
75       case Failed(e, _) ⇒
76         throw new Exception("Could not instantiate %s".format(className), e)
77       case NoVal ⇒
78         throw new Exception("Could not instantiate %s".format(className))
79     }
80
81   }
82
83   private[this] lazy val _actorProvider: ActorProvider = {
84     val instance = newInstance[ActorProvider](props.getEx(Keys.actor_provider_class))
85     logger.info("Loaded ActorProvider: %s".format(instance.getClass))
86     instance
87   }
88
89   /**
90    * Initializes a store provider, according to the value configured
91    * in the configuration file. The
92    */
93   private[this] lazy val _storeProvider: StoreProvider = {
94     val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
95     logger.info("Loaded StoreProvider: %s".format(instance.getClass))
96     instance
97   }
98   
99   private[this] lazy val _restService: Lifecycle = {
100     val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
101     logger.info("Loaded RESTService: %s".format(instance.getClass))
102     instance
103   }
104
105   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
106     // If there is a specific `UserStateStore` implementation specified in the
107     // properties, then this implementation overrides the user store given by
108     // `StoreProvider`.
109     props.get(Keys.user_state_store_class) map { className ⇒
110       val instance = newInstance[UserStateStore](className)
111       logger.info("Overriding UserStateStore provisioning. Implementation given by: %s".format(instance.getClass))
112       instance
113     }
114   }
115
116   private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
117     // If there is a specific `EventStore` implementation specified in the
118     // properties, then this implementation overrides the event store given by
119     // `StoreProvider`.
120     props.get(Keys.resource_event_store_class) map { className ⇒
121       val instance = newInstance[ResourceEventStore](className)
122       logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
123       instance
124     }
125   }
126
127   private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
128     props.get(Keys.user_event_store_class) map { className ⇒
129       val instance = newInstance[UserEventStore](className)
130       logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
131       instance
132     }
133   }
134
135   private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
136     // If there is a specific `IMStore` implementation specified in the
137     // properties, then this implementation overrides the event store given by
138     // `IMProvider`.
139     props.get(Keys.wallet_entry_store_class) map {
140       className ⇒
141         val instance = newInstance[WalletEntryStore](className)
142         logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
143         instance
144     }
145   }
146
147   private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
148     props.get(Keys.policy_store_class) map {
149       className ⇒
150         val instance = newInstance[PolicyStore](className)
151         logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
152         instance
153     }
154   }
155
156   private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
157
158   private[this] lazy val _imEventProc: UserEventProcessorService = new UserEventProcessorService
159
160   def get(key: String, default: String = ""): String = props.getOr(key, default)
161
162   def defaultClassLoader = Thread.currentThread().getContextClassLoader
163
164   def startServices(): Unit = {
165     _restService.start()
166     _actorProvider.start()
167     _resEventProc.start()
168     _imEventProc.start()
169   }
170
171   def stopServices(): Unit = {
172     _imEventProc.stop()
173     _resEventProc.stop()
174     _restService.stop()
175     _actorProvider.stop()
176
177 //    akka.actor.Actor.registry.shutdownAll()
178   }
179
180   def stopServicesWithDelay(millis: Long) {
181     Thread sleep millis
182     stopServices()
183   }
184   
185   def actorProvider = _actorProvider
186
187   def userStateStore = {
188     _userStateStoreM match {
189       case Just(us) ⇒ us
190       case _        ⇒ storeProvider.userStateStore
191     }
192   }
193
194   def resourceEventStore = {
195     _resourceEventStoreM match {
196       case Just(es) ⇒ es
197       case _        ⇒ storeProvider.resourceEventStore
198     }
199   }
200
201   def walletStore = {
202     _WalletEventStoreM match {
203       case Just(es) ⇒ es
204       case _        ⇒ storeProvider.walletEntryStore
205     }
206   }
207
208   def userEventStore = {
209     _userEventStoreM match {
210       case Just(es) ⇒ es
211       case _        ⇒ storeProvider.userEventStore
212     }
213   }
214
215   def policyStore = {
216     _policyStoreM match {
217       case Just(es) ⇒ es
218       case _        ⇒ storeProvider.policyStore
219     }
220   }
221
222   def storeProvider = _storeProvider
223   
224   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Configurator = {
225     val map = this.props.map
226     val newMap = map.updated(Keys.store_provider_class, spc.getName)
227     val newProps = new Props(newMap)
228     new Configurator(newProps)
229   }
230 }
231
232 object Configurator {
233   implicit val DefaultConverters = TheDefaultConverters
234
235   val MasterConfName = "aquarium.properties"
236
237   val PolicyConfName = "policy.yaml"
238
239   /**
240    * Current directory resource context.
241    */
242   val AppBaseResourceContext = new FileStreamResourceContext(".")
243
244   /**
245    * Default config context for Aquarium distributions
246    */
247   val LocalConfigResourceContext = new FileStreamResourceContext("conf")
248
249   /**
250    * The venerable /etc resource context. Applicable in Unix environments
251    */
252   val SlashEtcResourceContext = new FileStreamResourceContext("/etc/aquarium")
253
254   /**
255    * Class loader resource context.
256    * This has the lowest priority.
257    */
258   val ClasspathBaseResourceContext = new ClassLoaderStreamResourceContext(Thread.currentThread().getContextClassLoader)
259
260   /**
261    * Use this property to override the place where aquarium configuration resides.
262    *
263    * The value of this property is a folder that defines the highest-priority resource context.
264    */
265   val ConfBaseFolderSysProp = SysProp("aquarium.conf.base.folder")
266
267   val BasicResourceContext = new CompositeStreamResourceContext(
268     NoVal,
269     SlashEtcResourceContext,
270     LocalConfigResourceContext,
271     AppBaseResourceContext,
272     ClasspathBaseResourceContext)
273
274   /**
275    * The resource context used in the application.
276    */
277   lazy val MasterResourceContext = {
278     ConfBaseFolderSysProp.value match {
279       case Just(value) ⇒
280         // We have a system override for the configuration location
281         new CompositeStreamResourceContext(Just(BasicResourceContext), new FileStreamResourceContext(value))
282       case NoVal ⇒
283         BasicResourceContext
284       case Failed(e, m) ⇒
285         throw new RuntimeException(m , e)
286     }
287   }
288
289   lazy val MasterConfResource = {
290     val maybeMCResource = MasterResourceContext getResource MasterConfName
291     maybeMCResource match {
292       case Just(masterConfResource) ⇒
293         masterConfResource
294       case NoVal ⇒
295         throw new RuntimeException("Could not find master configuration file: %s".format(MasterConfName))
296       case Failed(e, m) ⇒
297         throw new RuntimeException(m, e)
298     }
299   }
300
301   lazy val MasterConfProps = {
302     val maybeProps = Props apply MasterConfResource
303     maybeProps match {
304       case Just(props) ⇒
305         props
306       case NoVal ⇒
307         throw new RuntimeException("Could not load master configuration file: %s".format(MasterConfName))
308       case Failed(e, m) ⇒
309         throw new RuntimeException(m, e)
310     }
311   }
312
313   lazy val MasterConfigurator = {
314     Maybe(new Configurator(MasterConfProps)) match {
315       case Just(masterConf) ⇒
316         masterConf
317       case NoVal ⇒
318         throw new RuntimeException("Could not initialize master configuration file: %s".format(MasterConfName))
319       case Failed(e, m) ⇒
320         throw new RuntimeException(m, e)
321     }
322   }
323
324   /**
325    * Defines the names of all the known keys inside the master properties file.
326    */
327   final object Keys {
328
329     /**
330      * The Aquarium version. Will be reported in any due occasion.
331      */
332     final val version = "version"
333
334     /**
335      * The fully qualified name of the class that implements the `ActorProvider`.
336      * Will be instantiated reflectively and should have a public default constructor.
337      */
338     final val actor_provider_class = "actor.provider.class"
339
340     /**
341      * The class that initializes the REST service
342      */
343     final val rest_service_class = "rest.service.class"
344
345     /**
346      * The fully qualified name of the class that implements the `StoreProvider`.
347      * Will be instantiated reflectively and should have a public default constructor.
348      */
349     final val store_provider_class = "store.provider.class"
350
351     /**
352      * The class that implements the User store
353      */
354     final val user_state_store_class = "user.state.store.class"
355
356     /**
357      * The class that implements the resource event store
358      */
359     final val resource_event_store_class = "resource.event.store.class"
360
361     /**
362      * The class that implements the IM event store
363      */
364     final val user_event_store_class = "user.event.store.class"
365
366     /**
367      * The class that implements the wallet entries store
368      */
369     final val wallet_entry_store_class = "wallet.entry.store.class"
370
371     /**
372      * The class that implements the wallet entries store
373      */
374     final val policy_store_class = "policy.store.class"
375
376
377     /** The lower mark for the UserActors' LRU, managed by UserActorManager.
378      *
379      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
380      *
381      */
382     final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
383
384     /**
385      * The upper mark for the UserActors' LRU, managed by UserActorManager.
386      *
387      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
388      */
389     final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
390
391     /**
392      * Comma separated list of amqp servers running in active-active
393      * configuration.
394      */
395     final val amqp_servers = "amqp.servers"
396
397     /**
398      * Comma separated list of amqp servers running in active-active
399      * configuration.
400      */
401     final val amqp_port = "amqp.port"
402
403     /**
404      * User name for connecting with the AMQP server
405      */
406     final val amqp_username = "amqp.username"
407
408     /**
409      * Passwd for connecting with the AMQP server
410      */
411     final val amqp_password = "amqp.passwd"
412
413     /**
414      * Virtual host on the AMQP server
415      */
416     final val amqp_vhost = "amqp.vhost"
417
418     /**
419      * Comma separated list of exchanges known to aquarium
420      */
421     final val amqp_exchange = "amqp.exchange"
422
423     /**
424      * Queues for retrieving resource events from. Multiple queues can be
425      * declared, seperated by semicolon
426      *
427      * Format is `exchange:routing.key:queue-name;<exchnage2:routing.key2:queue-name>;...`
428      */
429     final val amqp_resevents_queues = "amqp.resevents.queues"
430
431     /**
432      * Queues for retrieving user events from. Multiple queues can be
433      * declared, seperated by semicolon
434      *
435      * Format is `exchange:routing.key:queue-name;<exchnage2:routing.key2:queue-name>;...`
436      */
437     final val amqp_userevents_queues="amqp.userevents.queues"
438
439     /**
440      * REST service listening port.
441      *
442      * Default is 8080.
443      */
444     final val rest_port = "rest.port"
445
446     /*
447      * Provider for persistence services
448      */
449     final val persistence_provider = "persistence.provider"
450
451     /**
452      * Hostname for the persistence service
453      */
454     final val persistence_host = "persistence.host"
455
456     /**
457      * Username for connecting to the persistence service
458      */
459     final val persistence_username = "persistence.username"
460
461     /**
462      *  Password for connecting to the persistence service
463      */
464     final val persistence_password = "persistence.password"
465
466     /**
467      *  Password for connecting to the persistence service
468      */
469     final val persistence_port = "persistence.port"
470
471     /**
472      *  The DB schema to use
473      */
474     final val persistence_db = "persistence.db"
475
476     /**
477      * Maximum number of open connections to MongoDB
478      */
479     final val mongo_connection_pool_size = "mongo.connection.pool.size"
480
481     /**
482      * Location of the Aquarium accounting policy config file
483      */
484     final val aquarium_policy = "aquarium.policy"
485
486     /**
487      * Location of the role-agreement mapping file
488      */
489     final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
490     
491     /**
492      * A time period in milliseconds for which we can tolerate stale data regarding user state.
493      *
494      * The smaller the value, the more accurate the user credits and other state data are.
495      *
496      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
497      * the timestamp of the last known balance amount by this value, then a re-computation for
498      * the balance is triggered.
499      */
500     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
501
502     /**
503      * The time unit is the lowest billable time period.
504      * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
505      * seconds, then the user will be billed for ten seconds.
506      *
507      * This is an overall constant. We use it as a property in order to prepare ourselves for
508      * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
509      * infrastructures.
510      */
511     final val time_unit_in_millis = "time.unit.in.seconds"
512   }
513 }