efa0b1b97771e6ee16d3fc5288a7e68cf0f1621e
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import java.io.File
39
40 import com.ckkloverdos.maybe._
41 import com.ckkloverdos.props.Props
42 import com.ckkloverdos.convert.Converters.{DefaultConverters => TheDefaultConverters}
43
44 import gr.grnet.aquarium.service._
45 import gr.grnet.aquarium.util.{Lifecycle, Loggable, shortNameOfClass, shortClassNameOf}
46 import gr.grnet.aquarium.store._
47 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService
48 import gr.grnet.aquarium.converter.StdConverters
49 import java.util.concurrent.atomic.AtomicBoolean
50 import gr.grnet.aquarium.ResourceLocator._
51 import com.ckkloverdos.sys.SysProp
52
53 /**
54  * This is the Aquarium entry point.
55  *
56  * Responsible to load all of application configuration and provide the relevant services.
57  *
58  * @author Christos KK Loverdos <loverdos@gmail.com>.
59  */
60 final class Aquarium(val props: Props) extends Lifecycle with Loggable {
61   import Aquarium.Keys
62
63   private[this] val _isStopping = new AtomicBoolean(false)
64
65   def isStopping() = _isStopping.get()
66
67   def debug(client: AnyRef, fmt: String, args: Any*) = {
68     logger.debug("[%s] %s".format(shortClassNameOf(client), fmt.format(args: _*)))
69   }
70
71   def info(client: AnyRef, fmt: String, args: Any*) = {
72     logger.info("[%s] %s".format(shortClassNameOf(client), fmt.format(args: _*)))
73   }
74
75   def warn(client: AnyRef, fmt: String, args: Any*) = {
76     logger.warn("[%s] %s".format(shortClassNameOf(client), fmt.format(args: _*)))
77   }
78
79   /**
80    * Reflectively provide a new instance of a class and configure it appropriately.
81    */
82   private[this] def newInstance[C : Manifest](_className: String = ""): C = {
83     val className = _className match {
84       case "" ⇒
85         manifest[C].erasure.getName
86
87       case name ⇒
88         name
89     }
90
91     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
92     instanceM match {
93       case Just(instance) ⇒ instance match {
94         case configurable: Configurable ⇒
95           val localProps = configurable.propertyPrefix match {
96             case Some(prefix) ⇒
97               props.subsetForKeyPrefix(prefix)
98
99             case None ⇒
100               props
101           }
102
103           logger.debug("Configuring {} with props", configurable.getClass.getName)
104           MaybeEither(configurable configure localProps) match {
105             case Just(_) ⇒
106               logger.info("Configured {} with props", configurable.getClass.getName)
107               instance
108
109             case Failed(e) ⇒
110               throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
111           }
112
113         case _ ⇒
114           instance
115       }
116
117       case Failed(e) ⇒
118         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
119     }
120
121   }
122
123   private[this] lazy val _actorProvider = newInstance[RoleableActorProviderService](props(Keys.actor_provider_class))
124
125   /**
126    * Initializes a store provider, according to the value configured
127    * in the configuration file. The
128    */
129   private[this] lazy val _storeProvider = newInstance[StoreProvider](props(Keys.store_provider_class))
130   
131   private[this] lazy val _restService = newInstance[Lifecycle](props(Keys.rest_service_class))
132
133   private[this] lazy val _userStateStoreM: Maybe[UserStateStore] = {
134     // If there is a specific `UserStateStore` implementation specified in the
135     // properties, then this implementation overrides the user store given by
136     // `StoreProvider`.
137     props.get(Keys.user_state_store_class) map { className ⇒
138       val instance = newInstance[UserStateStore](className)
139       logger.info("Overriding %s provisioning. Implementation given by: %s".format(
140         shortNameOfClass(classOf[UserStateStore]),
141         instance.getClass))
142       instance
143     }
144   }
145
146   private[this] lazy val _resourceEventStoreM: Maybe[ResourceEventStore] = {
147     // If there is a specific `EventStore` implementation specified in the
148     // properties, then this implementation overrides the event store given by
149     // `StoreProvider`.
150     props.get(Keys.resource_event_store_class) map { className ⇒
151       val instance = newInstance[ResourceEventStore](className)
152       logger.info("Overriding EventStore provisioning. Implementation given by: %s".format(instance.getClass))
153       instance
154     }
155   }
156
157   private[this] lazy val _imEventStoreM: Maybe[IMEventStore] = {
158     props.get(Keys.user_event_store_class) map { className ⇒
159       val instance = newInstance[IMEventStore](className)
160       logger.info("Overriding IMEventStore provisioning. Implementation given by: %s".format(instance.getClass))
161       instance
162     }
163   }
164
165   private[this] lazy val _WalletEventStoreM: Maybe[WalletEntryStore] = {
166     // If there is a specific `IMStore` implementation specified in the
167     // properties, then this implementation overrides the event store given by
168     // `IMProvider`.
169     props.get(Keys.wallet_entry_store_class) map {
170       className ⇒
171         val instance = newInstance[WalletEntryStore](className)
172         logger.info("Overriding WalletEntryStore provisioning. Implementation given by: %s".format(instance.getClass))
173         instance
174     }
175   }
176
177   private[this] lazy val _policyStoreM: Maybe[PolicyStore] = {
178     props.get(Keys.policy_store_class) map {
179       className ⇒
180         val instance = newInstance[PolicyStore](className)
181         logger.info("Overriding PolicyStore provisioning. Implementation given by: %s".format(instance.getClass))
182         instance
183     }
184   }
185
186   private[this] lazy val _eventsStoreFolder: Maybe[File] = {
187     props.get(Keys.events_store_folder) map {
188       folderName ⇒
189         logger.info("{} = {}", Keys.events_store_folder, folderName)
190         
191         val canonicalFolder = {
192           val folder = new File(folderName)
193           if(folder.isAbsolute) {
194             folder.getCanonicalFile
195           } else {
196             logger.info("{} is not absolute, making it relative to Aquarium Home", Keys.events_store_folder)
197             new File(ResourceLocator.Homes.Folders.AquariumHome, folderName).getCanonicalFile
198           }
199         }
200
201         val canonicalPath = canonicalFolder.getCanonicalPath
202
203         if(canonicalFolder.exists() && !canonicalFolder.isDirectory) {
204           throw new AquariumInternalError("%s = %s is not a folder".format(Keys.events_store_folder, canonicalFolder))
205         }
206
207         // Now, events folder must be outside AQUARIUM_HOME, since AQUARIUM_HOME can be wiped out for an upgrade but
208         // we still want to keep the events.
209         val ahCanonicalPath = ResourceLocator.Homes.Folders.AquariumHome.getCanonicalPath
210         if(canonicalPath.startsWith(ahCanonicalPath)) {
211           throw new AquariumException(
212             "%s = %s is under Aquarium Home = %s".format(
213               Keys.events_store_folder,
214               canonicalFolder,
215               ahCanonicalPath
216             ))
217         }
218
219         canonicalFolder.mkdirs()
220
221         canonicalFolder
222     }
223   }
224
225   private[this] lazy val _events_store_save_rc_events = props.getBoolean(Keys.events_store_save_rc_events).getOr(false)
226
227   private[this] lazy val _events_store_save_im_events = props.getBoolean(Keys.events_store_save_im_events).getOr(false)
228
229   private[this] lazy val _converters = StdConverters.AllConverters
230
231   private[this] lazy val _timerService: TimerService = newInstance[SimpleTimerService]()
232
233   private[this] lazy val _akka = newInstance[AkkaService]()
234
235   private[this] lazy val _eventBus = newInstance[EventBusService]()
236
237   private[this] lazy val _rabbitmqService = newInstance[RabbitMQService]()
238
239   private[this] lazy val _storeWatcherService = newInstance[StoreWatcherService]()
240
241   private[this] lazy val _allServices = List(
242     _timerService,
243     _akka,
244     _actorProvider,
245     _eventBus,
246     _restService,
247     _rabbitmqService,
248     _storeWatcherService
249   )
250
251   def get(key: String, default: String = ""): String = props.getOr(key, default)
252
253   def defaultClassLoader = Thread.currentThread().getContextClassLoader
254
255   /**
256    * FIXME: This must be ditched.
257    * 
258    * Find a file whose location can be overiden in
259    * the configuration file (e.g. policy.yaml)
260    *
261    * @param name Name of the file to search for
262    * @param prop Name of the property that defines the file path
263    * @param default Name to return if no file is found
264    */
265   def findConfigFile(name: String, prop: String, default: String): File = {
266     // Check for the configured value first
267     val configured = new File(get(prop))
268     if (configured.exists)
269       return configured
270
271     // Look into the configuration context
272     ResourceLocator.getResource(name) match {
273       case Just(policyResource) ⇒
274         val path = policyResource.url.getPath
275         new File(path)
276       case _ ⇒
277         new File(default)
278     }
279   }
280
281   private[this] def startServices(): Unit = {
282     for(service ← _allServices) {
283       logStartingF(service.toString) {
284         service.start()
285       } {}
286     }
287   }
288
289   private[this] def stopServices(): Unit = {
290     val services = _allServices.reverse
291
292     for(service ← services) {
293       logStoppingF(service.toString) {
294         safeUnit(service.stop())
295       } {}
296     }
297   }
298
299   def stopWithDelay(millis: Long) {
300     Thread sleep millis
301     stop()
302   }
303
304   private[this] def configure(): Unit = {
305     logger.info("Aquarium Home = %s".format(
306       if(Homes.Folders.AquariumHome.isAbsolute)
307         Homes.Folders.AquariumHome
308       else
309         "%s [=%s]".format(Homes.Folders.AquariumHome, Homes.Folders.AquariumHome.getCanonicalPath)
310     ))
311
312     for(folder ← this.eventsStoreFolder) {
313       logger.info("{} = {}", Aquarium.Keys.events_store_folder, folder)
314     }
315     this.eventsStoreFolder.throwMe // on error
316
317     for(prop ← Aquarium.PropsToShow) {
318       logger.info("{} = {}", prop.name, prop.rawValue)
319     }
320
321
322     logger.info("CONF_HERE = {}", CONF_HERE)
323   }
324
325   private[this] def addShutdownHooks(): Unit = {
326     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
327       def run = {
328         if(!_isStopping.get()) {
329           logStoppingF("Aquarium") {
330             stop()
331           } {}
332         }
333       }
334     }))
335   }
336
337   def start() = {
338     this._isStopping.set(false)
339     configure()
340     addShutdownHooks()
341     startServices()
342   }
343
344   def stop() = {
345     this._isStopping.set(true)
346     stopServices()
347   }
348
349   def converters = _converters
350   
351   def actorProvider = _actorProvider
352
353   def eventBus = _eventBus
354
355   def timerService = _timerService
356
357   def userStateStore = {
358     _userStateStoreM match {
359       case Just(us) ⇒ us
360       case _        ⇒ storeProvider.userStateStore
361     }
362   }
363
364   def resourceEventStore = {
365     _resourceEventStoreM match {
366       case Just(es) ⇒ es
367       case _        ⇒ storeProvider.resourceEventStore
368     }
369   }
370
371   def walletStore = {
372     _WalletEventStoreM match {
373       case Just(es) ⇒ es
374       case _        ⇒ storeProvider.walletEntryStore
375     }
376   }
377
378   def imEventStore = {
379     _imEventStoreM match {
380       case Just(es) ⇒ es
381       case _        ⇒ storeProvider.imEventStore
382     }
383   }
384
385   def policyStore = {
386     _policyStoreM match {
387       case Just(es) ⇒ es
388       case _        ⇒ storeProvider.policyStore
389     }
390   }
391
392   def storeProvider = _storeProvider
393   
394   def withStoreProviderClass[C <: StoreProvider](spc: Class[C]): Aquarium = {
395     val map = this.props.map
396     val newMap = map.updated(Keys.store_provider_class, spc.getName)
397     val newProps = new Props(newMap)
398     new Aquarium(newProps)
399   }
400
401   def eventsStoreFolder = _eventsStoreFolder
402
403   def saveResourceEventsToEventsStoreFolder = _events_store_save_rc_events
404
405   def saveIMEventsToEventsStoreFolder = _events_store_save_im_events
406
407   def adminCookie: MaybeOption[String] = props.get(Aquarium.Keys.admin_cookie) match {
408     case just @ Just(_) ⇒ just
409     case _ ⇒ NoVal
410   }
411 }
412
413 object Aquarium {
414   final val PropsToShow = List(
415     SysProp.JavaVMName,
416     SysProp.JavaVersion,
417     SysProp.JavaHome,
418     SysProp.JavaClassVersion,
419     SysProp.JavaLibraryPath,
420     SysProp.JavaClassPath,
421     SysProp.JavaIOTmpDir,
422     SysProp.UserName,
423     SysProp.UserHome,
424     SysProp.UserDir,
425     SysProp.FileEncoding
426   )
427
428   implicit val DefaultConverters = TheDefaultConverters
429
430   final val PolicyConfName = ResourceLocator.ResourceNames.POLICY_YAML
431
432   final val RolesAgreementsName = ResourceLocator.ResourceNames.ROLE_AGREEMENTS_MAP
433
434   final lazy val AquariumPropertiesResource = ResourceLocator.Resources.AquariumPropertiesResource
435
436   final lazy val AquariumProperties = {
437     val maybeProps = Props(AquariumPropertiesResource)
438     maybeProps match {
439       case Just(props) ⇒
440         props
441
442       case NoVal ⇒
443         throw new AquariumInternalError(
444           "Could not load %s from %s".format(
445             ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
446             AquariumPropertiesResource))
447
448
449       case Failed(e) ⇒
450         throw new AquariumInternalError(
451           "Could not load %s from %s".format(
452             ResourceLocator.ResourceNames.AQUARIUM_PROPERTIES,
453             AquariumPropertiesResource),
454           e)
455     }
456   }
457
458   /**
459    * The main [[gr.grnet.aquarium.Aquarium]] instance.
460    */
461   final lazy val Instance = {
462     Maybe(new Aquarium(AquariumProperties)) match {
463       case Just(masterConf) ⇒
464         masterConf
465
466       case NoVal ⇒
467         throw new AquariumInternalError(
468           "Could not create Aquarium configuration from %s".format(
469             AquariumPropertiesResource))
470
471       case Failed(e) ⇒
472         throw new AquariumInternalError(
473           "Could not create Aquarium configuration from %s".format(
474             AquariumPropertiesResource),
475           e)
476     }
477   }
478
479   /**
480    * Defines the names of all the known keys inside the master properties file.
481    */
482   final object Keys {
483
484     /**
485      * The Aquarium version. Will be reported in any due occasion.
486      */
487     final val version = "version"
488
489     /**
490      * The fully qualified name of the class that implements the `RoleableActorProviderService`.
491      * Will be instantiated reflectively and should have a public default constructor.
492      */
493     final val actor_provider_class = "actor.provider.class"
494
495     /**
496      * The class that initializes the REST service
497      */
498     final val rest_service_class = "rest.service.class"
499
500     /**
501      * The fully qualified name of the class that implements the `StoreProvider`.
502      * Will be instantiated reflectively and should have a public default constructor.
503      */
504     final val store_provider_class = "store.provider.class"
505
506     /**
507      * The class that implements the User store
508      */
509     final val user_state_store_class = "user.state.store.class"
510
511     /**
512      * The class that implements the resource event store
513      */
514     final val resource_event_store_class = "resource.event.store.class"
515
516     /**
517      * The class that implements the IM event store
518      */
519     final val user_event_store_class = "user.event.store.class"
520
521     /**
522      * The class that implements the wallet entries store
523      */
524     final val wallet_entry_store_class = "wallet.entry.store.class"
525
526     /**
527      * The class that implements the wallet entries store
528      */
529     final val policy_store_class = "policy.store.class"
530
531
532     /** The lower mark for the UserActors' LRU.
533      *
534      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
535      *
536      */
537     final val user_actors_lru_lower_mark = "user.actors.LRU.lower.mark"
538
539     /**
540      * The upper mark for the UserActors' LRU.
541      *
542      * The terminology is borrowed from the (also borrowed) Apache-lucene-solr-based implementation.
543      */
544     final val user_actors_lru_upper_mark = "user.actors.LRU.upper.mark"
545
546     /**
547      * REST service listening port.
548      *
549      * Default is 8080.
550      */
551     final val rest_port = "rest.port"
552
553     /**
554      * Location of the Aquarium accounting policy config file
555      */
556     final val aquarium_policy = "aquarium.policy"
557
558     /**
559      * Location of the role-agreement mapping file
560      */
561     final val aquarium_role_agreement_map = "aquarium.role-agreement.map"
562     
563     /**
564      * A time period in milliseconds for which we can tolerate stale data regarding user state.
565      *
566      * The smaller the value, the more accurate the user credits and other state data are.
567      *
568      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
569      * the timestamp of the last known balance amount by this value, then a re-computation for
570      * the balance is triggered.
571      */
572     final val user_state_timestamp_threshold = "user.state.timestamp.threshold"
573
574     /**
575      * The time unit is the lowest billable time period.
576      * For example, with a time unit of ten seconds, if a VM is started up and shut down in nine
577      * seconds, then the user will be billed for ten seconds.
578      *
579      * This is an overall constant. We use it as a property in order to prepare ourselves for
580      * multi-cloud setup, where the same Aquarium instance is used to bill several distinct cloud
581      * infrastructures.
582      */
583     final val time_unit_in_millis = "time.unit.in.seconds"
584
585     /**
586      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
587      * saved.
588      */
589     final val events_store_folder = "events.store.folder"
590
591     /**
592      * If this is `true` and `events.store.folder` is defined, then all resource events are
593      * also stored in `events.store.folder`.
594      *
595      * This is for debugging purposes.
596      */
597     final val events_store_save_rc_events = "events.store.save.rc.events"
598
599     /**
600      * If this is `true` and `events.store.folder` is defined, then all IM events are
601      * also stored in `events.store.folder`.
602      *
603      * This is for debugging purposes.
604      */
605     final val events_store_save_im_events = "events.store.save.im.events"
606
607     /**
608      * If set to `true`, then an IM event that cannot be parsed to [[gr.grnet.aquarium.event.model.im.IMEventModel]] is
609      * saved to the [[gr.grnet.aquarium.store.IMEventStore]].
610      */
611     final val save_unparsed_event_im = "save.unparsed.event.im"
612
613     /**
614      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
615      * an authorised client.
616      */
617     final val admin_cookie = "admin.cookie"
618   }
619
620   object HTTP {
621     final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
622     final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
623   }
624 }