From: Christos KK Loverdos Date: Mon, 21 May 2012 13:29:50 +0000 (+0300) Subject: Log errors during the payload handling sequence X-Git-Url: https://code.grnet.gr/git/aquarium/commitdiff_plain/102c12a0eea30948e44e84e6489bce5c6d318cfa Log errors during the payload handling sequence --- diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala index 37c9cdb..d630b44 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala @@ -329,26 +329,31 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf, handlerResult } - val onSuccessBasicStepF: (HandlerResult ⇒ Unit) = { - case HandlerResultSuccess ⇒ + val onSuccessBasicStepF: (HandlerResult ⇒ HandlerResult) = { + case result @ HandlerResultSuccess ⇒ doWithChannel(_.basicAck(deliveryTag, false)) + result - case HandlerResultResend ⇒ + case result @ HandlerResultResend ⇒ doWithChannel(_.basicNack(deliveryTag, false, true)) + result - case HandlerResultReject(_) ⇒ + case result @ HandlerResultReject(_) ⇒ doWithChannel(_.basicReject(deliveryTag, false)) + result - case HandlerResultRequeue(_) ⇒ + case result @ HandlerResultRequeue(_) ⇒ doWithChannel(_.basicReject(deliveryTag, true)) + result - case HandlerResultPanic ⇒ + case result @ HandlerResultPanic ⇒ // Just inform RabbitMQ and subsequent actions will be made by the notifier. // So, this is a `HandlerResultResend` with extra semantics. doWithChannel(_.basicNack(deliveryTag, false, true)) + result } - val onSuccessF = notifierF andThen onSuccessBasicStepF + val onSuccessF = onSuccessBasicStepF andThen notifierF executor.exec(body, handler) (onSuccessF) (onErrorF) } diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala index ed87153..7c35e3b 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala @@ -62,6 +62,7 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel] (jsonParser: Array[Byte] ⇒ MaybeEither[JsonTextFormat], jsonParserErrorAction: (Array[Byte], Throwable) ⇒ Unit, eventParser: JsonTextFormat ⇒ E, + eventParserErrorAction: (Array[Byte], Throwable) ⇒ Unit, saveAction: E ⇒ S, forwardAction: S ⇒ Unit) extends PayloadHandler { @@ -69,7 +70,7 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel] // 1. try to parse as json jsonParser(payload) match { case Failed(e) ⇒ - jsonParserErrorAction(payload, e) + safeUnit(jsonParserErrorAction(payload, e)) HandlerResultReject(e.getMessage) @@ -77,6 +78,8 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel] // 2. try to parse as model MaybeEither { eventParser(jsonTextFormat) } match { case Failed(e) ⇒ + safeUnit(eventParserErrorAction(payload, e)) + HandlerResultReject(e.getMessage) case Just(event) ⇒ diff --git a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala index a2b925b..4412f5e 100644 --- a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala +++ b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala @@ -140,16 +140,28 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable { val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent]( jsonParser, - (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error), + (payload, error) ⇒ { + LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error) + logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error) + }, rcEventParser, + (payload, error) ⇒ { + logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error) + }, rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent), rcDebugForwardAction ) val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent]( jsonParser, - (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error), + (payload, error) ⇒ { + LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error) + logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error) + }, imEventParser, + (payload, error) ⇒ { + logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error) + }, imEvent ⇒ imEventStore.insertIMEvent(imEvent), imDebugForwardAction )