Skip to content

Commit

Permalink
GH-2288: Delegating EH - Traverse Causes
Browse files Browse the repository at this point in the history
Resolves #2288

GH-2288: Add option to deeply traverse exc chain

Add possibility to deeply traverse cause chain in order to find proper delegate
for handling thrown exception. Keep old way of cause chain traversing as default
one.

Cover new code with unit test.

GH-2288: Make cause traversing more extensible

Use BinaryExceptionClassifier while traversing cause chain to make it possible
to classify throwables for handling based on inheritance etc.

GH-2288: Remove custom BinaryExceptionClassifier

Polish Javadocs.
  • Loading branch information
breader124 authored and garyrussell committed Jul 7, 2022
1 parent ed0f2e5 commit 948db6e
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package org.springframework.kafka.listener;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand All @@ -34,6 +37,7 @@
* {@link #deliveryAttemptHeader()} is not supported - always returns false.
*
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.8
*
*/
Expand All @@ -43,6 +47,10 @@ public class CommonDelegatingErrorHandler implements CommonErrorHandler {

private final Map<Class<? extends Throwable>, CommonErrorHandler> delegates = new LinkedHashMap<>();

private boolean causeChainTraversing = false;

private BinaryExceptionClassifier classifier = new BinaryExceptionClassifier(new HashMap<>());

/**
* Construct an instance with a default error handler that will be invoked if the
* exception has no matches.
Expand All @@ -59,11 +67,30 @@ public CommonDelegatingErrorHandler(CommonErrorHandler defaultErrorHandler) {
* @param delegates the delegates.
*/
public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
Assert.notNull(delegates, "'delegates' cannot be null");
this.delegates.clear();
this.delegates.putAll(delegates);
checkDelegates();
updateClassifier(delegates);
}

private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
this.classifier = new BinaryExceptionClassifier(classifications);
}

/**
* Set the flag enabling deep exception's cause chain traversing. If true, the
* delegate for the first exception classified by {@link BinaryExceptionClassifier}
* will be retrieved.
* @param causeChainTraversing the causeChainTraversing flag.
* @since 2.8.8
*/
public void setCauseChainTraversing(boolean causeChainTraversing) {
this.causeChainTraversing = causeChainTraversing;
}

@SuppressWarnings("deprecation")
@Override
Expand All @@ -79,7 +106,7 @@ public boolean seeksAfterHandling() {
@Override
public void clearThreadState() {
this.defaultErrorHandler.clearThreadState();
this.delegates.values().forEach(handler -> handler.clearThreadState());
this.delegates.values().forEach(CommonErrorHandler::clearThreadState);
}

@Override
Expand Down Expand Up @@ -158,10 +185,7 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu

@Nullable
private CommonErrorHandler findDelegate(Throwable thrownException) {
Throwable cause = thrownException;
if (cause instanceof ListenerExecutionFailedException) {
cause = thrownException.getCause();
}
Throwable cause = findCause(thrownException);
if (cause != null) {
Class<? extends Throwable> causeClass = cause.getClass();
for (Entry<Class<? extends Throwable>, CommonErrorHandler> entry : this.delegates.entrySet()) {
Expand All @@ -173,4 +197,32 @@ private CommonErrorHandler findDelegate(Throwable thrownException) {
return null;
}

@Nullable
private Throwable findCause(Throwable thrownException) {
if (this.causeChainTraversing) {
return traverseCauseChain(thrownException);
}
return shallowTraverseCauseChain(thrownException);
}

@Nullable
private Throwable shallowTraverseCauseChain(Throwable thrownException) {
Throwable cause = thrownException;
if (cause instanceof ListenerExecutionFailedException) {
cause = thrownException.getCause();
}
return cause;
}

@Nullable
private Throwable traverseCauseChain(Throwable thrownException) {
while (thrownException != null && thrownException.getCause() != null) {
if (this.classifier.classify(thrownException)) { // NOSONAR using Boolean here is not dangerous
return thrownException;
}
thrownException = thrownException.getCause();
}
return thrownException;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

import java.io.IOException;
Expand All @@ -29,12 +30,13 @@
import org.junit.jupiter.api.Test;

import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaProducerException;

/**
* Tests for {@link CommonDelegatingErrorHandler}. Copied from
* {@link ConditionalDelegatingErrorHandlerTests} with changed handler type.
* Tests for {@link CommonDelegatingErrorHandler}.
*
* @author Gary Russell
* @author Adrian Chlebosz
* @since 2.8
*
*/
Expand Down Expand Up @@ -88,6 +90,83 @@ void testBatchDelegates() {
verify(one).handleBatch(any(), any(), any(), any(), any());
}

@Test
void testDelegateForThrowableIsAppliedWhenCauseTraversingIsEnabled() {
var defaultHandler = mock(CommonErrorHandler.class);

var directCauseErrorHandler = mock(CommonErrorHandler.class);
var directCauseExc = new IllegalArgumentException();
var errorHandler = mock(CommonErrorHandler.class);
var exc = new UnsupportedOperationException(directCauseExc);

var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
delegatingErrorHandler.setCauseChainTraversing(true);
delegatingErrorHandler.setErrorHandlers(Map.of(
exc.getClass(), errorHandler,
directCauseExc.getClass(), directCauseErrorHandler
));

delegatingErrorHandler.handleRemaining(directCauseExc, Collections.emptyList(), mock(Consumer.class),
mock(MessageListenerContainer.class));

verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
verify(errorHandler, never()).handleRemaining(any(), any(), any(), any());
}

@Test
void testDelegateForThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
var defaultHandler = mock(CommonErrorHandler.class);

var directCauseErrorHandler = mock(CommonErrorHandler.class);
var directCauseExc = new IllegalArgumentException();
var exc = new UnsupportedOperationException(directCauseExc);

var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
delegatingErrorHandler.setCauseChainTraversing(true);
delegatingErrorHandler.setErrorHandlers(Map.of(
directCauseExc.getClass(), directCauseErrorHandler
));

delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
mock(MessageListenerContainer.class));

verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
}

@Test
@SuppressWarnings("ConstantConditions")
void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
var defaultHandler = mock(CommonErrorHandler.class);

var directCauseErrorHandler = mock(CommonErrorHandler.class);
var directCauseExc = new KafkaProducerException(null, null, null);
var exc = new UnsupportedOperationException(directCauseExc);

var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
delegatingErrorHandler.setCauseChainTraversing(true);
delegatingErrorHandler.setErrorHandlers(Map.of(
KafkaException.class, directCauseErrorHandler
));

delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
mock(MessageListenerContainer.class));

verify(directCauseErrorHandler).handleRemaining(any(), any(), any(), any());
}

@Test
@SuppressWarnings("ConstantConditions")
void testDefaultDelegateIsApplied() {
var defaultHandler = mock(CommonErrorHandler.class);
var delegatingErrorHandler = new CommonDelegatingErrorHandler(defaultHandler);
delegatingErrorHandler.setCauseChainTraversing(true);

delegatingErrorHandler.handleRemaining(null, Collections.emptyList(), mock(Consumer.class),
mock(MessageListenerContainer.class));

verify(defaultHandler).handleRemaining(any(), any(), any(), any());
}

private Exception wrap(Exception ex) {
return new ListenerExecutionFailedException("test", ex);
}
Expand Down

0 comments on commit 948db6e

Please sign in to comment.