Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / Aquarium.scala @ 7dbaeb04

History | View | Annotate | Download (15.3 kB)

1
/*
2
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium
37

    
38
import com.ckkloverdos.env.Env
39
import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40
import com.ckkloverdos.props.Props
41
import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
42
import java.io.File
43
import gr.grnet.aquarium.util.{Loggable, Lifecycle}
44
import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
45
import com.ckkloverdos.convert.Converters
46
import java.util.concurrent.atomic.AtomicBoolean
47
import org.slf4j.{LoggerFactory, Logger}
48
import com.ckkloverdos.maybe._
49
import com.ckkloverdos.sys.SysProp
50
import gr.grnet.aquarium.service.event.AquariumCreatedEvent
51
import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
52
import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
53
import gr.grnet.aquarium.util.date.TimeHelpers
54

    
55
/**
56
 *
57
 * @author Christos KK Loverdos <loverdos@gmail.com>
58
 */
59

    
60
final class Aquarium(env: Env) extends Lifecycle with Loggable {
61
  import Aquarium.EnvKeys
62

    
63
  @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
64

    
65
  private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
66
    apply(EnvKeys.defaultPolicyModel),
67
    apply(EnvKeys.storeProvider).policyStore
68
  )
69

    
70
  private[this] val _isStopping = new AtomicBoolean(false)
71

    
72
  override def toString = "%s/v%s".format(getClass.getName, version)
73

    
74
  def isStopping() = _isStopping.get()
75

    
76
  @inline
77
  def getClientLogger(client: AnyRef): Logger = {
78
    client match {
79
      case null ⇒
80
        this.logger
81

    
82
      case _ ⇒
83
        LoggerFactory.getLogger(client.getClass)
84
    }
85
  }
86

    
87
  def debug(client: AnyRef, fmt: String, args: Any*) = {
88
    getClientLogger(client).debug(fmt.format(args: _*))
89
  }
90

    
91
  def info(client: AnyRef, fmt: String, args: Any*) = {
92
    getClientLogger(client).info(fmt.format(args: _*))
93
  }
94

    
95
  def warn(client: AnyRef, fmt: String, args: Any*) = {
96
    getClientLogger(client).warn(fmt.format(args: _*))
97
  }
98

    
99
  @throws(classOf[AquariumInternalError])
100
  def apply[T: Manifest](key: TypedKey[T]): T = {
101
    try {
102
     env.getEx(key)
103
    } catch {
104
      case e: Exception ⇒
105
        throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
106
    }
107
  }
108

    
109
  private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
110

    
111
  private[this] def startServices(): Unit = {
112
    for(service ← _allServices) {
113
      logStartingF(service.toString) {
114
        service.start()
115
      } {}
116
    }
117
  }
118

    
119
  private[this] def stopServices(): Unit = {
120
    val services = _allServices.reverse
121

    
122
    for(service ← services) {
123
      logStoppingF(service.toString) {
124
        safeUnit(service.stop())
125
      } {}
126
    }
127
  }
128

    
129
  private[this] def showBasicConfiguration(): Unit = {
130
    for(folder ← this.eventsStoreFolder) {
131
      logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
132
    }
133
    this.eventsStoreFolder.throwMe // on error
134
  }
135

    
136
  private[this] def addShutdownHooks(): Unit = {
137
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
138
      def run = {
139
        if(!_isStopping.get()) {
140
          logStoppingF("Aquarium") {
141
            stop()
142
          } {}
143
        }
144
      }
145
    }))
146
  }
147

    
148
  def start(): Unit = {
149
    this._isStopping.set(false)
150
    showBasicConfiguration()
151
    addShutdownHooks()
152
    startServices()
153
  }
154

    
155
  def stop(): Unit = {
156
    this._isStopping.set(true)
157
    stopServices()
158
  }
159

    
160
  /**
161
   * Stops Aquarium after the given millis. Used during testing.
162
   */
163
  def stopAfterMillis(millis: Long) {
164
    Thread sleep millis
165
    stop()
166
  }
167

    
168
  /**
169
   * Reflectively provide a new instance of a class and configure it appropriately.
170
   */
171
  def newInstance[C <: AnyRef](_class: Class[C]): C = {
172
    newInstance(_class.getName)
173
  }
174

    
175
  /**
176
   * Reflectively provide a new instance of a class and configure it appropriately.
177
   */
