Skip to content

Commit

Permalink
GH-2332: Fix Partitions in Pause Event
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jul 7, 2022
1 parent a6cce8a commit a63359c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1700,7 +1700,7 @@ private void doPauseConsumerIfNecessary() {
this.consumerPaused = true;
this.pauseForPending = false;
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
publishConsumerPausedEvent(this.consumer.assignment());
publishConsumerPausedEvent(assigned);

This comment has been minimized.

Copy link
@artembilan

artembilan Jul 7, 2022

Member

Yes, I also had a doubt about this code when I tried to solve that conflict, but didn't think it could be a problem since it works on main and even 2.9.x 😢

Thank you for spotting and fixing!

}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConsumerPausedEvent;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -114,6 +116,9 @@ public void pausesWithManualAssignment() throws Exception {
assertThat(this.config.count).isEqualTo(4);
assertThat(this.config.contents).contains("foo", "bar", "baz", "qux");
verify(this.consumer, never()).seek(any(), anyLong());
assertThat(this.config.eventLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.event.getPartitions()).contains(
new TopicPartition("foo", 0), new TopicPartition("foo", 1), new TopicPartition("foo", 2));
}

@Configuration
Expand All @@ -130,8 +135,12 @@ public static class Config {

final CountDownLatch commitLatch = new CountDownLatch(3);

final CountDownLatch eventLatch = new CountDownLatch(1);

int count;

volatile ConsumerPausedEvent event;

@KafkaListener(id = "id", groupId = "grp",
topicPartitions = @org.springframework.kafka.annotation.TopicPartition(topic = "foo",
partitions = "#{'0,1,2'.split(',')}"))
Expand Down Expand Up @@ -228,6 +237,12 @@ ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(KafkaListe
return factory;
}

@EventListener
public void paused(ConsumerPausedEvent event) {
this.event = event;
this.eventLatch.countDown();
}

}

}

0 comments on commit a63359c

Please sign in to comment.