Log errors during the payload handling sequence
authorChristos KK Loverdos <loverdos@gmail.com>
Mon, 21 May 2012 13:29:50 +0000 (16:29 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Mon, 21 May 2012 13:29:50 +0000 (16:29 +0300)
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala

index 37c9cdb..d630b44 100644 (file)
@@ -329,26 +329,31 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
           handlerResult
         }
 
-        val onSuccessBasicStepF: (HandlerResult ⇒ Unit) = {
-          case HandlerResultSuccess ⇒
+        val onSuccessBasicStepF: (HandlerResult ⇒ HandlerResult) = {
+          case result @ HandlerResultSuccess ⇒
             doWithChannel(_.basicAck(deliveryTag, false))
+            result
 
-          case HandlerResultResend ⇒
+          case result @ HandlerResultResend ⇒
             doWithChannel(_.basicNack(deliveryTag, false, true))
+            result
 
-          case HandlerResultReject(_) ⇒
+          case result @ HandlerResultReject(_) ⇒
             doWithChannel(_.basicReject(deliveryTag, false))
+            result
 
-          case HandlerResultRequeue(_) ⇒
+          case result @ HandlerResultRequeue(_) ⇒
             doWithChannel(_.basicReject(deliveryTag, true))
+            result
 
-          case HandlerResultPanic ⇒
+          case result @ HandlerResultPanic ⇒
             // Just inform RabbitMQ and subsequent actions will be made by the notifier.
             // So, this is a `HandlerResultResend` with extra semantics.
             doWithChannel(_.basicNack(deliveryTag, false, true))
+            result
         }
 
-        val onSuccessF = notifierF andThen onSuccessBasicStepF
+        val onSuccessF = onSuccessBasicStepF andThen notifierF
 
         executor.exec(body, handler) (onSuccessF) (onErrorF)
       }
index ed87153..7c35e3b 100644 (file)
@@ -62,6 +62,7 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
     (jsonParser: Array[Byte] ⇒ MaybeEither[JsonTextFormat],
      jsonParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
      eventParser: JsonTextFormat ⇒ E,
+     eventParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
      saveAction: E ⇒ S,
      forwardAction: S ⇒ Unit) extends PayloadHandler {
 
@@ -69,7 +70,7 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
     // 1. try to parse as json
     jsonParser(payload) match {
       case Failed(e) ⇒
-        jsonParserErrorAction(payload, e)
+        safeUnit(jsonParserErrorAction(payload, e))
 
         HandlerResultReject(e.getMessage)
 
@@ -77,6 +78,8 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
         // 2. try to parse as model
         MaybeEither { eventParser(jsonTextFormat) } match {
           case Failed(e) ⇒
+            safeUnit(eventParserErrorAction(payload, e))
+
             HandlerResultReject(e.getMessage)
 
           case Just(event) ⇒
index a2b925b..4412f5e 100644 (file)
@@ -140,16 +140,28 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
 
     val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
       jsonParser,
-      (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error),
+      (payload, error) ⇒ {
+        LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
+        logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
+      },
       rcEventParser,
+      (payload, error) ⇒ {
+        logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
+      },
       rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
       rcDebugForwardAction
     )
 
     val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
       jsonParser,
-      (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error),
+      (payload, error) ⇒ {
+        LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
+        logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error)
+      },
       imEventParser,
+      (payload, error) ⇒ {
+        logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
+      },
       imEvent ⇒ imEventStore.insertIMEvent(imEvent),
       imDebugForwardAction
     )