Statistics
| Branch: | Tag: | Revision:

root / src / main / scala / gr / grnet / aquarium / Aquarium.scala @ 10c87819

History | View | Annotate | Download (15.5 kB)

1
/*
2
 * Copyright 2011-2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *
12
 *   2. Redistributions in binary form must reproduce the above
13
 *      copyright notice, this list of conditions and the following
14
 *      disclaimer in the documentation and/or other materials
15
 *      provided with the distribution.
16
 *
17
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
18
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
19
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
21
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
24
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
25
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
26
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
27
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28
 * POSSIBILITY OF SUCH DAMAGE.
29
 *
30
 * The views and conclusions contained in the software and
31
 * documentation are those of the authors and should not be
32
 * interpreted as representing official policies, either expressed
33
 * or implied, of GRNET S.A.
34
 */
35

    
36
package gr.grnet.aquarium
37

    
38
import com.ckkloverdos.env.Env
39
import com.ckkloverdos.key.{IntKey, StringKey, LongKey, TypedKeySkeleton, TypedKey, BooleanKey}
40
import com.ckkloverdos.props.Props
41
import connector.rabbitmq.RabbitMQProducer
42
import gr.grnet.aquarium.store.{PolicyStore, StoreProvider}
43
import java.io.File
44
import gr.grnet.aquarium.util.{Loggable, Lifecycle}
45
import gr.grnet.aquarium.service.{StoreWatcherService, RabbitMQService, TimerService, EventBusService, AkkaService}
46
import com.ckkloverdos.convert.Converters
47
import java.util.concurrent.atomic.AtomicBoolean
48
import org.slf4j.{LoggerFactory, Logger}
49
import com.ckkloverdos.maybe._
50
import com.ckkloverdos.sys.SysProp
51
import gr.grnet.aquarium.service.event.AquariumCreatedEvent
52
import gr.grnet.aquarium.policy.{FullPriceTable, PolicyModel, CachingPolicyStore, PolicyDefinedFullPriceTableRef, StdUserAgreement, UserAgreementModel, ResourceType}
53
import gr.grnet.aquarium.charging.{ChargingService, ChargingBehavior}
54
import gr.grnet.aquarium.util.date.TimeHelpers
55

    
56
/**
57
 *
58
 * @author Christos KK Loverdos <loverdos@gmail.com>
59
 */
60

    
61
final class Aquarium(env: Env) extends Lifecycle with Loggable {
62
  import Aquarium.EnvKeys
63

    
64
  @volatile private[this] var _chargingBehaviorMap = Map[String, ChargingBehavior]()
65

    
66
  private[this] lazy val cachingPolicyStore = new CachingPolicyStore(
67
    apply(EnvKeys.defaultPolicyModel),
68
    apply(EnvKeys.storeProvider).policyStore
69
  )
70

    
71
  private[this] val _isStopping = new AtomicBoolean(false)
72

    
73
  override def toString = "%s/v%s".format(getClass.getName, version)
74

    
75
  def isStopping() = _isStopping.get()
76

    
77
  @inline
78
  def getClientLogger(client: AnyRef): Logger = {
79
    client match {
80
      case null ⇒
81
        this.logger
82

    
83
      case _ ⇒
84
        LoggerFactory.getLogger(client.getClass)
85
    }
86
  }
87

    
88
  def debug(client: AnyRef, fmt: String, args: Any*) = {
89
    getClientLogger(client).debug(fmt.format(args: _*))
90
  }
91

    
92
  def info(client: AnyRef, fmt: String, args: Any*) = {
93
    getClientLogger(client).info(fmt.format(args: _*))
94
  }
95

    
96
  def warn(client: AnyRef, fmt: String, args: Any*) = {
97
    getClientLogger(client).warn(fmt.format(args: _*))
98
  }
99

    
100
  @throws(classOf[AquariumInternalError])
101
  def apply[T: Manifest](key: TypedKey[T]): T = {
102
    try {
103
     env.getEx(key)
104
    } catch {
105
      case e: Exception ⇒
106
        throw new AquariumInternalError("Could not locate %s in Aquarium environment".format(key))
107
    }
108
  }
109

    
110
  private[this] lazy val _allServices = Aquarium.ServiceKeys.map(this.apply(_))
111

    
112
  private[this] def startServices(): Unit = {
113
    for(service ← _allServices) {
114
      logStartingF(service.toString) {
115
        service.start()
116
      } {}
117
    }
118
  }
119

    
120
  private[this] def stopServices(): Unit = {
121
    val services = _allServices.reverse
122

    
123
    for(service ← services) {
124
      logStoppingF(service.toString) {
125
        safeUnit(service.stop())
126
      } {}
127
    }
128
  }
129

    
130
  private[this] def showBasicConfiguration(): Unit = {
131
    for(folder ← this.eventsStoreFolder) {
132
      logger.info("{} = {}", EnvKeys.eventsStoreFolder.name, folder)
133
    }
134
    this.eventsStoreFolder.throwMe // on error
135

    
136
    logger.info("default policy = {}", defaultPolicyModel.toJsonString)
137
  }
138

    
139
  private[this] def addShutdownHooks(): Unit = {
140
    Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
141
      def run = {
142
        if(!_isStopping.get()) {
143
          logStoppingF("Aquarium") {
144
            stop()
145
          } {}
146
        }
147
      }
148
    }))
