52a3749b78d571705c26d663cdc5124f5af533bb
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import java.io.File
39
40 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
41 import com.ckkloverdos.maybe._
42 import com.ckkloverdos.props.Props
43 import com.ckkloverdos.sys.SysProp
44
45 import gr.grnet.aquarium.util.{Lifecycle, Loggable, shortNameOfClass, shortClassNameOf}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.service._
48 import gr.grnet.aquarium.converter.StdConverters
49 import java.util.concurrent.atomic.AtomicBoolean
50 import gr.grnet.aquarium.ResourceLocator._
51 import gr.grnet.aquarium.computation.UserStateComputations
52 import gr.grnet.aquarium.logic.accounting.algorithm.{SimpleCostPolicyAlgorithmCompiler, CostPolicyAlgorithmCompiler}
53 import gr.grnet.aquarium.logic.accounting.dsl.DSLResourcesMap
54 import gr.grnet.aquarium.logic.accounting.Policy
55 import org.slf4j.{Logger, LoggerFactory}
56
57 /**
58  * This is the Aquarium entry point.
59  *
60  * Responsible to load all of application configuration and provide the relevant services.
61  *
62  * @author Christos KK Loverdos <loverdos@gmail.com>.
63  */
64 final class Aquarium(val props: Props) extends Lifecycle with Loggable { aquariumSelf ⇒
65   import Aquarium.Keys
66
67   private[this] val _isStopping = new AtomicBoolean(false)
68
69   def isStopping() = _isStopping.get()
70
71   @inline
72   def getClientLogger(client: AnyRef): Logger = {
73     client match {
74       case null ⇒
75         this.logger
76
77       case _ ⇒
78         LoggerFactory.getLogger(client.getClass)
79     }
80   }
81
82   def debug(client: AnyRef, fmt: String, args: Any*) = {
83     getClientLogger(client).debug(fmt.format(args: _*))
84   }
85
86   def info(client: AnyRef, fmt: String, args: Any*) = {
87     getClientLogger(client).info(fmt.format(args: _*))
88   }
89
90   def warn(client: AnyRef, fmt: String, args: Any*) = {
91     getClientLogger(client).warn(fmt.format(args: _*))
92   }
93
94   /**
95    * Reflectively provide a new instance of a class and configure it appropriately.
96    */
97   private[this] def newInstance[C : Manifest](_className: String = ""): C = {
98     val className = _className match {
99       case "" ⇒
100         manifest[C].erasure.getName
101
102       case name ⇒
103         name
104     }
105
106     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
107     instanceM match {
108       case Just(instance) ⇒ instance match {
109         case configurable: Configurable ⇒
110           val localProps = configurable.propertyPrefix match {
111             case Some(prefix) ⇒
112               props.subsetForKeyPrefix(prefix)
113
114             case None ⇒
115               props
116           }
117
118           logger.debug("Configuring {} with props", configurable.getClass.getName)
119           MaybeEither(configurable configure localProps) match {
120             case Just(_) ⇒
121               logger.info("Configured {} with props", configurable.getClass.getName)
122               instance
123
124             case Failed(e) ⇒
125               throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
126           }
127
128         case _ ⇒
129           instance
130       }
131
132       case Failed(e) ⇒
133         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
134     }
135
136   }
137
138   private[this] lazy val _algorithmCompiler: CostPolicyAlgorithmCompiler = SimpleCostPolicyAlgorithmCompiler
139
140   private[this] lazy val _userStateComputations = new UserStateComputations(aquariumSelf)
141
142   private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
143
144   /**
145    * Initializes a store provider, according to the value configured
146    * in the configuration file. The
147    */
148   private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
149   
150   private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
151
152   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
153     // If there is a specific `UserStateStore` implementation specified in the
154     // properties, then this implementation overrides the user store given by
155     // `StoreProvider`.
156     props.get(Keys.user_state_store_class) map { className ⇒
157       val instance = newInstance[UserStateStore](className)
158       logger.info("Overriding %s provisioning. Implementation given by: %s".format(
159         shortNameOfClass(classOf[UserStateStore]),
160         instance.getClass))
161       instance
162     }
163   }
164
165   private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
166     // If there is a specific `EventStore` implementation specified in the
167     // properties, then this implementation overrides the event store given by
168     // `StoreProvider`.
169     props.get(Keys.resource_event_store_class) map { className ⇒
170       val instance = newInstance[ResourceEventStore](className)
171       logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
172       instance
173     }
174   }
175
176   private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = {
177     props.get(Keys.user_event_store_class) map { className ⇒
178       val instance = newInstance[IMEventStore](className)
179       logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass))
180       instance
181     }
182   }
183
184   private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
185     props.get(Keys.policy_store_class) map {
186       className ⇒
187         val instance = newInstance[PolicyStore](className)
188         logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
189         instance
190     }
191   }
192
193   private[this] lazy val _eventsStoreFolder: Maybe[File] = {
194     props.get(Keys.events_store_folder) map {
195       folderName ⇒
196         logger.info("{} = {}", Keys.events_store_folder, folderName)
197         
198         val canonicalFolder = {
199           val folder = new File(folderName)
200           if(folder.isAbsolute) {
201             folder.getCanonicalFile
202           } else {
203             logger.info("{} is not absolute, making it relative to Aquarium Home", Keys.events_store_folder)
204             new File(ResourceLocator.Homes.Folders.AquariumHome, folderName).getCanonicalFile
205           }
206         }
207
208         val canonicalPath = canonicalFolder.getCanonicalPath
209
210         if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
211           throw new AquariumInternalError("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
212         }
213
214         // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
215         // we still want to keep the events.
216         val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
217         if(canonicalPath.startsWith(ahCanonicalPath)) {
218           throw new AquariumException(
219             "%s = %s is under Aquarium Home = %s".format(
220               Keys.events_store_folder,
221               canonicalFolder,
222               ahCanonicalPath
223             ))
224         }
225
226         canonicalFolder.mkdirs()
227
228         canonicalFolder
229     }
230   }
231
232   private[this] lazy val _events_store_save_rc_events = props.getBoolean(Keys.events_store_save_rc_events).getOr(false)
233
234   private[this] lazy val _events_store_save_im_events = props.getBoolean(Keys.events_store_save_im_events).getOr(false)
235
236   private[this] lazy val _converters = StdConverters.AllConverters
237
238   private[this] lazy val _timerService: TimerService = newInstance[SimpleTimerService]()
239
240   private[this] lazy val _akka = newInstance[AkkaService]()
241
242   private[this] lazy val _eventBus = newInstance[EventBusService]()
243
244   private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
245
246   private[this] lazy val _storeWatcherService = newInstance[StoreWatcherService]()
247
248   private[this] lazy val _allServices = List(
249     _timerService,
250     _akka,
251     _actorProvider,
252     _eventBus,
253     _restService,
254     _rabbitmqService,
255     _storeWatcherService
256   )
257
258   def get(key: String, default: String = ""): String = props.getOr(key, default)
259
260   def defaultClassLoader = Thread.currentThread().getContextClassLoader
261
262   /**
263    * FIXME: This must be ditched.
264    * 
265    * Find a file whose location can be overiden in
266    * the configuration file (e.g. policy.yaml)
267    *
268    * @param name Name of the file to search for
269    * @param prop Name of the property that defines the file path
270    * @param default Name to return if no file is found
271    */
272   def findConfigFile(name: String, prop: String, default: String): File = {
273     // Check for the configured value first
274     val configured = new File(get(prop))
275     if (configured.exists)
276       return configured
277
278     // Look into the configuration context
279     ResourceLocator.getResource(name) match {
280       case Just(policyResource) ⇒
281         val path = policyResource.url.getPath
282         new File(path)
283       case _ ⇒
284         new File(default)
285     }
286   }
287
288   private[this] def startServices(): Unit = {
289     for(service ← _allServices) {
290       logStartingF(service.toString) {
291         service.start()
292       } {}
293     }
294   }
295
296   private[this] def stopServices(): Unit = {
297     val services = _allServices.reverse
298
299     for(service ← services) {
300       logStoppingF(service.toString) {
301         safeUnit(service.stop())
302       } {}
303     }
304   }
305
306   def stopWithDelay(millis: Long) {
307     Thread sleep millis
308     stop()
309   }
310
311   private[this] def configure(): Unit = {
312     logger.info("Aquarium Home = %s".format(
313       if(Homes.Folders.AquariumHome.isAbsolute)
314         Homes.Folders.AquariumHome
315       else
316         "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
317     ))
318
319     for(folder ← this.eventsStoreFolder) {
320       logger.info("{} = {}", Aquarium.Keys.events_store_folder, folder)
321     }
322     this.eventsStoreFolder.throwMe // on error
323
324     for(prop ← Aquarium.PropsToShow) {
325       logger.info("{} = {}", prop.name, prop.rawValue)
326     }
327
328     logger.info("CONF_HERE =  {}", HERE)
329     logger.info("{} = {}", ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES, ResourceLocator.Resources.AquariumPropertiesResource)
330     logger.info("{} = {}", ResourceLocator.ResourceNames.LOGBACK_XML, ResourceLocator.Resources.LogbackXMLResource)
331     logger.info("{} = {}", ResourceLocator.ResourceNames.POLICY_YAML, ResourceLocator.Resources.PolicyYAMLResource)
332   }
333
334   private[this] def addShutdownHooks(): Unit = {
335     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
336       def run = {
337         if(!_isStopping.get()) {
338           logStoppingF("Aquarium") {
339             stop()
340           } {}
341         }
342       }
343     }))
344   }
345
346   def start() = {
347     this._isStopping.set(false)
348     configure()
349     addShutdownHooks()
350     startServices()
351   }
352
353   def stop() = {
354     this._isStopping.set(true)
355     stopServices()
356   }
357
358   def algorithmCompiler = _algorithmCompiler
359
360   def userStateComputations = _userStateComputations
361
362   def converters = _converters
363   
364   def actorProvider = _actorProvider
365
366   def eventBus = _eventBus
367
368   def timerService = _timerService
369
370   def userStateStore = {
371     _userStateStoreM match {
372       case Just(us) ⇒ us
373       case _        ⇒ storeProvider.userStateStore
374     }
375   }
376
377   def resourceEventStore = {
378     _resourceEventStoreM match {
379       case Just(es) ⇒ es
380       case _        ⇒ storeProvider.resourceEventStore
381     }
382   }
383
384   def imEventStore = {
385     _imEventStoreM match {
386       case Just(es) ⇒ es
387       case _        ⇒ storeProvider.imEventStore
388     }
389   }
390
391   def policyStore = {
392     _policyStoreM match {
393       case Just(es) ⇒ es
394       case _        ⇒ storeProvider.policyStore
395     }
396   }
397
398   def storeProvider = _storeProvider
399
400   def currentResourcesMap: DSLResourcesMap = {
401     // FIXME: Get rid of this singleton stuff
402     Policy.policy.resourcesMap
403   }
404
405   def initialAgreementForRole(role: String, referenceTimeMillis: Long): String = {
406     // FIXME: Where is the mapping?
407     "default"
408   }
409
410   def initialBalanceForRole(role: String, referenceTimeMillis: Long): Double = {
411     // FIXME: Where is the mapping?
412     10000.0
413   }
414
415   def defaultInitialUserRole: String = {
416     // FIXME: Read from properties?
417     "default"
418   }
419   
420   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = {
421     val map = this.props.map
422     val newMap = map.updated(Keys.store_provider_class, spc.getName)
423     val newProps = new Props(newMap)
424     new Aquarium(newProps)
425   }
426
427   def eventsStoreFolder = _eventsStoreFolder
428
429   def saveResourceEventsToEventsStoreFolder = _events_store_save_rc_events
430
431   def saveIMEventsToEventsStoreFolder = _events_store_save_im_events
432
433   def adminCookie: MaybeOption[String] = props.get(Aquarium.Keys.admin_cookie) match {
434     case just @ Just(_) ⇒ just
435     case _ ⇒ NoVal
436   }
437 }
438
439 object Aquarium {
440   final val PropsToShow = List(
441     SysProp.JavaVMName,
442     SysProp.JavaVersion,
443     SysProp.JavaHome,
444     SysProp.JavaClassVersion,
445     SysProp.JavaLibraryPath,
446     SysProp.JavaClassPath,
447     SysProp.JavaIOTmpDir,
448     SysProp.UserName,
449     SysProp.UserHome,
450     SysProp.UserDir,
451     SysProp.FileEncoding
452   )
453
454   implicit val DefaultConverters = TheDefaultConverters
455
456   final val PolicyConfName = ResourceLocator.ResourceNames.POLICY_YAML
457
458   final val RolesAgreementsName = ResourceLocator.ResourceNames.ROLE_AGREEMENTS_MAP
459
460   final lazy val AquariumPropertiesResource = ResourceLocator.Resources.AquariumPropertiesResource
461
462   final lazy val AquariumProperties = {
463     val maybeProps = Props(AquariumPropertiesResource)
464     maybeProps match {
465       case Just(props) ⇒
466         props
467
468       case NoVal ⇒
469         throw new AquariumInternalError(
470           "Could not load %s from %s".format(
471             ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
472             AquariumPropertiesResource))
473
474
475       case Failed(e) ⇒
476         throw new AquariumInternalError(
477           "Could not load %s from %s".format(
478             ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
479             AquariumPropertiesResource),
480           e)
481     }
482   }
483
484   /**
485    * The main [[gr.grnet.aquarium.Aquarium]] instance.
486    */
487   final lazy val Instance = {
488     Maybe(new Aquarium(AquariumProperties)) match {
489       case Just(masterConf) ⇒
490         masterConf
491
492       case NoVal ⇒
493         throw new AquariumInternalError(
494           "Could not create Aquarium configuration from %s".format(
495             AquariumPropertiesResource))
496
497       case Failed(e) ⇒
498         throw new AquariumInternalError(
499           "Could not create Aquarium configuration from %s".format(
500             AquariumPropertiesResource),
501           e)
502     }
503   }
504
505   /**
506    * Defines the names of all the known keys inside the master properties file.
507    */
508   final object Keys {
509
510     /**
511      * The Aquarium version. Will be reported in any due occasion.
512      */
513     final val version = "version"
514
515     /**
516      * The fully qualified name of the class that implements the `RoleableActorProviderService`.
517      * Will be instantiated reflectively and should have a public default constructor.
518      */
519     final val actor_provider_class = "actor.provider.class"
520
521     /**
522      * The class that initializes the REST service
523      */
524     final val rest_service_class = "rest.service.class"
525
526     /**
527      * The fully qualified name of the class that implements the `StoreProvider`.
528      * Will be instantiated reflectively and should have a public default constructor.
529      */
530     final val store_provider_class = "store.provider.class"
531
532     /**
533      * The class that implements the User store
534      */
535     final val user_state_store_class = "user.state.store.class"
536
537     /**
538      * The class that implements the resource event store
539      */
540     final val resource_event_store_class = "resource.event.store.class"
541
542     /**
543      * The class that implements the IM event store
544      */
545     final val user_event_store_class = "user.event.store.class"
546
547     /**
548      * The class that implements the wallet entries store
549      */
550     final val policy_store_class = "policy.store.class"
551
552
553     /** The lower mark for the UserActors' LRU.
554      *
555      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
556      *
557      */
558     final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
559
560     /**
561      * The upper mark for the UserActors' LRU.
562      *
563      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
564      */
565     final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
566
567     /**
568      * REST service listening port.
569      *
570      * Default is 8080.
571      */
572     final val rest_port = "rest.port"
573
574     /**
575      * Location of the Aquarium accounting policy config file
576      */
577     final val aquarium_policy = "aquarium.policy"
578
579     /**
580      * Location of the role-agreement mapping file
581      */
582     final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
583     
584     /**
585      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
586      *
587      * The smaller the value, the more accurate the user credits and other state parts are.
588      *
589      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
590      * the timestamp of the last known balance amount by this value, then a re-computation for
591      * the balance is triggered.
592      */
593     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
594
595     /**
596      * The time unit is the lowest billable time period.
597      * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
598      * seconds, then the user will be billed for ten seconds.
599      *
600      * This is an overall constant. We use it as a property in order to prepare ourselves for
601      * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
602      * infrastructures.
603      */
604     final val time_unit_in_millis = "time.unit.in.seconds"
605
606     /**
607      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
608      * saved.
609      */
610     final val events_store_folder = "events.store.folder"
611
612     /**
613      * If this is `true` and `events.store.folder` is defined, then all resource events are
614      * also stored in `events.store.folder`.
615      *
616      * This is for debugging purposes.
617      */
618     final val events_store_save_rc_events = "events.store.save.rc.events"
619
620     /**
621      * If this is `true` and `events.store.folder` is defined, then all IM events are
622      * also stored in `events.store.folder`.
623      *
624      * This is for debugging purposes.
625      */
626     final val events_store_save_im_events = "events.store.save.im.events"
627
628     /**
629      * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
630      * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
631      */
632     final val save_unparsed_event_im = "save.unparsed.event.im"
633
634     /**
635      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
636      * an authorised client.
637      */
638     final val admin_cookie = "admin.cookie"
639   }
640
641   object HTTP {
642     final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
643     final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
644   }
645 }