* or implied, of GRNET S.A.
*/
-package gr.grnet.aquarium.connector.rabbitmq.service
-
-import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
+package gr.grnet.aquarium.connector.handler
import gr.grnet.aquarium.converter.JsonTextFormat
import gr.grnet.aquarium.connector.handler._
import gr.grnet.aquarium.util.safeUnit
import gr.grnet.aquarium.service.EventBusService
import gr.grnet.aquarium.Aquarium
+import com.ckkloverdos.maybe.{NoVal, Just, Failed, MaybeEither}
/**
* Generic handler of events arriving to Aquarium.
*/
class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
- (jsonParser: Array[Byte] ⇒ JsonTextFormat,
- onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit,
- onJsonParserError: (Array[Byte], Throwable) ⇒ Unit,
- eventParser: JsonTextFormat ⇒ E,
- onEventParserSuccess: (Array[Byte], E) ⇒ Unit,
- onEventParserError: (Array[Byte], Throwable) ⇒ Unit,
- saveAction: E ⇒ S,
- forwardAction: S ⇒ Unit) extends PayloadHandler {
+(jsonParser: Array[Byte] ⇒ JsonTextFormat,
+ onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit,
+ onJsonParserError: (Array[Byte], Throwable) ⇒ Unit,
+ eventParser: JsonTextFormat ⇒ E,
+ onEventParserSuccess: (Array[Byte], E) ⇒ Unit,
+ onEventParserError: (Array[Byte], Throwable) ⇒ Unit,
+
+ /**
+ * Business check that runs after the event has been parsed and before it is saved to DB.
+ *
+ * If a Some(handlerResult) is returned, then the event is not saved and the handlerResult
+ * is also returned from this payload handler.
+ *
+ * If None is returned, the event is passed on to saveAction.
+ *
+ * If evaluating this function fails (the Failed case below), the payload handler
+ * returns HandlerResultResend.
+ */
+ preSaveAction: E ⇒ Option[HandlerResult],
+ saveAction: E ⇒ S,
+ forwardAction: S ⇒ Unit) extends PayloadHandler {
def handlePayload(payload: Array[Byte]): HandlerResult = {
// 1. try to parse as json
- MaybeEither { jsonParser(payload) } match {
+ MaybeEither {
+ jsonParser(payload)
+ } match {
case Failed(e) ⇒
safeUnit(onJsonParserError(payload, e))
safeUnit(onJsonParserSuccess(payload, jsonTextFormat))
// 2. try to parse as model
- MaybeEither { eventParser(jsonTextFormat) } match {
+ MaybeEither {
+ eventParser(jsonTextFormat)
+ } match {
case Failed(e) ⇒
safeUnit(onEventParserError(payload, e))
case Just(event) ⇒
safeUnit(onEventParserSuccess(payload, event))
- // 3. try to save to DB
- MaybeEither { saveAction(event) } match {
+ // 3. See if we are ready to save to DB (run the pre-save business check)
+ MaybeEither {
+ preSaveAction(event)
+ } match {
case Failed(e) ⇒
- HandlerResultPanic
+ // Oops. The pre-save check itself failed unexpectedly, so this message must be resent.
+ HandlerResultResend
+
+ case Just(Some(handlerResult)) ⇒
+ // Nope. Not ready to save; hand the result of the check back to the caller as-is.
+ handlerResult
- case Just(s) ⇒
- // 4. try forward but it's OK if something bad happens here.
- safeUnit { forwardAction(s) }
+ case Just(None) ⇒
+ // Yep. Ready to save
+ // 4. try to save to DB
+ MaybeEither {
+ saveAction(event)
+ } match {
+ case Failed(e) ⇒
+ HandlerResultPanic
- HandlerResultSuccess
+ case Just(s) ⇒
+ // 5. try to forward, but it's OK if something bad happens here.
+ safeUnit {
+ forwardAction(s)
+ }
+
+ HandlerResultSuccess
+ }
}
+
}
}
}
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+import gr.grnet.aquarium.Aquarium
+import org.slf4j.Logger
+import gr.grnet.aquarium.converter.JsonTextFormat
+import gr.grnet.aquarium.util.{Tags, shortClassNameOf}
+import gr.grnet.aquarium.actor.RouterRole
+import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore}
+import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
+import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
+
+/**
+ * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
+ * [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
+ *
+ * Supplies, in positional order, the nine stages expected by
+ * [[gr.grnet.aquarium.connector.handler.GenericPayloadHandler]]:
+ * payloads that cannot be converted to JSON are logged and handed to
+ * `LocalFSEventStore.storeUnparsedIMEvent`; successfully parsed events are handed to
+ * `LocalFSEventStore.storeIMEvent`; the pre-save check rejects an event whose id is
+ * already present in the IM event store, or whose `occurredMillis` is smaller than that
+ * of the latest stored event of the same user; accepted events are inserted into
+ * `aquarium.imEventStore` and then sent to the `RouterRole` actor as a `ProcessIMEvent`.
+ *
+ * @param aquarium provides the converters, the IM event store and the actor provider used here
+ * @param logger receives the error messages for payloads that fail to parse
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
+ extends GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
+ payload ⇒ { // jsonParser: raw bytes to JsonTextFormat
+ aquarium.converters.convertEx[JsonTextFormat](payload)
+ },
+
+ (payload, jsonTextFormat) ⇒ { // onJsonParserSuccess: no-op
+ },
+
+ (payload, error) ⇒ { // onJsonParserError: log and hand the raw payload to the local FS store
+ logger.error("Error creating JSON from %s payload".format(Tags.IMEventTag), error)
+
+ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
+ },
+
+ jsonTextFormat ⇒ { // eventParser: JsonTextFormat to the IM event model
+ StdIMEvent.fromJsonTextFormat(jsonTextFormat)
+ },
+
+ (payload, event) ⇒ { // onEventParserSuccess: hand the parsed event and its payload to the local FS store
+ LocalFSEventStore.storeIMEvent(aquarium, event, payload)
+ },
+
+ (payload, error) ⇒ { // onEventParserError: log only
+ logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
+ },
+
+ imEvent ⇒ { // preSaveAction: Some(result) vetoes the save, None lets it proceed
+ val className = shortClassNameOf(imEvent)
+ val id = imEvent.id
+
+ // Let's decide if it is OK to store the event.
+ // Remember that OK == None as the returning result.
+ //
+ // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
+ // do not bother to catch exceptions here.
+
+ // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
+ // It is a requirement that this ID is unique.
+ val store = aquarium.imEventStore
+ store.findIMEventById(id) match {
+ case Some(_) ⇒
+ // Reject the duplicate
+ Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
+
+ case None ⇒
+ // 2. Check that the new event is not older than our most recent event in DB.
+ // Sorry. We cannot tolerate out-of-order events here, since they really mess with the
+ // agreements selection and thus with the charging procedure.
+ //
+ // TODO: We really need to store these events anyway but somewhere else (BadEventsStore?)
+ val userID = imEvent.userID
+
+ store.findLatestIMEventByUserID(userID) match {
+ case Some(latestStoredEvent) ⇒
+ val occurredMillis = imEvent.occurredMillis
+ val latestOccurredMillis = latestStoredEvent.occurredMillis
+ if(occurredMillis < latestOccurredMillis) { // strictly older only; an equal timestamp is accepted
+ Some(HandlerResultReject(
+ "Out of order %s (%s < %s)".format(className, occurredMillis, latestOccurredMillis)))
+ } else {
+ None
+ }
+
+ case None ⇒
+ None
+ }
+ }
+ },
+
+ imEvent ⇒ { // saveAction: insert into the IM event store
+ aquarium.imEventStore.insertIMEvent(imEvent)
+ },
+
+ imEvent ⇒ { // forwardAction: send the stored event to the router actor
+ aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent)
+ }
+ )
--- /dev/null
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+import org.slf4j.Logger
+import gr.grnet.aquarium.store.{LocalFSEventStore, ResourceEventStore}
+import gr.grnet.aquarium.converter.JsonTextFormat
+import gr.grnet.aquarium.Aquarium
+import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
+import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
+import gr.grnet.aquarium.actor.RouterRole
+import gr.grnet.aquarium.util._
+
+/**
+ * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
+ * [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
+ *
+ * Supplies, in positional order, the nine stages expected by
+ * [[gr.grnet.aquarium.connector.handler.GenericPayloadHandler]]:
+ * payloads that cannot be converted to JSON are logged and handed to
+ * `LocalFSEventStore.storeUnparsedResourceEvent`; successfully parsed events are handed to
+ * `LocalFSEventStore.storeResourceEvent`; the pre-save check rejects an event whose id is
+ * already present in the resource event store; accepted events are inserted into
+ * `aquarium.resourceEventStore` and then sent to the `RouterRole` actor as a
+ * `ProcessResourceEvent`.
+ *
+ * @param aquarium provides the converters, the resource event store and the actor provider used here
+ * @param logger receives the error messages for payloads that fail to parse
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger)
+ extends GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
+ payload ⇒ { // jsonParser: raw bytes to JsonTextFormat
+ aquarium.converters.convertEx[JsonTextFormat](payload)
+ },
+
+ (payload, jsonTextFormat) ⇒ { // onJsonParserSuccess: no-op
+ },
+
+ (payload, error) ⇒ { // onJsonParserError: log and hand the raw payload to the local FS store
+ logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
+
+ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
+ },
+
+ jsonTextFormat ⇒ { // eventParser: JsonTextFormat to the resource event model
+ StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
+ },
+
+ (payload, event) ⇒ { // onEventParserSuccess: hand the parsed event and its payload to the local FS store
+ LocalFSEventStore.storeResourceEvent(aquarium, event, payload)
+ },
+
+ (payload, error) ⇒ { // onEventParserError: log only
+ logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
+ },
+
+ rcEvent ⇒ { // preSaveAction: Some(result) vetoes the save, None lets it proceed
+ val className = shortClassNameOf(rcEvent)
+ val id = rcEvent.id
+
+ // Let's decide if it is OK to store the event.
+ // Remember that OK == None as the returning result.
+ //
+ // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
+ // do not bother to catch exceptions here.
+
+ // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
+ // It is a requirement that this ID is unique.
+ val store = aquarium.resourceEventStore
+ store.findResourceEventById(id) match {
+ case Some(_) ⇒
+ // Reject the duplicate
+ Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
+
+ case None ⇒
+ None
+ }
+ },
+
+ rcEvent ⇒ { // saveAction: insert into the resource event store
+ aquarium.resourceEventStore.insertResourceEvent(rcEvent)
+ },
+
+ rcEvent ⇒ { // forwardAction: send the stored event to the router actor
+ aquarium.actorProvider.actorForRole(RouterRole) ! ProcessResourceEvent(rcEvent)
+ }
+ )
import gr.grnet.aquarium.{Aquarium, Configurable}
import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
-import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
-import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
+import gr.grnet.aquarium.converter.StdConverters
import gr.grnet.aquarium.actor.RouterRole
-import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
-import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
-import gr.grnet.aquarium.util.shortInfoOf
import gr.grnet.aquarium.util.sameTags
-import gr.grnet.aquarium.connector.handler.{HandlerResultPanic, HandlerResult}
-import com.ckkloverdos.maybe.{Failed, Just, Maybe, MaybeEither}
import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import gr.grnet.aquarium.connector.handler.{ResourceEventPayloadHandler, IMEventPayloadHandler}
/**
*
}
private[this] def doConfigure(): Unit = {
- val jsonParser: (Array[Byte] ⇒ JsonTextFormat) = { payload ⇒
- converters.convertEx[JsonTextFormat](payload)
- }
-
- val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
- StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
- }
-
- val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
- StdIMEvent.fromJsonTextFormat(jsonTextFormat)
- }
-
- val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
- router ! ProcessResourceEvent(rcEvent)
- }
-
- val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
- logger.info("Forwarding {}", rcEvent)
- }
+ val postNotifier = new PayloadHandlerPostNotifier(logger)
- val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
- router ! ProcessIMEvent(imEvent)
- }
-
- val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
- logger.info("Forwarding {}", imEvent)
- }
+ val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
- val postNotifier = (consumer: RabbitMQConsumer, maybeResult: Maybe[HandlerResult]) ⇒ {
- maybeResult match {
- case Just(hr @ HandlerResultPanic) ⇒
- // The other end is crucial to the overall operation and it is in panic mode,
- // so we stop delivering messages until further notice
- logger.warn("Shutting down %s due to [%s]".format(consumer.toString, hr))
- consumer.setAllowReconnects(false)
- consumer.safeStop()
-
- case Failed(e) ⇒
- logger.warn("Shutting down %s due to [%s]".format(consumer.toString, shortInfoOf(e)))
- consumer.setAllowReconnects(false)
- consumer.safeStop()
-
- case _ ⇒
- }
- }
-
- val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
- jsonParser,
- (payload, jsonTextFormat) ⇒ {},
- (payload, error) ⇒ {
- logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
-
- LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
- },
- rcEventParser,
- (payload, event) ⇒ {
- LocalFSEventStore.storeResourceEvent(aquarium, event, payload)
- },
- (payload, error) ⇒ {
- logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
- },
- rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
- rcForwardAction
- )
-
- val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
- jsonParser,
- (payload, jsonTextFormat) ⇒ {},
- (payload, error) ⇒ {
- logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error)
-
- LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
- },
- imEventParser,
- (payload, event) ⇒ {
- LocalFSEventStore.storeIMEvent(aquarium, event, payload)
- },
- (payload, error) ⇒ {
- logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
- },
- imEvent ⇒ imEventStore.insertIMEvent(imEvent),
- imForwardAction
- )
+ val imHandler = new IMEventPayloadHandler(aquarium, logger)
val futureExecutor = new PayloadHandlerFutureExecutor