149
  }
150

    
151
  def start(): Unit = {
152
    this._isStopping.set(false)
153
    showBasicConfiguration()
154
    addShutdownHooks()
155
    startServices()
156
  }
157

    
158
  def stop(): Unit = {
159
    this._isStopping.set(true)
160
    stopServices()
161
  }
162

    
163
  /**
164
   * Stops Aquarium after the given millis. Used during testing.
165
   */
166
  def stopAfterMillis(millis: Long) {
167
    Thread sleep millis
168
    stop()
169
  }
170

    
171
  /**
172
   * Reflectively provide a new instance of a class and configure it appropriately.
173
   */
174
  def newInstance[C <: AnyRef](_class: Class[C]): C = {
175
    newInstance(_class.getName)
176
  }
177

    
178
  /**
179
   * Reflectively provide a new instance of a class and configure it appropriately.
180
   */
181
  def newInstance[C <: AnyRef](className: String): C = {
182
    val originalProps = apply(EnvKeys.originalProps)
183

    
184
    val instanceM = MaybeEither(defaultClassLoader.loadClass(className).newInstance().asInstanceOf[C])
185
    instanceM match {
186
      case Just(instance) ⇒
187
//        eventBus.addSubscriber[C](instance)
188
        instance match {
189
          case aquariumAware: AquariumAware ⇒
190
            aquariumAware.awareOfAquarium(AquariumCreatedEvent(this))
191

    
192
          case _ ⇒
193
        }
194

    
195
        instance match {
196
          case configurable: Configurable if (originalProps ne null) ⇒
197
            val localProps = configurable.propertyPrefix match {
198
              case somePrefix @ Some(prefix) ⇒
199
                if(prefix.length == 0) {
200
                  logger.warn(
201
                    "Property prefix for %s is %s. Consider using None".format(instance, somePrefix))
202
                }
203

    
204
                originalProps.subsetForKeyPrefix(prefix)
205

    
206
              case None ⇒
207
                originalProps
208
            }
209

    
210
            logger.debug("Configuring {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
211
            MaybeEither(configurable configure localProps) match {
212
              case Just(_) ⇒
213
                logger.info("Configured {} with props (prefix={})", configurable.getClass.getName, configurable.propertyPrefix)
214

    
215
              case Failed(e) ⇒
216
                throw new AquariumInternalError("Could not configure instance of %s".format(className), e)
217
            }
218

    
219
          case _ ⇒
220
        }
221

    
222
        instance
223

    
224
      case Failed(e) ⇒
225
        throw new AquariumInternalError("Could not instantiate %s".format(className), e)
226
    }
227

    
228
  }
229

    
230
  def currentResourceTypesMap: Map[String, ResourceType] = {
231
    val policyOpt = policyStore.loadValidPolicyAt(TimeHelpers.nowMillis())
232
    if(policyOpt.isEmpty) {
233
      throw new AquariumInternalError("Not even the default policy found")
234
    }
235

    
236
    policyOpt.get.resourceTypesMap
237
  }
238

    
239
  def unsafeValidPolicyAt(referenceTimeMillis: Long): PolicyModel = {
240
    policyStore.loadValidPolicyAt(referenceTimeMillis) match {
241
      case None ⇒
242
        throw new AquariumInternalError(
243
          "No policy found at %s".format(TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis))
244
        )
245

    
246
      case Some(policy) ⇒
247
        policy
248
    }
249
  }
250

    
251
  def unsafePriceTableForRoleAt(role: String, referenceTimeMillis: Long): FullPriceTable = {
252
    val policyAtReferenceTime = unsafeValidPolicyAt(referenceTimeMillis)
253
    policyAtReferenceTime.roleMapping.get(role) match {
254
      case None ⇒
255
        throw new AquariumInternalError("Unknown price table for role %s at %s".format(
256
          role,
257
          TimeHelpers.toYYYYMMDDHHMMSSSSS(referenceTimeMillis)
258
        ))
259

    
260
      case Some(fullPriceTable) ⇒
261
        fullPriceTable
262
    }
263
  }
