0e55a09b502da2c2a31f47b1925678c6f84ca9b4
[aquarium] / src / main / scala / gr / grnet / aquarium / Aquarium.scala
1 /*
2  * Copyright 2011-2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *
12  *   2. Redistributions in binary form must reproduce the above
13  *      copyright notice, this list of conditions and the following
14  *      disclaimer in the documentation and/or other materials
15  *      provided with the distribution.
16  *
17  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28  * POSSIBILITY OF SUCH DAMAGE.
29  *
30  * The views and conclusions contained in the software and
31  * documentation are those of the authors and should not be
32  * interpreted as representing official policies, either expressed
33  * or implied, of GRNET S.A.
34  */
35
36 package gr.grnet.aquarium
37
38 import com.ckkloverdos.env.Env
39 import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40 import com.ckkloverdos.props.Props
41 import connector.rabbitmq.RabbitMQProducer
42 import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
43 import java.io.File
44 import gr.grnet.aquarium.util.{Loggable, Lifecycle}
45 import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
46 import com.ckkloverdos.convert.Converters
47 import java.util.concurrent.atomic.AtomicBoolean
48 import org.slf4j.{LoggerFactory, Logger}
49 import com.ckkloverdos.maybe._
50 import com.ckkloverdos.sys.SysProp
51 import gr.grnet.aquarium.service.event.AquariumCreatedEvent
52 import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
53 import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
54 import gr.grnet.aquarium.util.date.TimeHelpers
55
56 /**
57  *
58  * @author Christos KK Loverdos <loverdos@gmail.com>
59  */
60
61 final class Aquarium(env: Env) extends Lifecycle with Loggable {
62   import Aquarium.EnvKeys
63
64   @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
65
66   private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
67     apply(EnvKeys.defaultPolicyModel),
68     apply(EnvKeys.storeProvider).policyStore
69   )
70
71   private[this] val _isStopping = new AtomicBoolean(false)
72
73   override def toString = "%s/v%s".format(getClass.getName, version)
74
75   def isStopping() = _isStopping.get()
76
77   @inline
78   def getClientLogger(client: AnyRef): Logger = {
79     client match {
80       case null ⇒
81         this.logger
82
83       case _ ⇒
84         LoggerFactory.getLogger(client.getClass)
85     }
86   }
87
88   def debug(client: AnyRef, fmt: String, args: Any*) = {
89     getClientLogger(client).debug(fmt.format(args: _*))
90   }
91
92   def info(client: AnyRef, fmt: String, args: Any*) = {
93     getClientLogger(client).info(fmt.format(args: _*))
94   }
95
96   def warn(client: AnyRef, fmt: String, args: Any*) = {
97     getClientLogger(client).warn(fmt.format(args: _*))
98   }
99
100   @throws(classOf[AquariumInternalError])
101   def apply[T: Manifest](key: TypedKey[T]): T = {
102     try {
103      env.getEx(key)
104     } catch {
105       case e: Exception ⇒
106         throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
107     }
108   }
109
110   private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
111
112   private[this] def startServices(): Unit = {
113     for(service ← _allServices) {
114       logStartingF(service.toString) {
115         service.start()
116       } {}
117     }
118   }
119
120   private[this] def stopServices(): Unit = {
121     val services = _allServices.reverse
122
123     for(service ← services) {
124       logStoppingF(service.toString) {
125         safeUnit(service.stop())
126       } {}
127     }
128   }
129
130   private[this] def showBasicConfiguration(): Unit = {
131     for(folder ← this.eventsStoreFolder) {
132       logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
133     }
134     this.eventsStoreFolder.throwMe // on error
135
136     logger.info("default policy = {}", defaultPolicyModel.toJsonString)
137   }
138
139   private[this] def addShutdownHooks(): Unit = {
140     Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
141       def run = {
142         if(!_isStopping.get()) {
143           logStoppingF("Aquarium") {
144             stop()
145           } {}
146         }
147       }
148     }))
149   }
150
151   def start(): Unit = {
152     this._isStopping.set(false)
153     showBasicConfiguration()
154     addShutdownHooks()
155     startServices()
156   }
157
158   def stop(): Unit = {
159     this._isStopping.set(true)
160     stopServices()
161   }
162
163   /**
164    * Stops Aquarium after the given millis. Used during testing.
165    */
166   def stopAfterMillis(millis: Long) {
167     Thread sleep millis
168     stop()
169   }
170
171   /**
172    * Reflectively provide a new instance of a class and configure it appropriately.
173    */
174   def newInstance[C <: AnyRef](_class: Class[C]): C = {
175     newInstance(_class.getName)
176   }
177
178   /**
179    * Reflectively provide a new instance of a class and configure it appropriately.
180    */
181   def newInstance[C <: AnyRef](className: String): C = {
182     val originalProps = apply(EnvKeys.originalProps)
183
184     val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
185     instanceM match {
186       case Just(instance) ⇒
187 //        eventBus.addSubscriber[C](instance)
188         instance match {
189           case aquariumAware: AquariumAware ⇒
190             aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
191
192           case _ ⇒
193         }
194
195         instance match {
196           case configurable: Configurable if (originalProps ne null) ⇒
197             val localProps = configurable.propertyPrefix match {
198               case somePrefix @ Some(prefix) ⇒
199                 if(prefix.length == 0) {
200                   logger.warn(
201                     "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
202                 }
203
204                 originalProps.subsetForKeyPrefix(prefix)
205
206               case None ⇒
207                 originalProps
208             }
209
210             logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
211             MaybeEither(configurable configure localProps) match {
212               case Just(_) ⇒
213                 logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
214
215               case Failed(e) ⇒
216                 throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
217             }
218
219           case _ ⇒
220         }
221
222         instance
223
224       case Failed(e) ⇒
225         throw new AquariumInternalError("Could not instantiate %s".format(className), e)
226     }
227
228   }
229
230   def currentResourceTypesMap: Map[String, ResourceType] = {
231     val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
232     if(policyOpt.isEmpty) {
233       throw new AquariumInternalError("Not even the default policy found")
234     }
235
236     policyOpt.get.resourceTypesMap
237   }
238
239   def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
240     policyStore.loadValidPolicyAt(referenceTimeMillis) match {
241       case None ⇒
242         throw new AquariumInternalError(
243           "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
244         )
245
246       case Some(policy) ⇒
247         policy
248     }
249   }
250
251   def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
252     val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
253     policyAtReferenceTime.roleMapping.get(role) match {
254       case None ⇒
255         throw new AquariumInternalError("Unknown price table for role %s at %s".format(
256           role,
257           TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
258         ))
259
260       case Some(fullPriceTable) ⇒
261         fullPriceTable
262     }
263   }
264
265   /**
266    * Computes the initial user agreement for the given role and reference time. Also,
267    * records the ID from a potential related IMEvent.
268    *
269    * @param role                The role in the agreement
270    * @param referenceTimeMillis The reference time to consider for the agreement
271    */
272   def initialUserAgreement(
273       role: String,
274       referenceTimeMillis: Long,
275       relatedIMEventID: Option[String]
276   ): UserAgreementModel = {
277
278     // Just checking
279     assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
280
281     StdUserAgreement(
282       "<StandardUserAgreement>",
283       relatedIMEventID,
284       0,
285       Long.MaxValue,
286       role,
287       PolicyDefinedFullPriceTableRef()
288     )
289   }
290
291   def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
292     // FIXME: Where is the mapping?
293     1000.0
294   }
295
296   def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
297     // A resource type never changes charging behavior. By definition.
298     val className = resourceType.chargingBehavior
299     _chargingBehaviorMap.get(className) match {
300       case Some(chargingBehavior) ⇒
301         chargingBehavior
302
303       case _ ⇒
304         try {
305           _chargingBehaviorMap synchronized {
306             val chargingBehavior = newInstance[ChargingBehavior](className)
307             _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
308             chargingBehavior
309           }
310         }
311         catch {
312           case e: Exception ⇒
313             throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
314         }
315     }
316   }
317
318   def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
319
320   def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
321
322   def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
323
324   def imEventStore = apply(EnvKeys.storeProvider).imEventStore
325
326   def userStateStore = apply(EnvKeys.storeProvider).userStateStore
327
328   def policyStore = this.cachingPolicyStore
329
330   def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
331
332   def eventBus = apply(EnvKeys.eventBus)
333
334   def chargingService = apply(EnvKeys.chargingService)
335
336   def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
337
338   def adminCookie = apply(EnvKeys.adminCookie)
339
340   def converters = apply(EnvKeys.converters)
341
342   def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
343
344   def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
345
346   def timerService = apply(EnvKeys.timerService)
347
348   def restPort = apply(EnvKeys.restPort)
349
350   def akkaService = apply(EnvKeys.akkaService)
351
352   def version = apply(EnvKeys.version)
353 }
354
355 object Aquarium {
356   final val PropsToShow = List(
357     SysProp.JavaVMName,
358     SysProp.JavaVersion,
359     SysProp.JavaHome,
360     SysProp.JavaClassVersion,
361     SysProp.JavaLibraryPath,
362     SysProp.JavaClassPath,
363     SysProp.JavaIOTmpDir,
364     SysProp.UserName,
365     SysProp.UserHome,
366     SysProp.UserDir,
367     SysProp.FileEncoding
368   )
369
370   object HTTP {
371    final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
372    final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
373  }
374
375   final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
376     override def toString = "%s(%s)".format(manifest[T], name)
377   }
378
379   final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
380     EnvKeys.timerService,
381     EnvKeys.akkaService,
382     EnvKeys.eventBus,
383     EnvKeys.restService,
384     EnvKeys.rabbitMQService,
385     EnvKeys.storeWatcherService
386   )
387
388   object EnvKeys {
389     /**
390      * The Aquarium version. Will be reported in any due occasion.
391      */
392     final val version = StringKey("version")
393
394     final val originalProps: TypedKey[Props] =
395       new AquariumEnvKey[Props]("originalProps")
396
397     /**
398      * The fully qualified name of the class that implements the `StoreProvider`.
399      * Will be instantiated reflectively and should have a public default constructor.
400      */
401     final val storeProvider: TypedKey[StoreProvider] =
402       new AquariumEnvKey[StoreProvider]("store.provider.class")
403
404     /**
405      * If a value is given to this property, then it represents a folder where all events coming to aquarium are
406      * saved.
407      *
408      * This is for debugging purposes.
409      */
410     final val eventsStoreFolder: TypedKey[Option[File]] =
411       new AquariumEnvKey[Option[File]]("events.store.folder")
412
413     /**
414      * If this is `true` and `events.store.folder` is defined, then all resource events are
415      * also stored in `events.store.folder`.
416      *
417      * This is for debugging purposes.
418      */
419
420     final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
421
422     /**
423      * If this is `true` and `events.store.folder` is defined, then all IM events are
424      * also stored in `events.store.folder`.
425      *
426      * This is for debugging purposes.
427      */
428     final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
429
430     /**
431      * A time period in milliseconds for which we can tolerate stale parts regarding user state.
432      *
433      * The smaller the value, the more accurate the user credits and other state parts are.
434      *
435      * If a request for user state (e.g. balance) is received and the request timestamp exceeds
436      * the timestamp of the last known balance amount by this value, then a re-computation for
437      * the balance is triggered.
438      */
439     final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
440
441     /**
442      * REST service listening port.
443      */
444     final val restPort = IntKey("rest.port")
445
446     final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
447
448     /**
449      * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
450      * an authorised client.
451      */
452     final val adminCookie: TypedKey[Option[String]] =
453       new AquariumEnvKey[Option[String]]("admin.cookie")
454
455     /**
456      * The class that initializes the REST service
457      */
458     final val restService: TypedKey[Lifecycle] =
459       new AquariumEnvKey[Lifecycle]("rest.service.class")
460
461     final val akkaService: TypedKey[AkkaService] =
462       new AquariumEnvKey[AkkaService]("akka.service")
463
464     final val eventBus: TypedKey[EventBusService] =
465       new AquariumEnvKey[EventBusService]("event.bus.service")
466
467     final val timerService: TypedKey[TimerService] =
468       new AquariumEnvKey[TimerService]("timer.service")
469
470     final val rabbitMQService: TypedKey[RabbitMQService] =
471       new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
472
473     final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
474       new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
475
476     final val storeWatcherService: TypedKey[StoreWatcherService] =
477       new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
478
479     final val converters: TypedKey[Converters] =
480       new AquariumEnvKey[Converters]("converters")
481
482     final val chargingService: TypedKey[ChargingService] =
483       new AquariumEnvKey[ChargingService]("charging.service")
484
485     final val defaultClassLoader: TypedKey[ClassLoader] =
486       new AquariumEnvKey[ClassLoader]("default.class.loader")
487
488     final val defaultPolicyModel: TypedKey[PolicyModel] =
489       new AquariumEnvKey[PolicyModel]("default.policy.model")
490   }
491 }