handlerResult
}
- val onSuccessBasicStepF: (HandlerResult ⇒ Unit) = {
- case HandlerResultSuccess ⇒
+ val onSuccessBasicStepF: (HandlerResult ⇒ HandlerResult) = {
+ case result @ HandlerResultSuccess ⇒
doWithChannel(_.basicAck(deliveryTag, false))
+ result
- case HandlerResultResend ⇒
+ case result @ HandlerResultResend ⇒
doWithChannel(_.basicNack(deliveryTag, false, true))
+ result
- case HandlerResultReject(_) ⇒
+ case result @ HandlerResultReject(_) ⇒
doWithChannel(_.basicReject(deliveryTag, false))
+ result
- case HandlerResultRequeue(_) ⇒
+ case result @ HandlerResultRequeue(_) ⇒
doWithChannel(_.basicReject(deliveryTag, true))
+ result
- case HandlerResultPanic ⇒
+ case result @ HandlerResultPanic ⇒
// Just inform RabbitMQ and subsequent actions will be made by the notifier.
// So, this is a `HandlerResultResend` with extra semantics.
doWithChannel(_.basicNack(deliveryTag, false, true))
+ result
}
- val onSuccessF = notifierF andThen onSuccessBasicStepF
+ val onSuccessF = onSuccessBasicStepF andThen notifierF
executor.exec(body, handler) (onSuccessF) (onErrorF)
}
(jsonParser: Array[Byte] ⇒ MaybeEither[JsonTextFormat],
jsonParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
eventParser: JsonTextFormat ⇒ E,
+ eventParserErrorAction: (Array[Byte], Throwable) ⇒ Unit,
saveAction: E ⇒ S,
forwardAction: S ⇒ Unit) extends PayloadHandler {
// 1. try to parse as json
jsonParser(payload) match {
case Failed(e) ⇒
- jsonParserErrorAction(payload, e)
+ safeUnit(jsonParserErrorAction(payload, e))
HandlerResultReject(e.getMessage)
// 2. try to parse as model
MaybeEither { eventParser(jsonTextFormat) } match {
case Failed(e) ⇒
+ safeUnit(eventParserErrorAction(payload, e))
+
HandlerResultReject(e.getMessage)
case Just(event) ⇒
val rcHandler = new GenericPayloadHandler[ResourceEventModel, ResourceEventStore#ResourceEvent](
jsonParser,
- (payload, error) ⇒ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error),
+ (payload, error) ⇒ {
+ LocalFSEventStore.storeUnparsedResourceEvent(aquarium, payload, error)
+ logger.error("Error creating JSON from %s payload".format(Tags.ResourceEventTag), error)
+ },
rcEventParser,
+ (payload, error) ⇒ {
+ logger.error("Error creating object model from %s payload".format(Tags.ResourceEventTag), error)
+ },
rcEvent ⇒ resourceEventStore.insertResourceEvent(rcEvent),
rcDebugForwardAction
)
val imHandler = new GenericPayloadHandler[IMEventModel, IMEventStore#IMEvent](
jsonParser,
- (payload, error) ⇒ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error),
+ (payload, error) ⇒ {
+ LocalFSEventStore.storeUnparsedIMEvent(aquarium, payload, error)
+ logger.error("Error parsing JSON from %s payload".format(Tags.IMEventTag), error)
+ },
imEventParser,
+ (payload, error) ⇒ {
+ logger.error("Error creating object model from %s payload".format(Tags.IMEventTag), error)
+ },
imEvent ⇒ imEventStore.insertIMEvent(imEvent),
imDebugForwardAction
)