264

    
265
  /**
266
   * Computes the initial user agreement for the given role and reference time. Also,
267
   * records the ID from a potential related IMEvent.
268
   *
269
   * @param role                The role in the agreement
270
   * @param referenceTimeMillis The reference time to consider for the agreement
271
   */
272
  def initialUserAgreement(
273
      role: String,
274
      referenceTimeMillis: Long,
275
      relatedIMEventID: Option[String]
276
  ): UserAgreementModel = {
277

    
278
    // Just checking
279
    assert(null ne unsafePriceTableForRoleAt(role, referenceTimeMillis))
280

    
281
    StdUserAgreement(
282
      "<StandardUserAgreement>",
283
      relatedIMEventID,
284
      0,
285
      Long.MaxValue,
286
      role,
287
      PolicyDefinedFullPriceTableRef()
288
    )
289
  }
290

    
291
  def initialUserBalance(role: String, referenceTimeMillis: Long): Double = {
292
    // FIXME: Where is the mapping?
293
    0.0
294
  }
295

    
296
  def chargingBehaviorOf(resourceType: ResourceType): ChargingBehavior = {
297
    // A resource type never changes charging behavior. By definition.
298
    val className = resourceType.chargingBehavior
299
    _chargingBehaviorMap.get(className) match {
300
      case Some(chargingBehavior) ⇒
301
        chargingBehavior
302

    
303
      case _ ⇒
304
        try {
305
          _chargingBehaviorMap synchronized {
306
            val chargingBehavior = newInstance[ChargingBehavior](className)
307
            _chargingBehaviorMap = _chargingBehaviorMap.updated(className, chargingBehavior)
308
            chargingBehavior
309
          }
310
        }
311
        catch {
312
          case e: Exception ⇒
313
            throw new AquariumInternalError("Could not load charging behavior %s".format(className), e)
314
        }
315
    }
316
  }
317

    
318
  def defaultPolicyModel = apply(EnvKeys.defaultPolicyModel)
319

    
320
  def defaultClassLoader = apply(EnvKeys.defaultClassLoader)
321

    
322
  def resourceEventStore = apply(EnvKeys.storeProvider).resourceEventStore
323

    
324
  def imEventStore = apply(EnvKeys.storeProvider).imEventStore
325

    
326
  def userStateStore = apply(EnvKeys.storeProvider).userStateStore
327

    
328
  def policyStore = this.cachingPolicyStore
329

    
330
  def eventsStoreFolder = apply(EnvKeys.eventsStoreFolder)
331

    
332
  def eventBus = apply(EnvKeys.eventBus)
333

    
334
  def chargingService = apply(EnvKeys.chargingService)
335

    
336
  def userStateTimestampThreshold = apply(EnvKeys.userStateTimestampThreshold)
337

    
338
  def adminCookie = apply(EnvKeys.adminCookie)
339

    
340
  def converters = apply(EnvKeys.converters)
341

    
342
  def saveResourceEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveRCEvents)
343

    
344
  def saveIMEventsToEventsStoreFolder = apply(EnvKeys.eventsStoreSaveIMEvents)
345

    
346
  def timerService = apply(EnvKeys.timerService)
347

    
348
  def restPort = apply(EnvKeys.restPort)
349

    
350
  def akkaService = apply(EnvKeys.akkaService)
351

    
352
  def version = apply(EnvKeys.version)
