Refine event payload handling
authorChristos KK Loverdos <loverdos@gmail.com>
Mon, 21 May 2012 09:53:32 +0000 (12:53 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Mon, 21 May 2012 09:53:32 +0000 (12:53 +0300)
.gitignore
src/main/resources/logback.xml
src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResult.scala
src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultPanic.scala
src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultReject.scala
src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultRequeue.scala
src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultResend.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultSuccess.scala
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
src/main/scala/gr/grnet/aquarium/service/StoreWatcherService.scala

index 98df25b..0c92e4f 100644 (file)
@@ -43,3 +43,5 @@ events-store/
 __*
 AQUARIUM_HOME_IS_UNDEFINED
 aquarium.home_IS_UNDEFINED
+.gradle/
+
index 3db6f88..30b0b15 100644 (file)
@@ -14,7 +14,7 @@
       <maxHistory>30</maxHistory>
     </rollingPolicy>
     <encoder>
-      <pattern>%-5level %d{HH:mm:ss.SSS} %36logger{36} - %msg%n</pattern>
+      <pattern>%-5level %d{yyyy-MM-dd HH:mm:ss.SSS} %38logger{38} - %msg%n</pattern>
     </encoder>
   </appender>
 
index abbca21..b1259eb 100644 (file)
@@ -36,6 +36,9 @@
 package gr.grnet.aquarium.connector.handler
 
 /**
+ * Signifies what is the result of handling an event payload.
+ *
+ * This result is returned from a [[gr.grnet.aquarium.connector.handler.PayloadHandler]].
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
index fc9ec36..3aaa999 100644 (file)
 package gr.grnet.aquarium.connector.handler
 
 /**
+ * Signifies that the event payload must be sent again, preserving order.
+ *
+ * Additionally, it provides the extra info that the other end (the one that handled the message) had some very
+ * serious error.
+ *
+ * In effect, this is a refinement of [[gr.grnet.aquarium.connector.handler.HandlerResultResend]].
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
index 33de56c..4eae7f4 100644 (file)
@@ -36,6 +36,7 @@
 package gr.grnet.aquarium.connector.handler
 
 /**
+ * Signifies that the event payload must be rejected and never be sent again.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
index 39b7ffa..3d0b909 100644 (file)
@@ -36,6 +36,7 @@
 package gr.grnet.aquarium.connector.handler
 
 /**
+ * Signifies that the event payload must be sent again, possibly in a different order.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultResend.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/HandlerResultResend.scala
new file mode 100644 (file)
index 0000000..845e746
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+/**
+ * Signifies that the event payload must be sent again, preserving order.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+case object HandlerResultResend extends HandlerResult
index dc88840..1d2839f 100644 (file)
@@ -36,6 +36,7 @@
 package gr.grnet.aquarium.connector.handler
 
 /**
+ * Signifies that the event payload was processed successfully.
  *
  * @author Christos KK Loverdos <loverdos@gmail.com>
  */
index 052d149..37c9cdb 100644 (file)
@@ -45,8 +45,8 @@ import gr.grnet.aquarium.connector.rabbitmq.eventbus.RabbitMQError
 import gr.grnet.aquarium.service.event.BusEvent
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.{RabbitMQConKeys, RabbitMQQueueKeys, RabbitMQExchangeKeys, RabbitMQChannelKeys}
 import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean}
-import gr.grnet.aquarium.connector.handler.{HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
 import com.ckkloverdos.maybe.{Maybe, Just, Failed, MaybeEither}
+import gr.grnet.aquarium.connector.handler.{HandlerResultResend, HandlerResult, PayloadHandlerExecutor, HandlerResultPanic, HandlerResultRequeue, HandlerResultReject, HandlerResultSuccess, PayloadHandler}
 
 /**
  * A basic `RabbitMQ` consumer. Sufficiently generalized, sufficiently tied to Aquarium.
@@ -333,6 +333,9 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
           case HandlerResultSuccess ⇒
             doWithChannel(_.basicAck(deliveryTag, false))
 
+          case HandlerResultResend ⇒
+            doWithChannel(_.basicNack(deliveryTag, false, true))
+
           case HandlerResultReject(_) ⇒
             doWithChannel(_.basicReject(deliveryTag, false))
 
@@ -340,7 +343,9 @@ class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
             doWithChannel(_.basicReject(deliveryTag, true))
 
           case HandlerResultPanic ⇒
-            // We do not handle panic here. It will be handled by the notifier.
+            // Just inform RabbitMQ and subsequent actions will be made by the notifier.
+            // So, this is a `HandlerResultResend` with extra semantics.
+            doWithChannel(_.basicNack(deliveryTag, false, true))
         }
 
         val onSuccessF = notifierF andThen onSuccessBasicStepF
index 4da7a31..2dd2812 100644 (file)
@@ -82,14 +82,14 @@ final class StoreWatcherService extends Lifecycle with Configurable with Loggabl
       // No exception happened, so we are alive
       setStatus(true)
       if(!wasAlive) {
-        logger.info("Reconnected %s store".format(tag))
+        logger.info("Reconnected store for %s".format(tag))
         aquarium.eventBus ! StoreIsAliveBusEvent(tag)
       }
     }
     catch {
       case e: Throwable ⇒
         setStatus(false)
-        logger.info("Store %s detected down".format(tag))
+        logger.info("Store for %s detected down".format(tag))
         aquarium.eventBus ! StoreIsDeadBusEvent(tag)
     }
   }