Skip to content

Commit

Permalink
spring-projectsGH-2195: Fix Remaining ConsumerRecords and Test
Browse files Browse the repository at this point in the history
Use a `LinkedHashMap` for the remaining records so that the order is
retained.
Fix test after removing some logic in the last commit.
  • Loading branch information
garyrussell committed Jul 6, 2022
1 parent b2b41d9 commit ad48491
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2776,7 +2776,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
else {
boolean handled = this.commonErrorHandler.handleOne(rte, record, this.consumer,
KafkaMessageListenerContainer.this.thisOrParentContainer);
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
if (!handled) {
records.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand Down Expand Up @@ -110,7 +108,6 @@ public void doesNotResumeIfPartitionPaused() throws Exception {
verify(this.consumer, never()).resume(any());
assertThat(this.config.count).isEqualTo(4);
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
assertThat(this.config.deliveries).contains(1, 1, 1, 1);
verify(this.consumer, never()).seek(any(), anyLong());
}

Expand All @@ -120,8 +117,6 @@ public static class Config {

final List<String> contents = new ArrayList<>();

final List<Integer> deliveries = new ArrayList<>();

final CountDownLatch pollLatch = new CountDownLatch(4);

final CountDownLatch deliveryLatch = new CountDownLatch(4);
Expand All @@ -135,9 +130,8 @@ public static class Config {
@KafkaListener(id = "id", groupId = "grp",
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
partitions = "#{'0,1,2'.split(',')}"))
public void foo(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) {
public void foo(String in) {
this.contents.add(in);
this.deliveries.add(delivery);
this.deliveryLatch.countDown();
if (++this.count == 4 || this.count == 5) { // part 1, offset 1, first and second times
throw new RuntimeException("foo");
Expand Down
2 changes: 1 addition & 1 deletion spring-kafka/src/test/resources/log4j2-test.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</Console>
</Appenders>
<Loggers>
<Logger name="org.springframework.kafka" level="warn"/>
<Logger name="org.springframework.kafka" level="debug"/>
<Logger name="org.springframework.kafka.ReplyingKafkaTemplate" level="warn"/>
<Logger name="org.springframework.kafka.retrytopic" level="warn"/>
<Logger name="org.apache.kafka.clients" level="warn"/>
Expand Down

0 comments on commit ad48491

Please sign in to comment.