From b1fdb47617205093c57d11c9634e2eaea0604682 Mon Sep 17 00:00:00 2001 From: Christos KK Loverdos Date: Thu, 24 May 2012 13:43:21 +0300 Subject: [PATCH] Payload handlers made into classes --- .../GenericPayloadHandler.scala | 71 ++++++++--- .../connector/handler/IMEventPayloadHandler.scala | 131 ++++++++++++++++++++ .../handler/ResourceEventPayloadHandler.scala | 111 +++++++++++++++++ .../rabbitmq/service/RabbitMQService.scala | 94 +------------- 4 files changed, 298 insertions(+), 109 deletions(-) rename src/main/scala/gr/grnet/aquarium/connector/{rabbitmq/service => handler}/GenericPayloadHandler.scala (63%) create mode 100644 src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala create mode 100644 src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/GenericPayloadHandler.scala similarity index 63% rename from src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala rename to src/main/scala/gr/grnet/aquarium/connector/handler/GenericPayloadHandler.scala index 74922cf..8e1d01c 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/handler/GenericPayloadHandler.scala @@ -33,9 +33,7 @@ * or implied, of GRNET S.A. */ -package gr.grnet.aquarium.connector.rabbitmq.service - -import com.ckkloverdos.maybe.{Just, Failed, MaybeEither} +package gr.grnet.aquarium.connector.handler import gr.grnet.aquarium.converter.JsonTextFormat import gr.grnet.aquarium.connector.handler._ @@ -43,6 +41,7 @@ import gr.grnet.aquarium.event.model.ExternalEventModel import gr.grnet.aquarium.util.safeUnit import gr.grnet.aquarium.service.EventBusService import gr.grnet.aquarium.Aquarium +import com.ckkloverdos.maybe.{NoVal, Just, Failed, MaybeEither} /** * Generic handler of events arriving to Aquarium. @@ -59,18 +58,28 @@ import gr.grnet.aquarium.Aquarium */ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel] - (jsonParser: Array[Byte] ⇒ JsonTextFormat, - onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit, - onJsonParserError: (Array[Byte], Throwable) ⇒ Unit, - eventParser: JsonTextFormat ⇒ E, - onEventParserSuccess: (Array[Byte], E) ⇒ Unit, - onEventParserError: (Array[Byte], Throwable) ⇒ Unit, - saveAction: E ⇒ S, - forwardAction: S ⇒ Unit) extends PayloadHandler { +(jsonParser: Array[Byte] ⇒ JsonTextFormat, + onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit, + onJsonParserError: (Array[Byte], Throwable) ⇒ Unit, + eventParser: JsonTextFormat ⇒ E, + onEventParserSuccess: (Array[Byte], E) ⇒ Unit, + onEventParserError: (Array[Byte], Throwable) ⇒ Unit, + + /** + * If a Some(handlerResult) is returned, then the handlerResult + * is also returned from this payload handler. + * + * This is a business check to ensure that everything is OK before saving to DB. + */ + preSaveAction: E ⇒ Option[HandlerResult], + saveAction: E ⇒ S, + forwardAction: S ⇒ Unit) extends PayloadHandler { def handlePayload(payload: Array[Byte]): HandlerResult = { // 1. try to parse as json - MaybeEither { jsonParser(payload) } match { + MaybeEither { + jsonParser(payload) + } match { case Failed(e) ⇒ safeUnit(onJsonParserError(payload, e)) @@ -80,7 +89,9 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel] safeUnit(onJsonParserSuccess(payload, jsonTextFormat)) // 2. try to parse as model - MaybeEither { eventParser(jsonTextFormat) } match { + MaybeEither { + eventParser(jsonTextFormat) + } match { case Failed(e) ⇒ safeUnit(onEventParserError(payload, e)) @@ -89,18 +100,38 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel] case Just(event) ⇒ safeUnit(onEventParserSuccess(payload, event)) - // 3. try to save to DB - MaybeEither { saveAction(event) } match { + // 3. See if we are ready to save to DB + MaybeEither { + preSaveAction(event) + } match { case Failed(e) ⇒ - HandlerResultPanic + // oops. must resend this message due to unexpected result + HandlerResultResend + + case Just(Some(handlerResult)) ⇒ + // Nope. Not ready to save. + handlerResult - case Just(s) ⇒ - // 4. try forward but it's OK if something bad happens here. - safeUnit { forwardAction(s) } + case Just(None) ⇒ + // Yep. Ready to save + // 4. try to save to DB + MaybeEither { + saveAction(event) + } match { + case Failed(e) ⇒ + HandlerResultPanic - HandlerResultSuccess + case Just(s) ⇒ + // 4. try forward but it's OK if something bad happens here. + safeUnit { + forwardAction(s) + } + + HandlerResultSuccess + } } + } } } diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala new file mode 100644 index 0000000..cf3c19a --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala @@ -0,0 +1,131 @@ +/* + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium.connector.handler + +import gr.grnet.aquarium.Aquarium +import org.slf4j.Logger +import gr.grnet.aquarium.converter.JsonTextFormat +import gr.grnet.aquarium.util.{Tags, shortClassNameOf} +import gr.grnet.aquarium.actor.RouterRole +import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore} +import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel} +import gr.grnet.aquarium.actor.message.event.ProcessIMEvent + +/** + * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for + * [[gr.grnet.aquarium.event.model.im.IMEventModel]]s. + * + * @author Christos KK Loverdos + */ + +class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger) + extends GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent]( + payload ⇒ { + aquarium.converters.convertEx[JsonTextFormat](payload) + }, + + (payload, jsonTextFormat) ⇒ { + }, + + (payload, error) ⇒ { + logger.error("Error creating JSON from %s payload".format(Tags.IMEventTag), error) + + LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error) + }, + + jsonTextFormat ⇒ { + StdIMEvent.fromJsonTextFormat(jsonTextFormat) + }, + + (payload, event) ⇒ { + LocalFSEventStore.storeIMEvent(aquarium, event, payload) + }, + + (payload, error) ⇒ { + logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error) + }, + + imEvent ⇒ { + val className = shortClassNameOf(imEvent) + val id = imEvent.id + + // Let's decide if it is OK to store the event + // Remember that OK == None as the returning result + // + // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so + // do not bother to catch exceptions here. + + // 1. Check if the same ID exists. Note that we use the ID sent by the event producer. + // It is a requirement that this ID is unique. + val store = aquarium.imEventStore + store.findIMEventById(id) match { + case Some(_) ⇒ + // Reject the duplicate + Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id))) + + case None ⇒ + // 2. Check that the new event is not older than our most recent event in DB. + // Sorry. We cannot tolerate out-of-order events here, since they really mess with the + // agreements selection and thus with the charging procedure. + // + // TODO: We really need to store these events anyway but somewhere else (BadEventsStore?) + val userID = imEvent.userID + + store.findLatestIMEventByUserID(userID) match { + case Some(latestStoredEvent) ⇒ + val occurredMillis = imEvent.occurredMillis + val latestOccurredMillis = latestStoredEvent.occurredMillis + if(occurredMillis < latestOccurredMillis) { + Some(HandlerResultReject( + "Out of order %s (%s < %s)".format(className, occurredMillis, latestOccurredMillis))) + } else { + None + } + + case None ⇒ + None + } + } + }, + + imEvent ⇒ { + aquarium.imEventStore.insertIMEvent(imEvent) + }, + + imEvent ⇒ { + aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent) + } + ) diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala new file mode 100644 index 0000000..f23b4c5 --- /dev/null +++ b/src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala @@ -0,0 +1,111 @@ +/* + * Copyright 2011-2012 GRNET S.A. All rights reserved. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS + * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF + * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + * The views and conclusions contained in the software and + * documentation are those of the authors and should not be + * interpreted as representing official policies, either expressed + * or implied, of GRNET S.A. + */ + +package gr.grnet.aquarium.connector.handler + +import org.slf4j.Logger +import gr.grnet.aquarium.store.{LocalFSEventStore, ResourceEventStore} +import gr.grnet.aquarium.converter.JsonTextFormat +import gr.grnet.aquarium.Aquarium +import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel} +import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent +import gr.grnet.aquarium.actor.RouterRole +import gr.grnet.aquarium.util._ + +/** + * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for + * [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s. + * + * @author Christos KK Loverdos + */ + +class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger) + extends GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent]( + payload ⇒ { + aquarium.converters.convertEx[JsonTextFormat](payload) + }, + + (payload, jsonTextFormat) ⇒ { + }, + + (payload, error) ⇒ { + logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error) + + LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error) + }, + + jsonTextFormat ⇒ { + StdResourceEvent.fromJsonTextFormat(jsonTextFormat) + }, + + (payload, event) ⇒ { + LocalFSEventStore.storeResourceEvent(aquarium, event, payload) + }, + + (payload, error) ⇒ { + logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error) + }, + + rcEvent ⇒ { + val className = shortClassNameOf(rcEvent) + val id = rcEvent.id + + // Let's decide if it is OK to store the event + // Remember that OK == None as the returning result + // + // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so + // do not bother to catch exceptions here. + + // 1. Check if the same ID exists. Note that we use the ID sent by the event producer. + // It is a requirement that this ID is unique. + val store = aquarium.resourceEventStore + store.findResourceEventById(id) match { + case Some(_) ⇒ + // Reject the duplicate + Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id))) + + case None ⇒ + None + } + }, + + rcEvent ⇒ { + aquarium.resourceEventStore.insertResourceEvent(rcEvent) + }, + + rcEvent ⇒ { + aquarium.actorProvider.actorForRole(RouterRole) ! ProcessResourceEvent(rcEvent) + } + ) diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala index f4d5e45..924a0f2 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala @@ -43,19 +43,13 @@ import com.rabbitmq.client.Address import gr.grnet.aquarium.{Aquarium, Configurable} import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType} import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys -import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters} -import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel} -import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel} +import gr.grnet.aquarium.converter.StdConverters import gr.grnet.aquarium.actor.RouterRole -import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent} -import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore} import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag} -import gr.grnet.aquarium.util.shortInfoOf import gr.grnet.aquarium.util.sameTags -import gr.grnet.aquarium.connector.handler.{HandlerResultPanic, HandlerResult} -import com.ckkloverdos.maybe.{Failed, Just, Maybe, MaybeEither} import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent} +import gr.grnet.aquarium.connector.handler.{ResourceEventPayloadHandler, IMEventPayloadHandler} /** * @@ -92,89 +86,11 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable { } private[this] def doConfigure(): Unit = { - val jsonParser: (Array[Byte] ⇒ JsonTextFormat) = { payload ⇒ - converters.convertEx[JsonTextFormat](payload) - } - - val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒ - StdResourceEvent.fromJsonTextFormat(jsonTextFormat) - } - - val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒ - StdIMEvent.fromJsonTextFormat(jsonTextFormat) - } - - val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒ - router ! ProcessResourceEvent(rcEvent) - } - - val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒ - logger.info("Forwarding {}", rcEvent) - } + val postNotifier = new PayloadHandlerPostNotifier(logger) - val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒ - router ! ProcessIMEvent(imEvent) - } - - val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒ - logger.info("Forwarding {}", imEvent) - } + val rcHandler = new ResourceEventPayloadHandler(aquarium, logger) - val postNotifier = (consumer: RabbitMQConsumer, maybeResult: Maybe[HandlerResult]) ⇒ { - maybeResult match { - case Just(hr @ HandlerResultPanic) ⇒ - // The other end is crucial to the overall operation and it is in panic mode, - // so we stop delivering messages until further notice - logger.warn("Shutting down %s due to [%s]".format(consumer.toString, hr)) - consumer.setAllowReconnects(false) - consumer.safeStop() - - case Failed(e) ⇒ - logger.warn("Shutting down %s due to [%s]".format(consumer.toString, shortInfoOf(e))) - consumer.setAllowReconnects(false) - consumer.safeStop() - - case _ ⇒ - } - } - - val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent]( - jsonParser, - (payload, jsonTextFormat) ⇒ {}, - (payload, error) ⇒ { - logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error) - - LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error) - }, - rcEventParser, - (payload, event) ⇒ { - LocalFSEventStore.storeResourceEvent(aquarium, event, payload) - }, - (payload, error) ⇒ { - logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error) - }, - rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent), - rcForwardAction - ) - - val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent]( - jsonParser, - (payload, jsonTextFormat) ⇒ {}, - (payload, error) ⇒ { - logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error) - - LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error) - }, - imEventParser, - (payload, event) ⇒ { - LocalFSEventStore.storeIMEvent(aquarium, event, payload) - }, - (payload, error) ⇒ { - logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error) - }, - imEvent ⇒ imEventStore.insertIMEvent(imEvent), - imForwardAction - ) + val imHandler = new IMEventPayloadHandler(aquarium, logger) val futureExecutor = new PayloadHandlerFutureExecutor -- 1.7.10.4