Fix API breakage from previous upgrade
[aquarium] / src / main / scala / gr / grnet / aquarium / Configurator.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import actor.ActorProvider
39 import com.ckkloverdos.props.Props
40 import com.ckkloverdos.maybe.{Maybe, Failed, Just, NoVal}
41 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
42 import processor.actor.{UserEventProcessorService, ResourceEventProcessorService}
43 import store._
44 import util.{Lifecycle, Loggable}
45 import java.io.File
46
47 /**
48  * The master configurator. Responsible to load all of application configuration and provide the relevant services.
49  *
50  * @author Christos KK Loverdos <loverdos@gmail.com>.
51  */
52 class Configurator(val props: Props) extends Loggable {
53   import Configurator.Keys
54
55   /**
56    * Reflectively provide a new instance of a class and configure it appropriately.
57    */
58   private[this] def newInstance[C : Manifest](className: String): C = {
59     val instanceM = Maybe(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
60     instanceM match {
61       case Just(instance) ⇒ instance match {
62         case configurable: Configurable ⇒
63           Maybe(configurable configure props) match {
64             case Just(_) ⇒
65               instance
66             case Failed(e) ⇒
67               throw new AquariumException("Could not configure instance of %s".format(className), e)
68             case NoVal ⇒
69               throw new AquariumException("Could not configure instance of %s".format(className))
70           }
71         case _ ⇒
72           instance
73       }
74       case Failed(e) ⇒
75         throw new AquariumException("Could not instantiate %s".format(className), e)
76       case NoVal ⇒
77         throw new AquariumException("Could not instantiate %s".format(className))
78     }
79
80   }
81
82   private[this] lazy val _actorProvider: ActorProvider = {
83     val instance = newInstance[ActorProvider](props.getEx(Keys.actor_provider_class))
84     logger.info("Loaded ActorProvider: %s".format(instance.getClass))
85     instance
86   }
87
88   /**
89    * Initializes a store provider, according to the value configured
90    * in the configuration file. The
91    */
92   private[this] lazy val _storeProvider: StoreProvider = {
93     val instance = newInstance[StoreProvider](props.getEx(Keys.store_provider_class))
94     logger.info("Loaded StoreProvider: %s".format(instance.getClass))
95     instance
96   }
97   
98   private[this] lazy val _restService: Lifecycle = {
99     val instance = newInstance[Lifecycle](props.getEx(Keys.rest_service_class))
100     logger.info("Loaded RESTService: %s".format(instance.getClass))
101     instance
102   }
103
104   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
105     // If there is a specific `UserStateStore` implementation specified in the
106     // properties, then this implementation overrides the user store given by
107     // `StoreProvider`.
108     props.get(Keys.user_state_store_class) map { className ⇒
109       val instance = newInstance[UserStateStore](className)
110       logger.info("Overriding UserStateStore provisioning. Implementation given by: %s".format(instance.getClass))
111       instance
112     }
113   }
114
115   private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
116     // If there is a specific `EventStore` implementation specified in the
117     // properties, then this implementation overrides the event store given by
118     // `StoreProvider`.
119     props.get(Keys.resource_event_store_class) map { className ⇒
120       val instance = newInstance[ResourceEventStore](className)
121       logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
122       instance
123     }
124   }
125
126   private[this] lazy val _userEventStoreM: Maybe[UserEventStore] = {
127     props.get(Keys.user_event_store_class) map { className ⇒
128       val instance = newInstance[UserEventStore](className)
129       logger.info("Overriding UserEventStore provisioning. Implementation given by: %s".format(instance.getClass))
130       instance
131     }
132   }
133
134   private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
135     // If there is a specific `IMStore` implementation specified in the
136     // properties, then this implementation overrides the event store given by
137     // `IMProvider`.
138     props.get(Keys.wallet_entry_store_class) map {
139       className ⇒
140         val instance = newInstance[WalletEntryStore](className)
141         logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
142         instance
143     }
144   }
145
146   private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
147     props.get(Keys.policy_store_class) map {
148       className ⇒
149         val instance = newInstance[PolicyStore](className)
150         logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
151         instance
152     }
153   }
154
155   private[this] lazy val _eventsStoreFolder: Maybe[File] = {
156     props.get(Keys.events_store_folder) map {
157       folderName ⇒
158         val canonicalFolder = {
159           val folder = new File(folderName)
160           if(folder.isAbsolute) {
161             folder.getCanonicalFile
162           } else {
163             logger.info("{} is not absolute, making it relative to AQUARIUM_HOME", Keys.events_store_folder)
164             new File(ResourceLocator.AQUARIUM_HOME_FOLDER, folderName).getCanonicalFile
165           }
166         }
167
168         val canonicalPath = canonicalFolder.getCanonicalPath
169
170         logger.info("{} = {}", Keys.events_store_folder, canonicalPath)
171
172         if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
173           throw new AquariumException("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
174         }
175
176         // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
177         // we still want to keep the events.
178         val ahCanonicalPath = ResourceLocator.AQUARIUM_HOME_FOLDER.getCanonicalPath
179         if(canonicalPath.startsWith(ahCanonicalPath)) {
180           throw new AquariumException(
181             "%s = %s is under %s = %s".format(
182               Keys.events_store_folder, canonicalFolder,
183               ResourceLocator.AQUARIUM_HOME.name, ahCanonicalPath
184             ))
185         }
186
187         canonicalFolder.mkdirs()
188
189         canonicalFolder
190     }
191   }
192
193   private[this] lazy val _resEventProc: ResourceEventProcessorService = new ResourceEventProcessorService
194
195   private[this] lazy val _imEventProc: UserEventProcessorService = new UserEventProcessorService
196
197   def get(key: String, default: String = ""): String = props.getOr(key, default)
198
199   def defaultClassLoader = Thread.currentThread().getContextClassLoader
200
201   /**
202    * FIXME: This must be ditched.
203    * 
204    * Find a file whose location can be overiden in
205    * the configuration file (e.g. policy.yaml)
206    *
207    * @param name Name of the file to search for
208    * @param prop Name of the property that defines the file path
209    * @param default Name to return if no file is found
210    */
211   def findConfigFile(name: String, prop: String, default: String): File = {
212     // Check for the configured value first
213     val configured = new File(get(prop))
214     if (configured.exists)
215       return configured
216
217     // Look into the configuration context
218     ResourceLocator.getResource(name) match {
219       case Just(policyResource) ⇒
220         val path = policyResource.url.getPath
221         new File(path)
222       case _ ⇒
223         new File(default)
224     }
225   }
226
227   def startServices(): Unit = {
228     _restService.start()
229     _actorProvider.start()
230     _resEventProc.start()
231     _imEventProc.start()
232   }
233
234   def stopServices(): Unit = {
235     _imEventProc.stop()
236     _resEventProc.stop()
237     _restService.stop()
238     _actorProvider.stop()
239
240 //    akka.actor.Actor.registry.shutdownAll()
241   }
242
243   def stopServicesWithDelay(millis: Long) {
244     Thread sleep millis
245     stopServices()
246   }
247   
248   def actorProvider = _actorProvider
249
250   def userStateStore = {
251     _userStateStoreM match {
252       case Just(us) ⇒ us
253       case _        ⇒ storeProvider.userStateStore
254     }
255   }
256
257   def resourceEventStore = {
258     _resourceEventStoreM match {
259       case Just(es) ⇒ es
260       case _        ⇒ storeProvider.resourceEventStore
261     }
262   }
263
264   def walletStore = {
265     _WalletEventStoreM match {
266       case Just(es) ⇒ es
267       case _        ⇒ storeProvider.walletEntryStore
268     }
269   }
270
271   def userEventStore = {
272     _userEventStoreM match {
273       case Just(es) ⇒ es
274       case _        ⇒ storeProvider.userEventStore
275     }
276   }
277
278   def policyStore = {
279     _policyStoreM match {
280       case Just(es) ⇒ es
281       case _        ⇒ storeProvider.policyStore
282     }
283   }
284
285   def storeProvider = _storeProvider
286   
287   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Configurator = {
288     val map = this.props.map
289     val newMap = map.updated(Keys.store_provider_class, spc.getName)
290     val newProps = new Props(newMap)
291     new Configurator(newProps)
292   }
293
294   // FIXME: This is instead of props.getInt which currently contains a bug.
295   // FIXME: Fix the original bug and delete this method
296   def getInt(name: String): Maybe[Int] = {
297     props.get(name).map(_.toInt)
298   }
299
300   def eventsStoreFolder = _eventsStoreFolder
301 }
302
303 object Configurator {
304   implicit val DefaultConverters = TheDefaultConverters
305
306   val MasterConfName = "aquarium.properties"
307
308   val PolicyConfName = "policy.yaml"
309
310   val RolesAgreementsName = "roles-agreements.map"
311
312   lazy val MasterConfResource = {
313     val maybeMCResource = ResourceLocator getResource MasterConfName
314     maybeMCResource match {
315       case Just(masterConfResource) ⇒
316         masterConfResource
317       case NoVal ⇒
318         throw new AquariumException("Could not find master configuration file: %s".format(MasterConfName))
319       case Failed(e) ⇒
320         throw new AquariumException(e, "Could not find master configuration file: %s".format(MasterConfName))
321     }
322   }
323
324   lazy val MasterConfProps = {
325     val maybeProps = Props apply MasterConfResource
326     maybeProps match {
327       case Just(props) ⇒
328         props
329       case NoVal ⇒
330         throw new AquariumException("Could not load master configuration file: %s".format(MasterConfName))
331       case Failed(e) ⇒
332         throw new AquariumException(e, "Could not load master configuration file: %s".format(MasterConfName))
333     }
334   }
335
336   lazy val MasterConfigurator = {
337     Maybe(new Configurator(MasterConfProps)) match {
338       case Just(masterConf) ⇒
339         masterConf
340       case NoVal ⇒
341         throw new AquariumException("Could not initialize master configuration file: %s".format(MasterConfName))
342       case Failed(e) ⇒
343         throw new AquariumException(e, "Could not initialize master configuration file: %s".format(MasterConfName))
344     }
345   }
346
347   /**
348    * Defines the names of all the known keys inside the master properties file.
349    */
350   final object Keys {
351
352     /**
353      * The Aquarium version. Will be reported in any due occasion.
354      */
355     final val version = "version"
356
357     /**
358      * The fully qualified name of the class that implements the `ActorProvider`.
359      * Will be instantiated reflectively and should have a public default constructor.
360      */
361     final val actor_provider_class = "actor.provider.class"
362
363     /**
364      * The class that initializes the REST service
365      */
366     final val rest_service_class = "rest.service.class"
367
368     /**
369      * The fully qualified name of the class that implements the `StoreProvider`.
370      * Will be instantiated reflectively and should have a public default constructor.
371      */
372     final val store_provider_class = "store.provider.class"
373
374     /**
375      * The class that implements the User store
376      */
377     final val user_state_store_class = "user.state.store.class"
378
379     /**
380      * The class that implements the resource event store
381      */
382     final val resource_event_store_class = "resource.event.store.class"
383
384     /**
385      * The class that implements the IM event store
386      */
387     final val user_event_store_class = "user.event.store.class"
388
389     /**
390      * The class that implements the wallet entries store
391      */
392     final val wallet_entry_store_class = "wallet.entry.store.class"
393
394     /**
395      * The class that implements the wallet entries store
396      */
397     final val policy_store_class = "policy.store.class"
398
399
400     /** The lower mark for the UserActors' LRU, managed by UserActorManager.
401      *
402      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
403      *
404      */
405     final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
406
407     /**
408      * The upper mark for the UserActors' LRU, managed by UserActorManager.
409      *
410      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
411      */
412     final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
413
414     /**
415      * Comma separated list of amqp servers running in active-active
416      * configuration.
417      */
418     final val amqp_servers = "amqp.servers"
419
420     /**
421      * Comma separated list of amqp servers running in active-active
422      * configuration.
423      */
424     final val amqp_port = "amqp.port"
425
426     /**
427      * User name for connecting with the AMQP server
428      */
429     final val amqp_username = "amqp.username"
430
431     /**
432      * Passwd for connecting with the AMQP server
433      */
434     final val amqp_password = "amqp.passwd"
435
436     /**
437      * Virtual host on the AMQP server
438      */
439     final val amqp_vhost = "amqp.vhost"
440
441     /**
442      * Comma separated list of exchanges known to aquarium
443      */
444     final val amqp_exchange = "amqp.exchange"
445
446     /**
447      * Queues for retrieving resource events from. Multiple queues can be
448      * declared, seperated by semicolon
449      *
450      * Format is `exchange:routing.key:queue-name;<exchnage2:routing.key2:queue-name>;...`
451      */
452     final val amqp_resevents_queues = "amqp.resevents.queues"
453
454     /**
455      * Queues for retrieving user events from. Multiple queues can be
456      * declared, seperated by semicolon
457      *
458      * Format is `exchange:routing.key:queue-name;<exchnage2:routing.key2:queue-name>;...`
459      */
460     final val amqp_userevents_queues="amqp.userevents.queues"
461
462     /**
463      * REST service listening port.
464      *
465      * Default is 8080.
466      */
467     final val rest_port = "rest.port"
468
469     /*
470      * Provider for persistence services
471      */
472     final val persistence_provider = "persistence.provider"
473
474     /**
475      * Hostname for the persistence service
476      */
477     final val persistence_host = "persistence.host"
478
479     /**
480      * Username for connecting to the persistence service
481      */
482     final val persistence_username = "persistence.username"
483
484     /**
485      *  Password for connecting to the persistence service
486      */
487     final val persistence_password = "persistence.password"
488
489     /**
490      *  Password for connecting to the persistence service
491      */
492     final val persistence_port = "persistence.port"
493
494     /**
495      *  The DB schema to use
496      */
497     final val persistence_db = "persistence.db"
498
499     /**
500      * Maximum number of open connections to MongoDB
501      */
502     final val mongo_connection_pool_size = "mongo.connection.pool.size"
503
504     /**
505      * Location of the Aquarium accounting policy config file
506      */
507     final val aquarium_policy = "aquarium.policy"
508
509     /**
510      * Location of the role-agreement mapping file
511      */
512     final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
513     
514     /**
515      * A time period in milliseconds for which we can tolerate stale data regarding user state.
516      *
517      * The smaller the value, the more accurate the user credits and other state data are.
518      *
519      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
520      * the timestamp of the last known balance amount by this value, then a re-computation for
521      * the balance is triggered.
522      */
523     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
524
525     /**
526      * The time unit is the lowest billable time period.
527      * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
528      * seconds, then the user will be billed for ten seconds.
529      *
530      * This is an overall constant. We use it as a property in order to prepare ourselves for
531      * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
532      * infrastructures.
533      */
534     final val time_unit_in_millis = "time.unit.in.seconds"
535
536     /**
537      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
538      * stored.
539      */
540     final val events_store_folder = "events.store.folder"
541
542     /**
543      * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.logic.events.UserEvent]] is
544      * saved to the [[gr.grnet.aquarium.store.UserEventStore]].
545      */
546     final val save_unparsed_event_im = "save.unparsed.event.im"
547   }
548 }