Payload handlers made into classes
authorChristos KK Loverdos <loverdos@gmail.com>
Thu, 24 May 2012 10:43:21 +0000 (13:43 +0300)
committerChristos KK Loverdos <loverdos@gmail.com>
Thu, 24 May 2012 10:43:21 +0000 (13:43 +0300)
src/main/scala/gr/grnet/aquarium/connector/handler/GenericPayloadHandler.scala [moved from src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala with 63% similarity]
src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala [new file with mode: 0644]
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala

@@ -33,9 +33,7 @@
  * or implied, of GRNET S.A.
  */
 
-package gr.grnet.aquarium.connector.rabbitmq.service
-
-import com.ckkloverdos.maybe.{Just, Failed, MaybeEither}
+package gr.grnet.aquarium.connector.handler
 
 import gr.grnet.aquarium.converter.JsonTextFormat
 import gr.grnet.aquarium.connector.handler._
@@ -43,6 +41,7 @@ import gr.grnet.aquarium.event.model.ExternalEventModel
 import gr.grnet.aquarium.util.safeUnit
 import gr.grnet.aquarium.service.EventBusService
 import gr.grnet.aquarium.Aquarium
+import com.ckkloverdos.maybe.{NoVal, Just, Failed, MaybeEither}
 
 /**
  * Generic handler of events arriving to Aquarium.
@@ -59,18 +58,28 @@ import gr.grnet.aquarium.Aquarium
  */
 
 class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
