projects
/
aquarium
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
e97f6bd
)
Log errors during the payload handling sequence
author
Christos KK Loverdos
<loverdos@gmail.com>
Mon, 21 May 2012 13:29:50 +0000
(16:29 +0300)
committer
Christos KK Loverdos
<loverdos@gmail.com>
Mon, 21 May 2012 13:29:50 +0000
(16:29 +0300)
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
patch
|
blob
|
history
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
patch
|
blob
|
history
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
patch
|
blob
|
history
diff --git
a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
index
37c9cdb
..
d630b44
100644
(file)
--- a/
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
+++ b/
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/RabbitMQConsumer.scala
@@
-329,26
+329,31
@@
class RabbitMQConsumer(val conf: RabbitMQConsumerConf,
handlerResult
}
handlerResult
}
- val onSuccessBasicStepF: (HandlerResult ⇒ Unit) = {
- case HandlerResultSuccess ⇒
+ val onSuccessBasicStepF: (HandlerResult ⇒ HandlerResult) = {
+ case result @ HandlerResultSuccess ⇒
doWithChannel(_.basicAck(deliveryTag, false))
doWithChannel(_.basicAck(deliveryTag, false))
+ result
- case HandlerResultResend ⇒
+ case result @ HandlerResultResend ⇒
doWithChannel(_.basicNack(deliveryTag, false, true))
doWithChannel(_.basicNack(deliveryTag, false, true))
+ result
- case HandlerResultReject(_) ⇒
+ case result @ HandlerResultReject(_) ⇒
doWithChannel(_.basicReject(deliveryTag, false))
doWithChannel(_.basicReject(deliveryTag, false))
+ result
- case HandlerResultRequeue(_) ⇒
+ case result @ HandlerResultRequeue(_) ⇒
doWithChannel(_.basicReject(deliveryTag, true))
doWithChannel(_.basicReject(deliveryTag, true))
+ result
- case HandlerResultPanic ⇒
+ case result @ HandlerResultPanic ⇒
// Just inform RabbitMQ and subsequent actions will be made by the notifier.
// So, this is a `HandlerResultResend` with extra semantics.
doWithChannel(_.basicNack(deliveryTag, false, true))
// Just inform RabbitMQ and subsequent actions will be made by the notifier.
// So, this is a `HandlerResultResend` with extra semantics.
doWithChannel(_.basicNack(deliveryTag, false, true))
+ result
}
}
- val onSuccessF = notifierF andThen onSuccessBasicStepF
+ val onSuccessF = onSuccessBasicStepF andThen notifierF
executor.exec(body, handler) (onSuccessF) (onErrorF)
}
executor.exec(body, handler) (onSuccessF) (onErrorF)
}
diff --git
a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
index
ed87153
..
7c35e3b
100644
(file)
--- a/
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
+++ b/
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/GenericPayloadHandler.scala
@@
-62,6
+62,7
@@
class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
(jsonParser: Array[Byte] ⇒ MaybeEither[JsonTextFormat],
jsonParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
eventParser: JsonTextFormat ⇒ E,
(jsonParser: Array[Byte] ⇒ MaybeEither[JsonTextFormat],
jsonParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
eventParser: JsonTextFormat ⇒ E,
+ eventParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
saveAction: E ⇒ S,
forwardAction: S ⇒ Unit) extends PayloadHandler {
saveAction: E ⇒ S,
forwardAction: S ⇒ Unit) extends PayloadHandler {
@@
-69,7
+70,7
@@
class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
// 1. try to parse as json
jsonParser(payload) match {
case Failed(e) ⇒
// 1. try to parse as json
jsonParser(payload) match {
case Failed(e) ⇒
- jsonParserErrorAction(payload, e)
+ safeUnit(jsonParserErrorAction(payload, e))
HandlerResultReject(e.getMessage)
HandlerResultReject(e.getMessage)
@@
-77,6
+78,8
@@
class GenericPayloadHandler[E <: ExternalEventModel, S <: ExternalEventModel]
// 2. try to parse as model
MaybeEither { eventParser(jsonTextFormat) } match {
case Failed(e) ⇒
// 2. try to parse as model
MaybeEither { eventParser(jsonTextFormat) } match {
case Failed(e) ⇒
+ safeUnit(eventParserErrorAction(payload, e))
+
HandlerResultReject(e.getMessage)
case Just(event) ⇒
HandlerResultReject(e.getMessage)
case Just(event) ⇒
diff --git
a/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
b/src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
index
a2b925b
..
4412f5e
100644
(file)
--- a/
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
+++ b/
src/main/scala/gr/grnet/aquarium/connector/rabbitmq/service/RabbitMQService.scala
@@
-140,16
+140,28
@@
class RabbitMQService extends Loggable with Lifecycle with Configurable {
val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
jsonParser,
val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
jsonParser,
- (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error),
+ (payload, error) ⇒ {
+ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
+ logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
+ },
rcEventParser,
rcEventParser,
+ (payload, error) ⇒ {
+ logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
+ },
rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
rcDebugForwardAction
)
val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
jsonParser,
rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
rcDebugForwardAction
)
val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
jsonParser,
- (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error),
+ (payload, error) ⇒ {
+ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
+ logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error)
+ },
imEventParser,
imEventParser,
+ (payload, error) ⇒ {
+ logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
+ },
imEvent ⇒ imEventStore.insertIMEvent(imEvent),
imDebugForwardAction
)
imEvent ⇒ imEventStore.insertIMEvent(imEvent),
imDebugForwardAction
)