178
  def newInstance[C <: AnyRef](className: String): C = {
179
    val originalProps = apply(EnvKeys.originalProps)
180

    
181
    val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
182
    instanceM match {
183
      case Just(instance) ⇒
184
//        eventBus.addSubscriber[C](instance)
185
        instance match {
186
          case aquariumAware: AquariumAware ⇒
187
            aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
188

    
189
          case _ ⇒
190
        }
191

    
192
        instance match {
193
          case configurable: Configurable if (originalProps ne null) ⇒
194
            val localProps = configurable.propertyPrefix match {
195
              case somePrefix @ Some(prefix) ⇒
196
                if(prefix.length == 0) {
197
                  logger.warn(
198
                    "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
199
                }
200

    
201
                originalProps.subsetForKeyPrefix(prefix)
202

    
203
              case None ⇒
204
                originalProps
205
            }
206

    
207
            logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
208
            MaybeEither(configurable configure localProps) match {
209
              case Just(_) ⇒
210
                logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
211

    
212
              case Failed(e) ⇒
213
                throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
214
            }
215

    
216
          case _ ⇒
217
        }
218

    
219
        instance
220

    
221
      case Failed(e) ⇒
222
        throw new AquariumInternalError("Could not instantiate %s".format(className), e)
223
    }
224

    
225
  }
226

    
227
  def currentResourceTypesMap: Map[String, ResourceType] = {
228
    val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
229
    if(policyOpt.isEmpty) {
230
      throw new AquariumInternalError("Not even the default policy found")
231
    }
232

    
233
    policyOpt.get.resourceTypesMap
234
  }
235

    
236
  def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
237
    policyStore.loadValidPolicyAt(referenceTimeMillis) match {
238
      case None ⇒
239
        throw new AquariumInternalError(
240
          "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
241
        )
242

    
243
      case Some(policy) ⇒
244
        policy
245
    }
246
  }
247

    
248
  def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
249
    val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
250
    policyAtReferenceTime.roleMapping.get(role) match {
251
      case None ⇒
252
        throw new AquariumInternalError("Unknown price table for role %s at %s".format(
253
          role,
254
          TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
255
        ))
256

    
257
      case Some(fullPriceTable) ⇒
258
        fullPriceTable
259
    }
260
  }
261

    
262
  /**
263
   * Computes the initial user agreement for the given role and reference time. Also,
264
   * records the ID from a potential related IMEvent.
265
   *
266
   * @param role                The role in the agreement
267
   * @param referenceTimeMillis The reference time to consider for the agreement
268
   */
269
  def initialUserAgreement(
270
      role: String,
271
      referenceTimeMillis: Long,
272
      relatedIMEventID: Option[String]
273
  ): UserAgreementModel = {
274

    
275
    // Just checking
276
    assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
277

    
278
    StdUserAgreement(
279
      "<StandardUserAgreement>",
280
      relatedIMEventID,
281
      0,
282
      Long.MaxValue,
283
      role,
284
      PolicyDefinedFullPriceTableRef
285
    )
286
  }
287

    
288
  def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
289
    // FIXME: Where is the mapping?
290
    1000.0
291
  }
292

    
293
  def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
294
    val className = resourceType.chargingBehavior
295
    _chargingBehaviorMap.get(className) match {
296
      case Some(chargingBehavior) ⇒
297
        chargingBehavior
298

    
299
      case _ ⇒
300
        // It does not matter if this is entered by multiple threads and more than one instance of the same class
301
        // is created. The returned instance is not meant to be cached.
302
        try {
303
          val chargingBehavior = newInstance[ChargingBehavior](className)
304
          _chargingBehaviorMap synchronized {
305
            _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
306
          }
307

    
308
          chargingBehavior
309
        }
310
        catch {
311
          case e: Exception ⇒
312
            throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
313
        }
314
    }
315
  }
316

    
317
  def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
318

    
319
  def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
320

    
321
  def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
322

    
323
  def imEventStore = apply(EnvKeys.storeProvider).imEventStore
324

    
325
  def userStateStore = apply(EnvKeys.storeProvider).userStateStore
326

    
327
  def policyStore = this.cachingPolicyStore
328

    
329
  def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
330

    
331
  def eventBus = apply(EnvKeys.eventBus)
332

    
333
  def chargingService = apply(EnvKeys.chargingService)
334

    
335
  def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
336

    
337
  def adminCookie = apply(EnvKeys.adminCookie)
338

    
339
  def converters = apply(EnvKeys.converters)
340

    
341
  def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
342

    
343
  def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
344

    
345
  def timerService = apply(EnvKeys.timerService)
346

    
347
  def restPort = apply(EnvKeys.restPort)
348

    
349
  def akkaService = apply(EnvKeys.akkaService)
350

    
351
  def version = apply(EnvKeys.version)