-    (jsonParser: Array[Byte] ⇒ JsonTextFormat,
-     onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit,
-     onJsonParserError: (Array[Byte], Throwable) ⇒ Unit,
-     eventParser: JsonTextFormat ⇒ E,
-     onEventParserSuccess: (Array[Byte], E) ⇒ Unit,
-     onEventParserError: (Array[Byte], Throwable) ⇒ Unit,
-     saveAction: E ⇒ S,
-     forwardAction: S ⇒ Unit) extends PayloadHandler {
+(jsonParser: Array[Byte] ⇒ JsonTextFormat,
+ onJsonParserSuccess: (Array[Byte], JsonTextFormat) ⇒ Unit,
+ onJsonParserError: (Array[Byte], Throwable) ⇒ Unit,
+ eventParser: JsonTextFormat ⇒ E,
+ onEventParserSuccess: (Array[Byte], E) ⇒ Unit,
+ onEventParserError: (Array[Byte], Throwable) ⇒ Unit,
+
+ /**
+  * If a Some(handlerResult) is returned, then the handlerResult
+  * is also returned from this payload handler.
+  *
+  * This is a business check to ensure that everything is OK before saving to DB.
+  */
+ preSaveAction: E ⇒ Option[HandlerResult],
+ saveAction: E ⇒ S,
+ forwardAction: S ⇒ Unit) extends PayloadHandler {
 
   def handlePayload(payload: Array[Byte]): HandlerResult = {
     // 1. try to parse as json
-    MaybeEither { jsonParser(payload) } match {
+    MaybeEither {
+      jsonParser(payload)
+    } match {
       case Failed(e) ⇒
         safeUnit(onJsonParserError(payload, e))
 
@@ -80,7 +89,9 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
         safeUnit(onJsonParserSuccess(payload, jsonTextFormat))
 
         // 2. try to parse as model
-        MaybeEither { eventParser(jsonTextFormat) } match {
+        MaybeEither {
+          eventParser(jsonTextFormat)
+        } match {
           case Failed(e) ⇒
             safeUnit(onEventParserError(payload, e))
 
@@ -89,18 +100,38 @@ class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
           case Just(event) ⇒
             safeUnit(onEventParserSuccess(payload, event))
 
-            // 3. try to save to DB
-            MaybeEither { saveAction(event) } match {
+            // 3. See if we are ready to save to DB
+            MaybeEither {
+              preSaveAction(event)
+            } match {
               case Failed(e) ⇒
-                HandlerResultPanic
+                // oops. must resend this message due to unexpected result
+                HandlerResultResend
+
+              case Just(Some(handlerResult)) ⇒
+                // Nope. Not ready to save.
+                handlerResult
 
-              case Just(s) ⇒
-                // 4. try forward but it's OK if something bad happens here.
-                safeUnit { forwardAction(s) }
+              case Just(None) ⇒
+                // Yep. Ready to save
+                // 4. try to save to DB
+                MaybeEither {
+                  saveAction(event)
+                } match {
+                  case Failed(e) ⇒
+                    HandlerResultPanic
 
-                HandlerResultSuccess
+                  case Just(s) ⇒
+                    // 4. try forward but it's OK if something bad happens here.
+                    safeUnit {
+                      forwardAction(s)
+                    }
+
+                    HandlerResultSuccess
+                }
             }
 
+
         }
     }
   }
diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/IMEventPayloadHandler.scala
new file mode 100644 (file)
index 0000000..cf3c19a
--- /dev/null
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+import gr.grnet.aquarium.Aquarium
+import org.slf4j.Logger
+import gr.grnet.aquarium.converter.JsonTextFormat
+import gr.grnet.aquarium.util.{Tags, shortClassNameOf}
+import gr.grnet.aquarium.actor.RouterRole
+import gr.grnet.aquarium.store.{IMEventStore, LocalFSEventStore}
+import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
+import gr.grnet.aquarium.actor.message.event.ProcessIMEvent
+
+/**
+ * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
+ * [[gr.grnet.aquarium.event.model.im.IMEventModel]]s.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+class IMEventPayloadHandler(aquarium: Aquarium, logger: Logger)
+  extends GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
+      payload ⇒ {
+        aquarium.converters.convertEx[JsonTextFormat](payload)
+      },
+
+      (payload, jsonTextFormat) ⇒ {
+      },
+
+      (payload, error) ⇒ {
+        logger.error("Error creating JSON from %s payload".format(Tags.IMEventTag), error)
+
+        LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
+      },
+
+      jsonTextFormat ⇒ {
+        StdIMEvent.fromJsonTextFormat(jsonTextFormat)
+      },
+
+      (payload, event) ⇒ {
+        LocalFSEventStore.storeIMEvent(aquarium, event, payload)
+      },
+
+      (payload, error) ⇒ {
+        logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
+      },
+
+      imEvent ⇒ {
+        val className = shortClassNameOf(imEvent)
+        val id = imEvent.id
+
+        // Let's decide if it is OK to store the event
+        // Remember that OK == None as the returning result
+        //
+        // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
+        //       do not bother to catch exceptions here.
+
+        // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
+        //    It is a requirement that this ID is unique.
+        val store = aquarium.imEventStore
+        store.findIMEventById(id) match {
+         case Some(_) ⇒
+           // Reject the duplicate
+           Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
+
+         case None ⇒
+           // 2. Check that the new event is not older than our most recent event in DB.
+           //    Sorry. We cannot tolerate out-of-order events here, since they really mess with the
+           //    agreements selection and thus with the charging procedure.
+           //
+           // TODO: We really need to store these events anyway but somewhere else (BadEventsStore?)
+           val userID = imEvent.userID
+
+           store.findLatestIMEventByUserID(userID) match {
+             case Some(latestStoredEvent) ⇒
+               val occurredMillis = imEvent.occurredMillis
+               val latestOccurredMillis = latestStoredEvent.occurredMillis
+               if(occurredMillis < latestOccurredMillis) {
+                 Some(HandlerResultReject(
+                   "Out of order %s (%s < %s)".format(className, occurredMillis, latestOccurredMillis)))
+               } else {
+                 None
+               }
+
+             case None ⇒
+               None
+           }
+       }
+      },
+
+      imEvent ⇒ {
+        aquarium.imEventStore.insertIMEvent(imEvent)
+      },
+
+      imEvent ⇒ {
+        aquarium.actorProvider.actorForRole(RouterRole) ! ProcessIMEvent(imEvent)
+      }
+    )
diff --git a/src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala b/src/main/scala/gr/grnet/aquarium/connector/handler/ResourceEventPayloadHandler.scala
new file mode 100644 (file)
index 0000000..f23b4c5
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2011-2012 GRNET S.A. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ *   1. Redistributions of source code must retain the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer.
+ *
+ *   2. Redistributions in binary form must reproduce the above
+ *      copyright notice, this list of conditions and the following
+ *      disclaimer in the documentation and/or other materials
+ *      provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
+ * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+ * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+ * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
+ * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and
+ * documentation are those of the authors and should not be
+ * interpreted as representing official policies, either expressed
+ * or implied, of GRNET S.A.
+ */
+
+package gr.grnet.aquarium.connector.handler
+
+import org.slf4j.Logger
+import gr.grnet.aquarium.store.{LocalFSEventStore, ResourceEventStore}
+import gr.grnet.aquarium.converter.JsonTextFormat
+import gr.grnet.aquarium.Aquarium
+import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
+import gr.grnet.aquarium.actor.message.event.ProcessResourceEvent
+import gr.grnet.aquarium.actor.RouterRole
+import gr.grnet.aquarium.util._
+
+/**
+ * A [[gr.grnet.aquarium.connector.handler.PayloadHandler]] for
+ * [[gr.grnet.aquarium.event.model.resource.ResourceEventModel]]s.
+ *
+ * @author Christos KK Loverdos <loverdos@gmail.com>
+ */
+
+class ResourceEventPayloadHandler(aquarium: Aquarium, logger: Logger)
+  extends GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
+      payload ⇒ {
+        aquarium.converters.convertEx[JsonTextFormat](payload)
+      },
+
+      (payload, jsonTextFormat) ⇒ {
+      },
+
+      (payload, error) ⇒ {
+        logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
+
+        LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
+      },
+
+      jsonTextFormat ⇒ {
+        StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
+      },
+
+      (payload, event) ⇒ {
+        LocalFSEventStore.storeResourceEvent(aquarium, event, payload)
+      },
+
+      (payload, error) ⇒ {
+        logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
+      },
+
+      rcEvent ⇒ {
+        val className = shortClassNameOf(rcEvent)
+        val id = rcEvent.id
+
+        // Let's decide if it is OK to store the event
+        // Remember that OK == None as the returning result
+        //
+        // NOTE: If anything goes wrong with this function, then the handler will issue a Resend, so
+        //       do not bother to catch exceptions here.
+
+        // 1. Check if the same ID exists. Note that we use the ID sent by the event producer.
+        //    It is a requirement that this ID is unique.
+        val store = aquarium.resourceEventStore
+        store.findResourceEventById(id) match {
+          case Some(_) ⇒
+            // Reject the duplicate
+            Some(HandlerResultReject("Duplicate %s with id = %s".format(className, id)))
+
+          case None ⇒
+            None
+        }
+      },
+
+      rcEvent ⇒ {
+        aquarium.resourceEventStore.insertResourceEvent(rcEvent)
+      },
+
+      rcEvent ⇒ {
+        aquarium.actorProvider.actorForRole(RouterRole) ! ProcessResourceEvent(rcEvent)
+      }
+    )
index f4d5e45..924a0f2 100644 (file)
@@ -43,19 +43,13 @@ import com.rabbitmq.client.Address
 import gr.grnet.aquarium.{Aquarium, Configurable}
 import gr.grnet.aquarium.connector.rabbitmq.conf.{TopicExchange, RabbitMQConsumerConf, RabbitMQExchangeType}
 import gr.grnet.aquarium.connector.rabbitmq.service.RabbitMQService.RabbitMQConfKeys
-import gr.grnet.aquarium.converter.{JsonTextFormat, StdConverters}
-import gr.grnet.aquarium.event.model.resource.{StdResourceEvent, ResourceEventModel}
-import gr.grnet.aquarium.event.model.im.{StdIMEvent, IMEventModel}
+import gr.grnet.aquarium.converter.StdConverters
 import gr.grnet.aquarium.actor.RouterRole
-import gr.grnet.aquarium.actor.message.event.{ProcessIMEvent, ProcessResourceEvent}
-import gr.grnet.aquarium.store.{LocalFSEventStore, IMEventStore, ResourceEventStore}
 import gr.grnet.aquarium.connector.rabbitmq.RabbitMQConsumer
 import gr.grnet.aquarium.util.{Tags, Loggable, Lifecycle, Tag}
-import gr.grnet.aquarium.util.shortInfoOf
 import gr.grnet.aquarium.util.sameTags
-import gr.grnet.aquarium.connector.handler.{HandlerResultPanic, HandlerResult}
-import com.ckkloverdos.maybe.{Failed, Just, Maybe, MaybeEither}
 import gr.grnet.aquarium.service.event.{StoreIsAliveBusEvent, StoreIsDeadBusEvent}
+import gr.grnet.aquarium.connector.handler.{ResourceEventPayloadHandler, IMEventPayloadHandler}
 
 /**
  *
@@ -92,89 +86,11 @@ class RabbitMQService extends Loggable with Lifecycle with Configurable {
   }
 
   private[this] def doConfigure(): Unit = {
-    val jsonParser: (Array[Byte] ⇒ JsonTextFormat) = { payload ⇒
-      converters.convertEx[JsonTextFormat](payload)
-    }
-
-    val rcEventParser: (JsonTextFormat ⇒ ResourceEventModel) = { jsonTextFormat ⇒
-      StdResourceEvent.fromJsonTextFormat(jsonTextFormat)
-    }
-
-    val imEventParser: (JsonTextFormat ⇒ IMEventModel) = { jsonTextFormat ⇒
-      StdIMEvent.fromJsonTextFormat(jsonTextFormat)
-    }
-
-    val rcForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
-      router ! ProcessResourceEvent(rcEvent)
-    }
-
-    val rcDebugForwardAction: (ResourceEventStore#ResourceEvent ⇒ Unit) = { rcEvent ⇒
-      logger.info("Forwarding {}", rcEvent)
-    }
+    val postNotifier = new PayloadHandlerPostNotifier(logger)
 
-    val imForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
-      router ! ProcessIMEvent(imEvent)
-    }
-
-    val imDebugForwardAction: (IMEventStore#IMEvent ⇒ Unit) = { imEvent ⇒
-      logger.info("Forwarding {}", imEvent)
-    }
+    val rcHandler = new ResourceEventPayloadHandler(aquarium, logger)
 
-    val postNotifier = (consumer: RabbitMQConsumer, maybeResult: Maybe[HandlerResult]) ⇒ {
-      maybeResult match {
-        case Just(hr @ HandlerResultPanic) ⇒
-          // The other end is crucial to the overall operation and it is in panic mode,
-          // so we stop delivering messages until further notice
-          logger.warn("Shutting down %s due to [%s]".format(consumer.toString, hr))
-          consumer.setAllowReconnects(false)
-          consumer.safeStop()
-
-        case Failed(e) ⇒
-          logger.warn("Shutting down %s due to [%s]".format(consumer.toString, shortInfoOf(e)))
-          consumer.setAllowReconnects(false)
-          consumer.safeStop()
-
-        case _ ⇒
-      }
-    }
-
-    val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
-      jsonParser,
-      (payload, jsonTextFormat) ⇒ {},
-      (payload, error) ⇒ {
-        logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
-
-        LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
-      },
-      rcEventParser,
-      (payload, event) ⇒ {
-        LocalFSEventStore.storeResourceEvent(aquarium, event, payload)
-      },
-      (payload, error) ⇒ {
-        logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
-      },
-      rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
-      rcForwardAction
-    )
-
-    val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
-      jsonParser,
-      (payload, jsonTextFormat) ⇒ {},
-      (payload, error) ⇒ {
-        logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error)
-
-        LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
-      },
-      imEventParser,
-      (payload, event) ⇒ {
-        LocalFSEventStore.storeIMEvent(aquarium, event, payload)
-      },
-      (payload, error) ⇒ {
-        logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
-      },
-      imEvent ⇒ imEventStore.insertIMEvent(imEvent),
-      imForwardAction
-    )
+    val imHandler = new IMEventPayloadHandler(aquarium, logger)
 
     val futureExecutor = new PayloadHandlerFutureExecutor