353
}
354

    
355
object Aquarium {
356
  final val PropsToShow = List(
357
    SysProp.JavaVMName,
358
    SysProp.JavaVersion,
359
    SysProp.JavaHome,
360
    SysProp.JavaClassVersion,
361
    SysProp.JavaLibraryPath,
362
    SysProp.JavaClassPath,
363
    SysProp.JavaIOTmpDir,
364
    SysProp.UserName,
365
    SysProp.UserHome,
366
    SysProp.UserDir,
367
    SysProp.FileEncoding
368
  )
369

    
370
  object HTTP {
371
   final val RESTAdminHeaderName = "X-Aquarium-Admin-Cookie"
372
   final val RESTAdminHeaderNameLowerCase = RESTAdminHeaderName.toLowerCase
373
 }
374

    
375
  final class AquariumEnvKey[T: Manifest](override val name: String) extends TypedKeySkeleton[T](name) {
376
    override def toString = "%s(%s)".format(manifest[T], name)
377
  }
378

    
379
  final val ServiceKeys: List[TypedKey[_ <: Lifecycle]] = List(
380
    EnvKeys.timerService,
381
    EnvKeys.akkaService,
382
    EnvKeys.eventBus,
383
    EnvKeys.restService,
384
    EnvKeys.rabbitMQService,
385
    EnvKeys.storeWatcherService
386
  )
387

    
388
  object EnvKeys {
389
    /**
390
     * The Aquarium version. Will be reported in any due occasion.
391
     */
392
    final val version = StringKey("version")
393

    
394
    final val originalProps: TypedKey[Props] =
395
      new AquariumEnvKey[Props]("originalProps")
396

    
397
    /**
398
     * The fully qualified name of the class that implements the `StoreProvider`.
399
     * Will be instantiated reflectively and should have a public default constructor.
400
     */
401
    final val storeProvider: TypedKey[StoreProvider] =
402
      new AquariumEnvKey[StoreProvider]("store.provider.class")
403

    
404
    /**
405
     * If a value is given to this property, then it represents a folder where all events coming to aquarium are
406
     * saved.
407
     *
408
     * This is for debugging purposes.
409
     */
410
    final val eventsStoreFolder: TypedKey[Option[File]] =
411
      new AquariumEnvKey[Option[File]]("events.store.folder")
412

    
413
    /**
414
     * If this is `true` and `events.store.folder` is defined, then all resource events are
415
     * also stored in `events.store.folder`.
416
     *
417
     * This is for debugging purposes.
418
     */
419

    
420
    final val eventsStoreSaveRCEvents = BooleanKey("events.store.save.rc.events")
421

    
422
    /**
423
     * If this is `true` and `events.store.folder` is defined, then all IM events are
424
     * also stored in `events.store.folder`.
425
     *
426
     * This is for debugging purposes.
427
     */
428
    final val eventsStoreSaveIMEvents = BooleanKey("events.store.save.im.events")
429

    
430
    /**
431
     * A time period in milliseconds for which we can tolerate stale parts regarding user state.
432
     *
433
     * The smaller the value, the more accurate the user credits and other state parts are.
434
     *
435
     * If a request for user state (e.g. balance) is received and the request timestamp exceeds
436
     * the timestamp of the last known balance amount by this value, then a re-computation for
437
     * the balance is triggered.
438
     */
439
    final val userStateTimestampThreshold = LongKey("user.state.timestamp.threshold")
440

    
441
    /**
442
     * REST service listening port.
443
     */
444
    final val restPort = IntKey("rest.port")
445

    
446
    final val restShutdownTimeoutMillis = LongKey("rest.shutdown.timeout.millis")
447

    
448
    /**
449
     * A cookie used in every administrative REST API call, so that Aquarium knows it comes from
450
     * an authorised client.
451
     */
452
    final val adminCookie: TypedKey[Option[String]] =
453
      new AquariumEnvKey[Option[String]]("admin.cookie")
454

    
455
    /**
456
     * The class that initializes the REST service
457
     */
458
    final val restService: TypedKey[Lifecycle] =
459
      new AquariumEnvKey[Lifecycle]("rest.service.class")
460

    
461
    final val akkaService: TypedKey[AkkaService] =
462
      new AquariumEnvKey[AkkaService]("akka.service")
463

    
464
    final val eventBus: TypedKey[EventBusService] =
465
      new AquariumEnvKey[EventBusService]("event.bus.service")
466

    
467
    final val timerService: TypedKey[TimerService] =
468
      new AquariumEnvKey[TimerService]("timer.service")
469

    
470
    final val rabbitMQService: TypedKey[RabbitMQService] =
471
      new AquariumEnvKey[RabbitMQService]("rabbitmq.service")
472

    
473
    final val rabbitMQProducer: TypedKey[RabbitMQProducer] =
474
      new AquariumEnvKey[RabbitMQProducer]("rabbitmq.client")
475

    
476
    final val storeWatcherService: TypedKey[StoreWatcherService] =
477
      new AquariumEnvKey[StoreWatcherService]("store.watcher.service")
478

    
479
    final val converters: TypedKey[Converters] =
480
      new AquariumEnvKey[Converters]("converters")
481

    
482
    final val chargingService: TypedKey[ChargingService] =
483
      new AquariumEnvKey[ChargingService]("charging.service")
484

    
485
    final val defaultClassLoader: TypedKey[ClassLoader] =
486
      new AquariumEnvKey[ClassLoader]("default.class.loader")
487

    
488
    final val defaultPolicyModel: TypedKey[PolicyModel] =
489
      new AquariumEnvKey[PolicyModel]("default.policy.model")
490
  }
491
}