352
}
353

    
354
object Aquarium {
355
  final val PropsToShow = List(
356
    SysProp.JavaVMName,
357
    SysProp.JavaVersion,
358
    SysProp.JavaHome,
359
    SysProp.JavaClassVersion,
360
    SysProp.JavaLibraryPath,
361
    SysProp.JavaClassPath,
362
    SysProp.JavaIOTmpDir,
363
    SysProp.UserName,
364
    SysProp.UserHome,
365
    SysProp.UserDir,
366
    SysProp.FileEncoding
367
  )
368

    
369
  object HTTP {
370
   final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
371
   final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
372
 }
373

    
374
  final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
375
    override def toString = "%s(%s)".format(manifest[T], name)
376
  }
377

    
378
  final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
379
    EnvKeys.timerService,
380
    EnvKeys.akkaService,
381
    EnvKeys.eventBus,
382
    EnvKeys.restService,
383
    EnvKeys.rabbitMQService,
384
    EnvKeys.storeWatcherService
385
  )
386

    
387
  object EnvKeys {
388
    /**
389
     * The Aquarium version. Will be reported in any due occasion.
390
     */
391
    final val version = StringKey("version")
392

    
393
    final val originalProps: TypedKey[Props] =
394
      new AquariumEnvKey[Props]("originalProps")
395

    
396
    /**
397
     * The fully qualified name of the class that implements the `StoreProvider`.
398
     * Will be instantiated reflectively and should have a public default constructor.
399
     */
400
    final val storeProvider: TypedKey[StoreProvider] =
401
      new AquariumEnvKey[StoreProvider]("store.provider.class")
402

    
403
    /**
404
     * If a value is given to this property, then it represents a folder where all events coming to aquarium are
405
     * saved.
406
     *
407
     * This is for debugging purposes.
408
     */
409
    final val eventsStoreFolder: TypedKey[Option[File]] =
410
      new AquariumEnvKey[Option[File]]("events.store.folder")
411

    
412
    /**
413
     * If this is `true` and `events.store.folder` is defined, then all resource events are
414
     * also stored in `events.store.folder`.
415
     *
416
     * This is for debugging purposes.
417
     */
418

    
419
    final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
420

    
421
    /**
422
     * If this is `true` and `events.store.folder` is defined, then all IM events are
423
     * also stored in `events.store.folder`.
424
     *
425
     * This is for debugging purposes.
426
     */
427
    final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
428

    
429
    /**
430
     * A time period in milliseconds for which we can tolerate stale parts regarding user state.
431
     *
432
     * The smaller the value, the more accurate the user credits and other state parts are.
433
     *
434
     * If a request for user state (e.g. balance) is received and the request timestamp exceeds
435
     * the timestamp of the last known balance amount by this value, then a re-computation for
436
     * the balance is triggered.
437
     */
438
    final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
439

    
440
    /**
441
     * REST service listening port.
442
     */
443
    final val restPort = IntKey("rest.port")
444

    
445
    final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
446

    
447
    /**
448
     * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
449
     * an authorised client.
450
     */
451
    final val adminCookie: TypedKey[Option[String]] =
452
      new AquariumEnvKey[Option[String]]("admin.cookie")
453

    
454
    /**
455
     * The class that initializes the REST service
456
     */
457
    final val restService: TypedKey[Lifecycle] =
458
      new AquariumEnvKey[Lifecycle]("rest.service.class")
459

    
460
    final val akkaService: TypedKey[AkkaService] =
461
      new AquariumEnvKey[AkkaService]("akka.service")
462

    
463
    final val eventBus: TypedKey[EventBusService] =
464
      new AquariumEnvKey[EventBusService]("event.bus.service")
465

    
466
    final val timerService: TypedKey[TimerService] =
467
      new AquariumEnvKey[TimerService]("timer.service")
468

    
469
    final val rabbitMQService: TypedKey[RabbitMQService] =
470
      new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
471

    
472
    final val storeWatcherService: TypedKey[StoreWatcherService] =
473
      new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
474

    
475
    final val converters: TypedKey[Converters] =
476
      new AquariumEnvKey[Converters]("converters")
477

    
478
    final val chargingService: TypedKey[ChargingService] =
479
      new AquariumEnvKey[ChargingService]("charging.service")
480

    
481
    final val defaultClassLoader: TypedKey[ClassLoader] =
482
      new AquariumEnvKey[ClassLoader]("default.class.loader")
483

    
484
    final val defaultPolicyModel: TypedKey[PolicyModel] =
485
      new AquariumEnvKey[PolicyModel]("default.policy.model")
486
  }
487
}