root / src / main / scala / gr / grnet / aquarium / Aquarium.scala @ 7dbaeb04
History | View | Annotate | Download (15.3 kB)
1 |
/* |
---|---|
2 |
* Copyright 2011-2012 GRNET S.A. All rights reserved. |
3 |
* |
4 |
* Redistribution and use in source and binary forms, with or |
5 |
* without modification, are permitted provided that the following |
6 |
* conditions are met: |
7 |
* |
8 |
* 1. Redistributions of source code must retain the above |
9 |
* copyright notice, this list of conditions and the following |
10 |
* disclaimer. |
11 |
* |
12 |
* 2. Redistributions in binary form must reproduce the above |
13 |
* copyright notice, this list of conditions and the following |
14 |
* disclaimer in the documentation and/or other materials |
15 |
* provided with the distribution. |
16 |
* |
17 |
* THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS |
18 |
* OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
19 |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
20 |
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR |
21 |
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
22 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
23 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF |
24 |
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
25 |
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT |
26 |
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN |
27 |
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
28 |
* POSSIBILITY OF SUCH DAMAGE. |
29 |
* |
30 |
* The views and conclusions contained in the software and |
31 |
* documentation are those of the authors and should not be |
32 |
* interpreted as representing official policies, either expressed |
33 |
* or implied, of GRNET S.A. |
34 |
*/ |
35 |
|
36 |
package gr.grnet.aquarium |
37 |
|
38 |
import com.ckkloverdos.env.Env |
39 |
import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey} |
40 |
import com.ckkloverdos.props.Props |
41 |
import gr.grnet.aquarium.store.{PolicyStore, StoreProvider} |
42 |
import java.io.File |
43 |
import gr.grnet.aquarium.util.{Loggable, Lifecycle} |
44 |
import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService} |
45 |
import com.ckkloverdos.convert.Converters |
46 |
import java.util.concurrent.atomic.AtomicBoolean |
47 |
import org.slf4j.{LoggerFactory, Logger} |
48 |
import com.ckkloverdos.maybe._ |
49 |
import com.ckkloverdos.sys.SysProp |
50 |
import gr.grnet.aquarium.service.event.AquariumCreatedEvent |
51 |
import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType} |
52 |
import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior} |
53 |
import gr.grnet.aquarium.util.date.TimeHelpers |
54 |
|
55 |
/** |
56 |
* |
57 |
* @author Christos KK Loverdos <loverdos@gmail.com> |
58 |
*/ |
59 |
|
60 |
final class Aquarium(env: Env) extends Lifecycle with Loggable { |
61 |
import Aquarium.EnvKeys |
62 |
|
63 |
@volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]() |
64 |
|
65 |
private[this] lazy val cachingPolicyStore = new CachingPolicyStore( |
66 |
apply(EnvKeys.defaultPolicyModel), |
67 |
apply(EnvKeys.storeProvider).policyStore |
68 |
) |
69 |
|
70 |
private[this] val _isStopping = new AtomicBoolean(false) |
71 |
|
72 |
override def toString = "%s/v%s".format(getClass.getName, version) |
73 |
|
74 |
def isStopping() = _isStopping.get() |
75 |
|
76 |
@inline |
77 |
def getClientLogger(client: AnyRef): Logger = { |
78 |
client match { |
79 |
case null ⇒ |
80 |
this.logger |
81 |
|
82 |
case _ ⇒ |
83 |
LoggerFactory.getLogger(client.getClass) |
84 |
} |
85 |
} |
86 |
|
87 |
def debug(client: AnyRef, fmt: String, args: Any*) = { |
88 |
getClientLogger(client).debug(fmt.format(args: _*)) |
89 |
} |
90 |
|
91 |
def info(client: AnyRef, fmt: String, args: Any*) = { |
92 |
getClientLogger(client).info(fmt.format(args: _*)) |
93 |
} |
94 |
|
95 |
def warn(client: AnyRef, fmt: String, args: Any*) = { |
96 |
getClientLogger(client).warn(fmt.format(args: _*)) |
97 |
} |
98 |
|
99 |
@throws(classOf[AquariumInternalError]) |
100 |
def apply[T: Manifest](key: TypedKey[T]): T = { |
101 |
try { |
102 |
env.getEx(key) |
103 |
} catch { |
104 |
case e: Exception ⇒ |
105 |
throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key)) |
106 |
} |
107 |
} |
108 |
|
109 |
private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_)) |
110 |
|
111 |
private[this] def startServices(): Unit = { |
112 |
for(service ← _allServices) { |
113 |
logStartingF(service.toString) { |
114 |
service.start() |
115 |
} {} |
116 |
} |
117 |
} |
118 |
|
119 |
private[this] def stopServices(): Unit = { |
120 |
val services = _allServices.reverse |
121 |
|
122 |
for(service ← services) { |
123 |
logStoppingF(service.toString) { |
124 |
safeUnit(service.stop()) |
125 |
} {} |
126 |
} |
127 |
} |
128 |
|
129 |
private[this] def showBasicConfiguration(): Unit = { |
130 |
for(folder ← this.eventsStoreFolder) { |
131 |
logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder) |
132 |
} |
133 |
this.eventsStoreFolder.throwMe // on error |
134 |
} |
135 |
|
136 |
private[this] def addShutdownHooks(): Unit = { |
137 |
Runtime.getRuntime.addShutdownHook(new Thread(new Runnable { |
138 |
def run = { |
139 |
if(!_isStopping.get()) { |
140 |
logStoppingF("Aquarium") { |
141 |
stop() |
142 |
} {} |
143 |
} |
144 |
} |
145 |
})) |
146 |
} |
147 |
|
148 |
def start(): Unit = { |
149 |
this._isStopping.set(false) |
150 |
showBasicConfiguration() |
151 |
addShutdownHooks() |
152 |
startServices() |
153 |
} |
154 |
|
155 |
def stop(): Unit = { |
156 |
this._isStopping.set(true) |
157 |
stopServices() |
158 |
} |
159 |
|
160 |
/** |
161 |
* Stops Aquarium after the given millis. Used during testing. |
162 |
*/ |
163 |
def stopAfterMillis(millis: Long) { |
164 |
Thread sleep millis |
165 |
stop() |
166 |
} |
167 |
|
168 |
/** |
169 |
* Reflectively provide a new instance of a class and configure it appropriately. |
170 |
*/ |
171 |
def newInstance[C <: AnyRef](_class: Class[C]): C = { |
172 |
newInstance(_class.getName) |
173 |
} |
174 |
|
175 |
/** |
176 |
* Reflectively provide a new instance of a class and configure it appropriately. |
177 |
*/ |
178 |
def newInstance[C <: AnyRef](className: String): C = { |
179 |
val originalProps = apply(EnvKeys.originalProps) |
180 |
|
181 |
val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C]) |
182 |
instanceM match { |
183 |
case Just(instance) ⇒ |
184 |
// eventBus.addSubscriber[C](instance) |
185 |
instance match { |
186 |
case aquariumAware: AquariumAware ⇒ |
187 |
aquariumAware.awareOfAquarium(AquariumCreatedEvent(this)) |
188 |
|
189 |
case _ ⇒ |
190 |
} |
191 |
|
192 |
instance match { |
193 |
case configurable: Configurable if (originalProps ne null) ⇒ |
194 |
val localProps = configurable.propertyPrefix match { |
195 |
case somePrefix @ Some(prefix) ⇒ |
196 |
if(prefix.length == 0) { |
197 |
logger.warn( |
198 |
"Property prefix for %s is %s. Consider using None".format(instance, somePrefix)) |
199 |
} |
200 |
|
201 |
originalProps.subsetForKeyPrefix(prefix) |
202 |
|
203 |
case None ⇒ |
204 |
originalProps |
205 |
} |
206 |
|
207 |
logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix) |
208 |
MaybeEither(configurable configure localProps) match { |
209 |
case Just(_) ⇒ |
210 |
logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix) |
211 |
|
212 |
case Failed(e) ⇒ |
213 |
throw new AquariumInternalError("Could not configure instance of %s".format(className), e) |
214 |
} |
215 |
|
216 |
case _ ⇒ |
217 |
} |
218 |
|
219 |
instance |
220 |
|
221 |
case Failed(e) ⇒ |
222 |
throw new AquariumInternalError("Could not instantiate %s".format(className), e) |
223 |
} |
224 |
|
225 |
} |
226 |
|
227 |
def currentResourceTypesMap: Map[String, ResourceType] = { |
228 |
val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis()) |
229 |
if(policyOpt.isEmpty) { |
230 |
throw new AquariumInternalError("Not even the default policy found") |
231 |
} |
232 |
|
233 |
policyOpt.get.resourceTypesMap |
234 |
} |
235 |
|
236 |
def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = { |
237 |
policyStore.loadValidPolicyAt(referenceTimeMillis) match { |
238 |
case None ⇒ |
239 |
throw new AquariumInternalError( |
240 |
"No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)) |
241 |
) |
242 |
|
243 |
case Some(policy) ⇒ |
244 |
policy |
245 |
} |
246 |
} |
247 |
|
248 |
def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = { |
249 |
val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis) |
250 |
policyAtReferenceTime.roleMapping.get(role) match { |
251 |
case None ⇒ |
252 |
throw new AquariumInternalError("Unknown price table for role %s at %s".format( |
253 |
role, |
254 |
TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis) |
255 |
)) |
256 |
|
257 |
case Some(fullPriceTable) ⇒ |
258 |
fullPriceTable |
259 |
} |
260 |
} |
261 |
|
262 |
/** |
263 |
* Computes the initial user agreement for the given role and reference time. Also, |
264 |
* records the ID from a potential related IMEvent. |
265 |
* |
266 |
* @param role The role in the agreement |
267 |
* @param referenceTimeMillis The reference time to consider for the agreement |
268 |
*/ |
269 |
def initialUserAgreement( |
270 |
role: String, |
271 |
referenceTimeMillis: Long, |
272 |
relatedIMEventID: Option[String] |
273 |
): UserAgreementModel = { |
274 |
|
275 |
// Just checking |
276 |
assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis)) |
277 |
|
278 |
StdUserAgreement( |
279 |
"<StandardUserAgreement>", |
280 |
relatedIMEventID, |
281 |
0, |
282 |
Long.MaxValue, |
283 |
role, |
284 |
PolicyDefinedFullPriceTableRef |
285 |
) |
286 |
} |
287 |
|
288 |
def initialUserBalance(role: String, referenceTimeMillis: Long): Double = { |
289 |
// FIXME: Where is the mapping? |
290 |
1000.0 |
291 |
} |
292 |
|
293 |
def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = { |
294 |
val className = resourceType.chargingBehavior |
295 |
_chargingBehaviorMap.get(className) match { |
296 |
case Some(chargingBehavior) ⇒ |
297 |
chargingBehavior |
298 |
|
299 |
case _ ⇒ |
300 |
// It does not matter if this is entered by multiple threads and more than one instance of the same class |
301 |
// is created. The returned instance is not meant to be cached. |
302 |
try { |
303 |
val chargingBehavior = newInstance[ChargingBehavior](className) |
304 |
_chargingBehaviorMap synchronized { |
305 |
_chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior) |
306 |
} |
307 |
|
308 |
chargingBehavior |
309 |
} |
310 |
catch { |
311 |
case e: Exception ⇒ |
312 |
throw new AquariumInternalError("Could not load charging behavior %s".format(className), e) |
313 |
} |
314 |
} |
315 |
} |
316 |
|
317 |
def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel) |
318 |
|
319 |
def defaultClassLoader = apply(EnvKeys.defaultClassLoader) |
320 |
|
321 |
def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore |
322 |
|
323 |
def imEventStore = apply(EnvKeys.storeProvider).imEventStore |
324 |
|
325 |
def userStateStore = apply(EnvKeys.storeProvider).userStateStore |
326 |
|
327 |
def policyStore = this.cachingPolicyStore |
328 |
|
329 |
def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder) |
330 |
|
331 |
def eventBus = apply(EnvKeys.eventBus) |
332 |
|
333 |
def chargingService = apply(EnvKeys.chargingService) |
334 |
|
335 |
def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold) |
336 |
|
337 |
def adminCookie = apply(EnvKeys.adminCookie) |
338 |
|
339 |
def converters = apply(EnvKeys.converters) |
340 |
|
341 |
def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents) |
342 |
|
343 |
def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents) |
344 |
|
345 |
def timerService = apply(EnvKeys.timerService) |
346 |
|
347 |
def restPort = apply(EnvKeys.restPort) |
348 |
|
349 |
def akkaService = apply(EnvKeys.akkaService) |
350 |
|
351 |
def version = apply(EnvKeys.version) |
352 |
} |
353 |
|
354 |
object Aquarium { |
355 |
final val PropsToShow = List( |
356 |
SysProp.JavaVMName, |
357 |
SysProp.JavaVersion, |
358 |
SysProp.JavaHome, |
359 |
SysProp.JavaClassVersion, |
360 |
SysProp.JavaLibraryPath, |
361 |
SysProp.JavaClassPath, |
362 |
SysProp.JavaIOTmpDir, |
363 |
SysProp.UserName, |
364 |
SysProp.UserHome, |
365 |
SysProp.UserDir, |
366 |
SysProp.FileEncoding |
367 |
) |
368 |
|
369 |
object HTTP { |
370 |
final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie" |
371 |
final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase |
372 |
} |
373 |
|
374 |
final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) { |
375 |
override def toString = "%s(%s)".format(manifest[T], name) |
376 |
} |
377 |
|
378 |
final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List( |
379 |
EnvKeys.timerService, |
380 |
EnvKeys.akkaService, |
381 |
EnvKeys.eventBus, |
382 |
EnvKeys.restService, |
383 |
EnvKeys.rabbitMQService, |
384 |
EnvKeys.storeWatcherService |
385 |
) |
386 |
|
387 |
object EnvKeys { |
388 |
/** |
389 |
* The Aquarium version. Will be reported in any due occasion. |
390 |
*/ |
391 |
final val version = StringKey("version") |
392 |
|
393 |
final val originalProps: TypedKey[Props] = |
394 |
new AquariumEnvKey[Props]("originalProps") |
395 |
|
396 |
/** |
397 |
* The fully qualified name of the class that implements the `StoreProvider`. |
398 |
* Will be instantiated reflectively and should have a public default constructor. |
399 |
*/ |
400 |
final val storeProvider: TypedKey[StoreProvider] = |
401 |
new AquariumEnvKey[StoreProvider]("store.provider.class") |
402 |
|
403 |
/** |
404 |
* If a value is given to this property, then it represents a folder where all events coming to aquarium are |
405 |
* saved. |
406 |
* |
407 |
* This is for debugging purposes. |
408 |
*/ |
409 |
final val eventsStoreFolder: TypedKey[Option[File]] = |
410 |
new AquariumEnvKey[Option[File]]("events.store.folder") |
411 |
|
412 |
/** |
413 |
* If this is `true` and `events.store.folder` is defined, then all resource events are |
414 |
* also stored in `events.store.folder`. |
415 |
* |
416 |
* This is for debugging purposes. |
417 |
*/ |
418 |
|
419 |
final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events") |
420 |
|
421 |
/** |
422 |
* If this is `true` and `events.store.folder` is defined, then all IM events are |
423 |
* also stored in `events.store.folder`. |
424 |
* |
425 |
* This is for debugging purposes. |
426 |
*/ |
427 |
final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events") |
428 |
|
429 |
/** |
430 |
* A time period in milliseconds for which we can tolerate stale parts regarding user state. |
431 |
* |
432 |
* The smaller the value, the more accurate the user credits and other state parts are. |
433 |
* |
434 |
* If a request for user state (e.g. balance) is received and the request timestamp exceeds |
435 |
* the timestamp of the last known balance amount by this value, then a re-computation for |
436 |
* the balance is triggered. |
437 |
*/ |
438 |
final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold") |
439 |
|
440 |
/** |
441 |
* REST service listening port. |
442 |
*/ |
443 |
final val restPort = IntKey("rest.port") |
444 |
|
445 |
final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis") |
446 |
|
447 |
/** |
448 |
* A cookie used in every administrative REST API call, so that Aquarium knows it comes from |
449 |
* an authorised client. |
450 |
*/ |
451 |
final val adminCookie: TypedKey[Option[String]] = |
452 |
new AquariumEnvKey[Option[String]]("admin.cookie") |
453 |
|
454 |
/** |
455 |
* The class that initializes the REST service |
456 |
*/ |
457 |
final val restService: TypedKey[Lifecycle] = |
458 |
new AquariumEnvKey[Lifecycle]("rest.service.class") |
459 |
|
460 |
final val akkaService: TypedKey[AkkaService] = |
461 |
new AquariumEnvKey[AkkaService]("akka.service") |
462 |
|
463 |
final val eventBus: TypedKey[EventBusService] = |
464 |
new AquariumEnvKey[EventBusService]("event.bus.service") |
465 |
|
466 |
final val timerService: TypedKey[TimerService] = |
467 |
new AquariumEnvKey[TimerService]("timer.service") |
468 |
|
469 |
final val rabbitMQService: TypedKey[RabbitMQService] = |
470 |
new AquariumEnvKey[RabbitMQService]("rabbitmq.service") |
471 |
|
472 |
final val storeWatcherService: TypedKey[StoreWatcherService] = |
473 |
new AquariumEnvKey[StoreWatcherService]("store.watcher.service") |
474 |
|
475 |
final val converters: TypedKey[Converters] = |
476 |
new AquariumEnvKey[Converters]("converters") |
477 |
|
478 |
final val chargingService: TypedKey[ChargingService] = |
479 |
new AquariumEnvKey[ChargingService]("charging.service") |
480 |
|
481 |
final val defaultClassLoader: TypedKey[ClassLoader] = |
482 |
new AquariumEnvKey[ClassLoader]("default.class.loader") |
483 |
|
484 |
final val defaultPolicyModel: TypedKey[PolicyModel] = |
485 |
new AquariumEnvKey[PolicyModel]("default.policy.model") |
486 |
} |
487 |
} |