Better implement one query on resource event store
[aquarium] / src / main / scala / gr / grnet / aquarium / Configurator.scala
1 /*
2  * Copyright 2011 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import actor.{ActorProvider}
39 import com.ckkloverdos.resource._
40 import com.ckkloverdos.sys.SysProp
41 import com.ckkloverdos.props.Props
42 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
43 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
44 import processor.actor.{UserEventProcessorService, ResourceEventProcessorService, EventProcessorService}
45 import store._
46 import util.{Lifecycle, Loggable}
47
48 /**
49  * The master configurator. Responsible to load all of application configuration and provide the relevant services.
50  *
51  * @author Christos KK Loverdos <loverdos@gmail.com>.
52  */
53 class Configurator(val props: Props) extends Loggable {
54   import Configurator.Keys
55
56   /**
57    * Reflectively provide a new instance of a class and configure it appropriately.
58    */
59   private[this] def newInstance[C : Manifest](className: String): C = {
60     val instanceM = Maybe(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
61     instanceM match {
62       case Just(instance) ⇒ instance match {
63         case configurable: Configurable ⇒
64           Maybe(configurable configure props) match {
65             case Just(_) ⇒
66               instance
67             case Failed(e, _) ⇒
68               throw new Exception("Could not configure instance of %s".format(className), e)
69             case NoVal ⇒
70               throw new Exception("Could not configure instance of %s".format(className))
71           }
72         case _ ⇒
73           instance
74       }
75       case Failed(e, _) ⇒
76         throw new Exception("Could not instantiate %s".format(className), e)
77       case NoVal ⇒
78         throw new Exception("Could not instantiate %s".format(className))
79     }
80
81   }
82
83   private[this] lazy val _actorProvider: ActorProvider = {
84     val instance = newInstance[ActorProvider](props.getEx(Keys.actor_provider_class))
85     logger.info("Loaded ActorProvider: %s".format(instance.getClass))
86     instance
87   }
88
89   private[this] lazy val _storeProvider: StoreProvider = {
90     val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
91     logger.info("Loaded StoreProvider: %s".format(instance.getClass))
92     instance
93   }
94   
95   private[this] lazy val _restService: Lifecycle = {
96     val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
97     logger.info("Loaded RESTService: %s".format(instance.getClass))
98     instance
99   }
100
101   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
102     // If there is a specific `UserStateStore` implementation specified in the
103     // properties, then this implementation overrides the user store given by
104     // `StoreProvider`.
105     props.get(Keys.user_state_store_class) map { className ⇒
106       val instance = newInstance[UserStateStore](className)
107       logger.info("Overriding UserStateStore provisioning. Implementation given by: %s".format(instance.getClass))
108       instance
109     }
110   }
111
112   private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
113     // If there is a specific `EventStore` implementation specified in the
114     // properties, then this implementation overrides the event store given by
115     // `StoreProvider`.
116     props.get(Keys.resource_event_store_class) map { className ⇒
117       val instance = newInstance[ResourceEventStore](className)
118       logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
119       instance
120     }
121   }
122
123   private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
124     props.get(Keys.user_event_store_class) map { className ⇒
125       val instance = newInstance[UserEventStore](className)
126       logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
127       instance
128     }
129   }
130
131   private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
132     // If there is a specific `IMStore` implementation specified in the
133     // properties, then this implementation overrides the event store given by
134     // `IMProvider`.
135     props.get(Keys.wallet_entry_store_class) map {
136       className ⇒
137         val instance = newInstance[WalletEntryStore](className)
138         logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
139         instance
140     }
141   }
142
143   private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
144     props.get(Keys.policy_store_class) map {
145       className ⇒
146         val instance = newInstance[PolicyStore](className)
147         logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
148         instance
149     }
150   }
151
152   private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
153
154   private[this] lazy val _imEventProc: UserEventProcessorService = new UserEventProcessorService
155
156   def get(key: String, default: String = ""): String = props.getOr(key, default)
157
158   def defaultClassLoader = Thread.currentThread().getContextClassLoader
159
160   def startServices(): Unit = {
161     _restService.start()
162     _actorProvider.start()
163     _resEventProc.start()
164     _imEventProc.start()
165   }
166
167   def stopServices(): Unit = {
168     _imEventProc.stop()
169     _resEventProc.stop()
170     _restService.stop()
171     _actorProvider.stop()
172
173 //    akka.actor.Actor.registry.shutdownAll()
174   }
175
176   def stopServicesWithDelay(millis: Long) {
177     Thread sleep millis
178     stopServices()
179   }
180   
181   def actorProvider = _actorProvider
182
183   def userStateStore = {
184     _userStateStoreM match {
185       case Just(us) ⇒ us
186       case _        ⇒ storeProvider.userStateStore
187     }
188   }
189
190   def resourceEventStore = {
191     _resourceEventStoreM match {
192       case Just(es) ⇒ es
193       case _        ⇒ storeProvider.resourceEventStore
194     }
195   }
196
197   def walletStore = {
198     _WalletEventStoreM match {
199       case Just(es) ⇒ es
200       case _        ⇒ storeProvider.walletEntryStore
201     }
202   }
203
204   def userEventStore = {
205     _userEventStoreM match {
206       case Just(es) ⇒ es
207       case _        ⇒ storeProvider.userEventStore
208     }
209   }
210
211   def policyEventStore = {
212     _policyStoreM match {
213       case Just(es) ⇒ es
214       case _        ⇒ storeProvider.policyStore
215     }
216   }
217
218   def storeProvider = _storeProvider
219   
220   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Configurator = {
221     val map = this.props.map
222     val newMap = map.updated(Keys.store_provider_class, spc.getName)
223     val newProps = new Props(newMap)
224     new Configurator(newProps)
225   }
226 }
227
228 object Configurator {
229   implicit val DefaultConverters = TheDefaultConverters
230
231   val MasterConfName = "aquarium.properties"
232
233   val PolicyConfName = "policy.yaml"
234
235   /**
236    * Current directory resource context.
237    * Normally this should be the application installation directory.
238    *
239    * It takes priority over `ClasspathBaseResourceContext`.
240    */
241   val AppBaseResourceContext = new FileStreamResourceContext(".")
242
243   /**
244    * The venerable /etc resource context
245    */
246   val SlashEtcResourceContext = new FileStreamResourceContext("/etc/aquarium")
247
248   /**
249    * Class loader resource context.
250    * This has the lowest priority.
251    */
252   val ClasspathBaseResourceContext = new ClassLoaderStreamResourceContext(Thread.currentThread().getContextClassLoader)
253
254   /**
255    * Use this property to override the place where aquarium configuration resides.
256    *
257    * The value of this property is a folder that defines the highest-priority resource context.
258    */
259   val ConfBaseFolderSysProp = SysProp("aquarium.conf.base.folder")
260
261   val BasicResourceContext = new CompositeStreamResourceContext(
262     NoVal,
263     SlashEtcResourceContext,
264     AppBaseResourceContext,
265     ClasspathBaseResourceContext)
266
267   /**
268    * The resource context used in the application.
269    */
270   lazy val MasterResourceContext = {
271     ConfBaseFolderSysProp.value match {
272       case Just(value) ⇒
273         // We have a system override for the configuration location
274         new CompositeStreamResourceContext(Just(BasicResourceContext), new FileStreamResourceContext(value))
275       case NoVal ⇒
276         BasicResourceContext
277       case Failed(e, m) ⇒
278         throw new RuntimeException(m , e)
279     }
280   }
281
282   lazy val MasterConfResource = {
283     val maybeMCResource = MasterResourceContext getResource MasterConfName
284     maybeMCResource match {
285       case Just(masterConfResource) ⇒
286         masterConfResource
287       case NoVal ⇒
288         throw new RuntimeException("Could not find master configuration file: %s".format(MasterConfName))
289       case Failed(e, m) ⇒
290         throw new RuntimeException(m, e)
291     }
292   }
293
294   lazy val MasterConfProps = {
295     val maybeProps = Props apply MasterConfResource
296     maybeProps match {
297       case Just(props) ⇒
298         props
299       case NoVal ⇒
300         throw new RuntimeException("Could not load master configuration file: %s".format(MasterConfName))
301       case Failed(e, m) ⇒
302         throw new RuntimeException(m, e)
303     }
304   }
305
306   lazy val MasterConfigurator = {
307     Maybe(new Configurator(MasterConfProps)) match {
308       case Just(masterConf) ⇒
309         masterConf
310       case NoVal ⇒
311         throw new RuntimeException("Could not initialize master configuration file: %s".format(MasterConfName))
312       case Failed(e, m) ⇒
313         throw new RuntimeException(m, e)
314     }
315   }
316
317   /**
318    * Defines the names of all the known keys inside the master properties file.
319    */
320   final object Keys {
321     /**
322      * The Aquarium version. Will be reported in any due occasion.
323      */
324     final val version = "version"
325
326     /**
327      * The fully qualified name of the class that implements the `ActorProvider`.
328      * Will be instantiated reflectively and should have a public default constructor.
329      */
330     final val actor_provider_class = "actor.provider.class"
331
332     /**
333      * The class that initializes the REST service
334      */
335     final val rest_service_class = "rest.service.class"
336
337     /**
338      * The fully qualified name of the class that implements the `StoreProvider`.
339      * Will be instantiated reflectively and should have a public default constructor.
340      */
341     final val store_provider_class = "store.provider.class"
342
343     /**
344      * The class that implements the User store
345      */
346     final val user_state_store_class = "user.state.store.class"
347
348     /**
349      * The class that implements the resource event store
350      */
351     final val resource_event_store_class = "resource.event.store.class"
352
353     /**
354      * The class that implements the IM event store
355      */
356     final val user_event_store_class = "user.event.store.class"
357
358     /**
359      * The class that implements the wallet entries store
360      */
361     final val wallet_entry_store_class = "wallet.entry.store.class"
362
363     /**
364      * The class that implements the wallet entries store
365      */
366     final val policy_store_class = "policy.store.class"
367
368
369     /** The lower mark for the UserActors' LRU, managed by UserActorManager.
370      *
371      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
372      *
373      */
374     final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
375
376     /**
377      * The upper mark for the UserActors' LRU, managed by UserActorManager.
378      *
379      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
380      */
381     final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
382
383     /**
384      * Comma separated list of amqp servers running in active-active
385      * configuration.
386      */
387     final val amqp_servers = "amqp.servers"
388
389     /**
390      * Comma separated list of amqp servers running in active-active
391      * configuration.
392      */
393     final val amqp_port = "amqp.port"
394
395     /**
396      * User name for connecting with the AMQP server
397      */
398     final val amqp_username = "amqp.username"
399
400     /**
401      * Passwd for connecting with the AMQP server
402      */
403     final val amqp_password = "amqp.passwd"
404
405     /**
406      * Virtual host on the AMQP server
407      */
408     final val amqp_vhost = "amqp.vhost"
409
410     /**
411      * Comma separated list of exchanges known to aquarium
412      */
413     final val amqp_exchanges = "amqp.exchanges"
414
415     /**
416      * REST service listening port.
417      *
418      * Default is 8080.
419      */
420     final val rest_port = "rest.port"
421
422     /*
423      * Provider for persistence services
424      */
425     final val persistence_provider = "persistence.provider"
426
427     /**
428      * Hostname for the persistence service
429      */
430     final val persistence_host = "persistence.host"
431
432     /**
433      * Username for connecting to the persistence service
434      */
435     final val persistence_username = "persistence.username"
436
437     /**
438      *  Password for connecting to the persistence service
439      */
440     final val persistence_password = "persistence.password"
441
442     /**
443      *  Password for connecting to the persistence service
444      */
445     final val persistence_port = "persistence.port"
446
447     /**
448      *  The DB schema to use
449      */
450     final val persistence_db = "persistence.db"
451
452     /**
453      * Maximum number of open connections to MongoDB
454      */
455     final val mongo_connection_pool_size = "mongo.connection.pool.size"
456
457     /**
458      * Location of the Aquarium accounting policy config file
459      */
460     final val aquarium_policy = "aquarium.policy"
461
462     /**
463      * A time period in milliseconds for which we can tolerate stale data regarding user state.
464      *
465      * The smaller the value, the more accurate the user credits and other state data are.
466      *
467      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
468      * the timestamp of the last known balance amount by this value, then a re-computation for
469      * the balance is triggered.
470      */
471     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
472
473     /**
474      * The time unit is the lowest billable time period.
475      * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
476      * seconds, then the user will be billed for ten seconds.
477      *
478      * This is an overall constant. We use it as a property in order to prepare ourselves for
479      * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
480      * infrastructures.
481      */
482     final val time_unit_in_millis = "time.unit.in.seconds"
483   }
484 }