Refine event payload handling
[aquarium] / src / main / scala / gr / grnet / aquarium / connector / rabbitmq / RabbitMQConsumer.scala
index 052d149..37c9cdb 100644 (file)
@@ -45,8 +45,8 @@ import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
 import gr.grnet.aquarium.service.event.BusEvent
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
-import gr.grnet.aquarium.connector.handler.{HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
 import com.ckkloverdos.maybe.{Maybe, Just, Failed, MaybeEither}
+import gr.grnet.aquarium.connector.handler.{HandlerResultResend, HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
 
 /**
  * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
@@ -333,6 +333,9 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
           case HandlerResultSuccess ⇒
             doWithChannel(_.basicAck(deliveryTag, false))
 
+          case HandlerResultResend ⇒
+            doWithChannel(_.basicNack(deliveryTag, false, true))
+
           case HandlerResultReject(_) ⇒
             doWithChannel(_.basicReject(deliveryTag, false))
 
@@ -340,7 +343,9 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
             doWithChannel(_.basicReject(deliveryTag, true))
 
           case HandlerResultPanic ⇒
-            // We do not handle panic here. It will be handled by the notifier.
+            // Just inform RabbitMQ and subsequent actions will be made by the notifier.
+            // So, this is a `HandlerResultResend` with extra semantics.
+            doWithChannel(_.basicNack(deliveryTag, false, true))
         }
 
         val onSuccessF = notifierF andThen onSuccessBasicStepF