import service.pinger.PingerActor
import service.rest.RESTActor
import service.user.{UserActor}
-import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured, ActorConfigurationMessage}
import cc.spray.can.{Timeout, RequestContext}
import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
import gr.grnet.aquarium.actor.message.admin.PingAllRequest
import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured, ActorConfigurationMessage}
/**
* Each actor within Aquarium plays one role.
classOf[ProcessIMEvent],
classOf[GetUserBalanceRequest],
classOf[GetUserStateRequest]),
- Set(classOf[ActorProviderConfigured],
+ Set(classOf[InitializeUserState],
+ classOf[ActorProviderConfigured],
classOf[AquariumPropertiesLoaded]))
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.actor.message.config
+
+import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, ActorMessage}
+
+
+/**
+ * This is sent to a [[gr.grnet.aquarium.actor.service.user.UserActor]] in order to initialize its internal state.
+ *
+ * @param userID the ID of the user whose state the receiving actor is asked to initialize
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+case class InitializeUserState(userID: String) extends ActorMessage with ActorConfigurationMessage
import gr.grnet.aquarium.service.RoleableActorProviderService
import akka.actor.ActorRef
import user.{UserActorCache}
-import message.config.{AquariumPropertiesLoaded, ActorProviderConfigured}
import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
import gr.grnet.aquarium.actor.message.admin.PingAllRequest
import gr.grnet.aquarium.actor.message.{UserActorRequestMessage, GetUserStateRequest, GetUserBalanceRequest}
import gr.grnet.aquarium.{AquariumException, AquariumInternalError}
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, AquariumPropertiesLoaded, ActorProviderConfigured}
/**
* Business logic router. Incoming messages are routed to appropriate destinations. Replies are routed back
val userActor = _actorProvider.actorForRole(UserActorRole)
UserActorCache.put(userID, userActor)
+ userActor ! InitializeUserState(userID)
+
userActor
}
import gr.grnet.aquarium.actor._
import gr.grnet.aquarium.util.shortClassNameOf
-import message.config.{ActorProviderConfigured, AquariumPropertiesLoaded}
import akka.config.Supervision.Temporary
import gr.grnet.aquarium.Aquarium
-import gr.grnet.aquarium.util.date.{TimeHelpers, MutableDateCalc}
import gr.grnet.aquarium.actor.message.event.{ProcessResourceEvent, ProcessIMEvent}
-import gr.grnet.aquarium.actor.message.{GetUserStateResponse, GetUserBalanceResponse, GetUserStateRequest, GetUserBalanceRequest}
+import gr.grnet.aquarium.actor.message.{GetUserStateRequest, GetUserBalanceRequest}
import gr.grnet.aquarium.computation.data.IMStateSnapshot
-import gr.grnet.aquarium.computation.UserState
import gr.grnet.aquarium.event.model.im.IMEventModel
+import gr.grnet.aquarium.computation.NewUserState
+import gr.grnet.aquarium.actor.message.config.{InitializeUserState, ActorProviderConfigured, AquariumPropertiesLoaded}
/**
*
class UserActor extends ReflectiveRoleableActor {
private[this] var _imState: IMStateSnapshot = _
- private[this] var _userState: UserState = _
+// private[this] var _userState: UserState = _
+// private[this] var _newUserState: NewUserState = _
self.lifeCycle = Temporary
- private[this] def _userID = this._userState.userID
+// private[this] def _userID = this._newUserState.userID
private[this] def _shutmedown(): Unit = {
- if(_haveUserState) {
- UserActorCache.invalidate(_userID)
- }
+// if(_haveUserState) {
+// UserActorCache.invalidate(_userID)
+// }
self.stop()
}
override protected def onThrowable(t: Throwable, message: AnyRef) = {
logChainOfCauses(t)
- ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
+// ERROR(t, "Terminating due to: %s(%s)", shortClassNameOf(t), t.getMessage)
_shutmedown()
}
def role = UserActorRole
- private[this] def _configurator: Aquarium = Aquarium.Instance
+ private[this] def aquarium: Aquarium = Aquarium.Instance
private[this] def _timestampTheshold =
- _configurator.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
+ aquarium.props.getLong(Aquarium.Keys.user_state_timestamp_threshold).getOr(10000)
- private[this] def _haveUserState = {
- this._userState ne null
- }
+// private[this] def _haveUserState = {
+// this._newUserState ne null
+// }
private[this] def _haveIMState = {
this._imState ne null
def onActorProviderConfigured(event: ActorProviderConfigured): Unit = {
}
+  /**
+   * Recomputes `_imState` by replaying the stored IM events of the given user.
+   *
+   * Events are delivered by the IM event store through `replayIMEventsInOccurrenceOrder`. The first event
+   * seen while `_imState` is still `null` creates the initial snapshot; every other event is folded into
+   * the current snapshot.
+   *
+   * @param userID the user whose IM events are replayed
+   */
+  private[this] def reloadIMState(userID: String): Unit = {
+    aquarium.imEventStore.replayIMEventsInOccurrenceOrder(userID) { replayed ⇒
+      logger.debug("Replaying %s".format(replayed))
+
+      // Seed the snapshot on the first event, otherwise extend the one we already have.
+      this._imState =
+        if(this._imState eq null) IMStateSnapshot.initial(replayed)
+        else this._imState.copyWithEvent(replayed)
+    }
+
+    logger.debug("Recomputed %s".format(this._imState))
+  }
+
+  /**
+   * Handles [[InitializeUserState]]: logs the message and reloads the IM state of the user it names.
+   *
+   * @param event the initialization message; only its `userID` is used
+   */
+  def onInitializeUserState(event: InitializeUserState): Unit = {
+    val userID = event.userID
+
+    logger.debug("Got %s".format(event))
+    reloadIMState(userID)
+  }
+
private[this] def _getAgreementNameForNewUser(imEvent: IMEventModel): String = {
// FIXME: Implement based on the role
"default"
val hadIMState = _haveIMState
if(hadIMState) {
-
- this._imState = this._imState.addMostRecentEvent(imEvent)
+ if(this._imState.latestIMEvent.id == imEvent.id) {
+ // This happens when the actor is brought to life, then immediately initialized, and then
+ // sent the first IM event. But from the initialization procedure, this IM event will have
+ // already been loaded from DB!
+ logger.debug("Ignoring first %s after birth".format(imEvent.toDebugString))
+ return
+ }
+
+ this._imState = this._imState.copyWithEvent(imEvent)
} else {
this._imState = IMStateSnapshot.initial(imEvent)
}
- DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
+// DEBUG("%s %s = %s", if(hadIMState) "Update" else "Set", shortClassNameOf(this._imState), this._imState)
}
def onGetUserBalanceRequest(event: GetUserBalanceRequest): Unit = {
val userId = event.userID
// FIXME: Implement
- self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
+// self reply GetUserBalanceResponse(userId, Right(_userState.creditsSnapshot.creditAmount))
}
def onGetUserStateRequest(event: GetUserStateRequest): Unit = {
val userId = event.userID
// FIXME: Implement
- self reply GetUserStateResponse(userId, Right(this._userState))
+// self reply GetUserStateResponse(userId, Right(this._userState))
}
def onProcessResourceEvent(event: ProcessResourceEvent): Unit = {
}
- private[this] def D_userID = {
- if(this._userState eq null)
- if(this._imState eq null)
- "<NOT INITIALIZED>"
- else
- this._imState.imEvent.userID
- else
- this._userState.userID
- }
-
- private[this] def DEBUG(fmt: String, args: Any*) =
- logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
- private[this] def INFO(fmt: String, args: Any*) =
- logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
- private[this] def WARN(fmt: String, args: Any*) =
- logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
- private[this] def ERROR(fmt: String, args: Any*) =
- logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
-
- private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
- logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
+// private[this] def D_userID = {
+// if(this._newUserState eq null)
+// if(this._imState eq null)
+// "<NOT INITIALIZED>"
+// else
+// this._imState.latestIMEvent.userID
+// else
+// this._newUserState.userID
+// }
+//
+// private[this] def DEBUG(fmt: String, args: Any*) =
+// logger.debug("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+// private[this] def INFO(fmt: String, args: Any*) =
+// logger.info("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+// private[this] def WARN(fmt: String, args: Any*) =
+// logger.warn("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+// private[this] def ERROR(fmt: String, args: Any*) =
+// logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)))
+//
+// private[this] def ERROR(t: Throwable, fmt: String, args: Any*) =
+// logger.error("UserActor[%s]: %s".format(D_userID, fmt.format(args: _*)), t)
}
*/
roleHistory: RoleHistory) {
- def addMostRecentEvent(newEvent: IMEventModel) = {
+ def copyWithEvent(imEvent: IMEventModel) = {
copy(
- latestIMEvent = newEvent,
- roleHistory = roleHistory.addMostRecentRole(newEvent.role, newEvent.occurredMillis)
+ hasBeenActivated = this.hasBeenActivated || imEvent.isActive,
+ latestIMEvent = imEvent,
+ roleHistory = this.roleHistory.copyWithRole(imEvent.role, imEvent.occurredMillis)
)
}
}
import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
import scala.collection.immutable.{TreeMap, SortedMap}
+import scala.collection.mutable.ListBuffer
+import scala.annotation.tailrec
/**
*
TreeMap(roles.map(role => (role.timeslot, role.name)): _*)
}
- /**
- * Insert the new role in front of the other ones and adjust the `validTo` limit of the previous role.
- */
- def addMostRecentRole(role: String, validFrom: Long) = {
- val newItem = RoleHistoryItem(role, validFrom)
- val newRoles = roles match {
+ def copyWithRole(role: String, validFrom: Long) = {
+ val newItems = roles match {
+ case Nil ⇒
+ RoleHistoryItem(role, validFrom) :: Nil
+
case head :: tail ⇒
- newItem :: head.withNewValidTo(validFrom) :: tail
+ if(head.isStrictlyAfter(validFrom)) {
+ // must search history items to find where this fits in
+          @tailrec
+          def check(allChecked: ListBuffer[RoleHistoryItem],
+                    lastCheck: RoleHistoryItem,
+                    toCheck: List[RoleHistoryItem]): List[RoleHistoryItem] = {
- case Nil ⇒
- newItem :: Nil
+            // `allChecked` accumulates the items that start strictly after `validFrom`, `lastCheck` is the
+            // last such item and `toCheck` holds the older items that have not been examined yet.
+            toCheck match {
+              case Nil ⇒
+                // Every existing item starts after `validFrom`, so the new item becomes the oldest one.
+                // It is valid until the item that follows it in time begins.
+                allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom))
+                allChecked.toList
+
+              case toCheckHead :: toCheckTail ⇒
+                if(toCheckHead.isStrictlyAfter(validFrom)) {
+                  allChecked.append(toCheckHead)
+
+                  check(allChecked, toCheckHead, toCheckTail)
+                } else {
+                  // The new item fits between `lastCheck` and `toCheckHead`.
+                  allChecked.append(RoleHistoryItem(role, validFrom, lastCheck.validFrom))
+                  // The item that used to cover `validFrom` now ends where the new item starts, the same
+                  // adjustment that is made when a role is simply prepended to the history.
+                  allChecked.append(toCheckHead.copyWithValidTo(validFrom))
+                  // Keep the remaining older items; returning only `allChecked` would lose them.
+                  allChecked.toList ::: toCheckTail
+                }
+            }
+          }
+
+ val buffer = new ListBuffer[RoleHistoryItem]
+ buffer.append(head)
+ check(buffer, head, tail)
+ } else {
+ // assume head.validTo goes to infinity,
+ RoleHistoryItem(role, validFrom) :: head.copyWithValidTo(validFrom) :: tail
+ }
}
- RoleHistory(newRoles)
+ RoleHistory(newItems)
}
}
import gr.grnet.aquarium.util.date.MutableDateCalc
import gr.grnet.aquarium.logic.accounting.dsl.Timeslot
+import gr.grnet.aquarium.Aquarium
/**
*
*/
validTo: Long = Long.MaxValue) {
- require(
- validFrom <= validTo,
- "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validFrom)))
+ try {
+ require(
+ validFrom <= validTo,
+ "validFrom(%s) <= validTo(%s)".format(new MutableDateCalc(validFrom), new MutableDateCalc(validTo)))
+ }
+ catch {
+ case e: IllegalArgumentException ⇒
+ Aquarium.Instance.debug(this, "!! validFrom = %s, validTo = %s, dx=%s", validFrom, validTo, validTo-validFrom)
+ throw e
+ }
require(name ne null, "Name is not null")
def timeslot = Timeslot(validFrom, validTo)
- def withNewValidTo(newValidTo: Long) = copy(validTo = newValidTo)
+ def copyWithValidTo(newValidTo: Long) = copy(validTo = newValidTo)
+
+  /** Tells whether this item has a finite upper bound, i.e. `validTo` is not `Long.MaxValue`. */
+  def isUpperBounded = !(validTo == Long.MaxValue)
+
+  /** Tells whether `time` lies within the half-open interval `[validFrom, validTo)`. */
+  def contains(time: Long) = (time >= validFrom) && (validTo > time)
+
+  /** Tells whether this item starts strictly later than `time`. */
+  def isStrictlyAfter(time: Long) = time < validFrom
override def toString =
"RoleHistoryItem(%s, [%s, %s))".
import gr.grnet.aquarium.util.shortClassNameOf
import gr.grnet.aquarium.util.json.JsonSupport
import gr.grnet.aquarium.util.xml.XmlSupport
+import gr.grnet.aquarium.util.date.MutableDateCalc
/**
* The base model for all events coming from external systems.
def withReceivedMillis(newReceivedMillis: Long): ExternalEventModel
- def toDebugString = "%s(userID=%s, id=%s)".format(shortClassNameOf(this), userID, id)
+  /**
+   * A short, human-readable description of this event for log messages: short class name, user ID,
+   * event ID and the occurrence time rendered through `MutableDateCalc`.
+   */
+  def toDebugString = {
+    val kind = shortClassNameOf(this)
+    val occurred = new MutableDateCalc(occurredMillis).toString
+
+    "%s(userID=%s, id=%s, occurred=%s)".format(kind, userID, id, occurred)
+  }
}
object ExternalEventModel {
package im
import gr.grnet.aquarium.util._
+import gr.grnet.aquarium.util.date.MutableDateCalc
/**
* The model of any event sent from the `Identity Management` (IM) external system.
def isModifyUser = eventType.equalsIgnoreCase(IMEventModel.EventTypeNames.modify)
override def toDebugString = {
- "%s(userID=%s, id=%s, isActive=%s, role='%s')".format(shortClassNameOf(this), userID, id, isActive, role)
+ "%s(userID=%s, id=%s, isActive=%s, role='%s', occurred=%s)".format(
+ shortClassNameOf(this),
+ userID,
+ id,
+ isActive,
+ role,
+ new MutableDateCalc(occurredMillis).toString)
}
}
*
*/
def findFirstIsActiveIMEventByUserID(userID: String): Option[IMEvent]
+
+ /**
+ * Scans events for the given user, sorted by `occurredMillis` in ascending order and runs them through
+ * the given function `f`.
+ *
+ * Any exception is propagated to the caller. The underlying DB resources are properly disposed in any case.
+ *
+ * @param userID the ID of the user whose events are scanned
+ * @param f the function that is run on each scanned event
+ */
+ def replayIMEventsInOccurrenceOrder(userID: String)(f: IMEvent ⇒ Unit): Unit
}
\ No newline at end of file
None
}
}
+
+  /**
+   * Scans the events held in `imEventById` for the given user, sorted by `occurredMillis` in ascending
+   * order, and runs them through the given function `f`.
+   *
+   * Any exception thrown by `f` is propagated to the caller.
+   *
+   * @param userID the ID of the user whose events are scanned
+   * @param f      the function that is run on each scanned event
+   */
+  def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
+    // Sort by key rather than with a `<=` predicate: "less than or equal" is not a strict ordering, so two
+    // events with the same timestamp would each compare as smaller than the other.
+    imEventById.valuesIterator.
+      filter(_.userID == userID).
+      toSeq.
+      sortBy(_.occurredMillis).
+      foreach(f)
+  }
//- IMEventStore
def loadPolicyEntriesAfter(after: Long) =
}
}
}
+
+  /**
+   * Queries the IM events of the given user, sorted by `occurredMillis` in ascending order, and feeds each
+   * one to `f` after converting it with `MongoDBIMEvent.fromDBObject`.
+   *
+   * The cursor is handed to `withCloseable` for the duration of the scan. Any exception is propagated to
+   * the caller.
+   *
+   * @param userID the ID of the user whose events are scanned
+   * @param f      the function that is run on each scanned event
+   */
+  def replayIMEventsInOccurrenceOrder(userID: String)(f: (IMEvent) => Unit) = {
+    val byUser = new BasicDBObject(IMEventNames.userID, userID)
+    // 1 asks MongoDB for ascending order on the sort key.
+    val ascendingByOccurrence = new BasicDBObject(IMEventNames.occurredMillis, 1)
+
+    withCloseable(imEvents.find(byUser).sort(ascendingByOccurrence)) { events ⇒
+      while(events.hasNext) {
+        f(MongoDBIMEvent.fromDBObject(events.next()))
+      }
+    }
+  }
//-IMEventStore
+
+
//+PolicyStore
def loadPolicyEntriesAfter(after: Long): List[PolicyEntry] = {
val query = new BasicDBObject(PolicyEntry.JsonNames.validFrom,