diff --git a/spring-boot-project/spring-boot-autoconfigure/build.gradle b/spring-boot-project/spring-boot-autoconfigure/build.gradle index fc32ecfd4896..8a8a20c37aa5 100644 --- a/spring-boot-project/spring-boot-autoconfigure/build.gradle +++ b/spring-boot-project/spring-boot-autoconfigure/build.gradle @@ -188,6 +188,7 @@ dependencies { optional("org.thymeleaf.extras:thymeleaf-extras-java8time") optional("org.thymeleaf.extras:thymeleaf-extras-springsecurity6") optional("redis.clients:jedis") + optional("io.projectreactor.kafka:reactor-kafka") testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support")) testImplementation(project(":spring-boot-project:spring-boot-test")) diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaReceiverCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaReceiverCustomizer.java new file mode 100644 index 000000000000..e86785f5ea28 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaReceiverCustomizer.java @@ -0,0 +1,37 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import reactor.kafka.receiver.internals.DefaultKafkaReceiver; + +/** + * Callback interface for customizing + * {@code reactor.kafka.receiver.internals.DefaultKafkaReceiver} beans. + * + * @author Almog Tavor + * @since 2.7.0 + */ +@FunctionalInterface +public interface DefaultKafkaReceiverCustomizer { + + /** + * Customize the {@link DefaultKafkaReceiver}. + * @param receiverFactory the receiver factory to customize + */ + void customize(DefaultKafkaReceiver receiverFactory); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaSenderCustomizer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaSenderCustomizer.java new file mode 100644 index 000000000000..02cd4eff3709 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/DefaultKafkaSenderCustomizer.java @@ -0,0 +1,37 @@ +/* + * Copyright 2012-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import reactor.kafka.sender.internals.DefaultKafkaSender; + +/** + * Callback interface for customizing + * {@code reactor.kafka.sender.internals.DefaultKafkaSender} beans. + * + * @author Almog Tavor + * @since 2.7.0 + */ +@FunctionalInterface +public interface DefaultKafkaSenderCustomizer { + + /** + * Customize the {@link DefaultKafkaSender}. + * @param senderFactory the consumer factory to customize + */ + void customize(DefaultKafkaSender senderFactory); + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaAutoConfiguration.java new file mode 100644 index 000000000000..3a00a97a3d7f --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaAutoConfiguration.java @@ -0,0 +1,118 @@ +/* + * Copyright 2012-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; + +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.context.annotation.Bean; + +/** + * {@link EnableAutoConfiguration Auto-configuration} for the Reactive client of Apache + * Kafka. + * + * @author Almog Tavor + * @since 2.7.0 + */ +@AutoConfiguration +@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class }) +@ConditionalOnBean(KafkaProperties.class) +@EnableConfigurationProperties(ReactiveKafkaProperties.class) +public class ReactiveKafkaAutoConfiguration { + + private final KafkaProperties kafkaProperties; + + private final ReactiveKafkaProperties reactiveKafkaProperties; + + private static final PropertyMapper map = PropertyMapper.get(); + + public ReactiveKafkaAutoConfiguration(KafkaProperties kafkaProperties, + ReactiveKafkaProperties reactiveKafkaProperties) { + this.kafkaProperties = kafkaProperties; + this.reactiveKafkaProperties = reactiveKafkaProperties; + } + + @Bean + @ConditionalOnMissingBean(ReceiverOptions.class) + public ReceiverOptions receiverOptions() { + Map properties = this.kafkaProperties.buildConsumerProperties(); + ReceiverOptions receiverOptions = ReceiverOptions.create(properties); + ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver(); + receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getAtmostOnceCommitAheadSize(), + receiverOptions::atmostOnceCommitAheadSize, receiverOptions); + receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getMaxDeferredCommits(), + receiverOptions::maxDeferredCommits, receiverOptions); + receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getMaxCommitAttempts(), + receiverOptions::maxCommitAttempts, receiverOptions); + receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getCommitBatchSize(), + receiverOptions::commitBatchSize, receiverOptions); + receiverOptions = setPropertyWhenNonNull(receiverProperties.getCloseTimeout(), receiverOptions::closeTimeout, + receiverOptions); + receiverOptions = setPropertyWhenNonNull(receiverProperties.getPollTimeout(), receiverOptions::pollTimeout, + receiverOptions); + receiverOptions = setPropertyWhenNonNull(receiverProperties.getCommitInterval(), + receiverOptions::commitInterval, receiverOptions); + receiverOptions = setPropertyWhenNonNull(receiverProperties.getSubscribeTopics(), receiverOptions::subscription, + receiverOptions); + if (Optional.ofNullable(receiverProperties.getSubscribeTopics()).isEmpty()) { + receiverOptions = setPropertyWhenNonNull(receiverProperties.getSubscribePattern(), + receiverOptions::subscription, receiverOptions); + } + return receiverOptions; + } + + @Bean + @ConditionalOnMissingBean(SenderOptions.class) + public SenderOptions senderOptions() { + Map properties = this.kafkaProperties.buildProducerProperties(); + SenderOptions senderOptions = SenderOptions.create(properties); + ReactiveKafkaProperties.Sender senderProperties = this.reactiveKafkaProperties.getSender(); + senderOptions = map.from(senderProperties.getCloseTimeout()).toInstance(senderOptions::closeTimeout); + senderOptions = setPropertyWhenGreaterThanZero(senderProperties.getMaxInFlight(), senderOptions::maxInFlight, + senderOptions); + senderOptions = map.from(senderProperties.isStopOnError()).toInstance(senderOptions::stopOnError); + return senderOptions; + } + + private T setPropertyWhenGreaterThanZero(Integer property, Function function, T options) { + if (property <= 0) { + return options; + } + return map.from(property).when((i) -> i > 0).toInstance(function); + } + + private T setPropertyWhenNonNull(S property, Function function, T options) { + if (Optional.ofNullable(property).isEmpty()) { + return options; + } + return map.from(property).whenNonNull().toInstance(function); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaProperties.java new file mode 100644 index 000000000000..29ca59942dff --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaProperties.java @@ -0,0 +1,238 @@ +/* + * Copyright 2012-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * Configuration properties for Project Reactor for Apache Kafka. + *

+ * Users should refer to Reactor Kafka documentation for complete descriptions of these + * properties. + * + * @author Almog Tavor + * @since 2.7.0 + */ +@ConfigurationProperties(prefix = "spring.reactor.kafka") +public class ReactiveKafkaProperties { + + /** + * Additional properties, common to producers and consumers, used to configure the + * client. + */ + private final Map properties = new HashMap<>(); + + private final Receiver receiver = new Receiver(); + + private final Sender sender = new Sender(); + + public Map getProperties() { + return this.properties; + } + + public Receiver getReceiver() { + return this.receiver; + } + + public Sender getSender() { + return this.sender; + } + + public static class Receiver { + + /** + * Sets the timeout for each KafkaConsumer's poll operation duration. + */ + private Duration pollTimeout; + + /** + * Sets timeout for graceful shutdown of the KafkaConsumer. + */ + private Duration closeTimeout; + + /** + * Sets subscription using group management to the specified collection of topics. + */ + private Collection subscribeTopics; + + /** + * Sets subscription using group management to the specified pattern. + */ + private Pattern subscribePattern; + + /** + * Configures commit interval for automatic commits. At least one commit operation + * is attempted within this interval if records are consumed and acknowledged. + */ + private Duration commitInterval; + + /** + * Configures commit batch size for automatic commits. At least one commit + * operation is attempted when the number of acknowledged uncommitted offsets + * reaches this batch size. + */ + private int commitBatchSize; + + /** + * Configures commit ahead size per partition for at-most-once delivery. Before + * dispatching each record, an offset ahead by this size may be committed. + */ + private int atmostOnceCommitAheadSize; + + /** + * Configures the maximum number of consecutive non-fatal + * RetriableCommitFailedException commit failures that are tolerated. + */ + private int maxCommitAttempts; + + /** + * The limit for the number of deferred commits to pause the consumer until the + * deferred commits are reduced. + */ + private int maxDeferredCommits; + + public Duration getPollTimeout() { + return this.pollTimeout; + } + + public void setPollTimeout(Duration pollTimeout) { + this.pollTimeout = pollTimeout; + } + + public Duration getCloseTimeout() { + return this.closeTimeout; + } + + public void setCloseTimeout(Duration closeTimeout) { + this.closeTimeout = closeTimeout; + } + + public Collection getSubscribeTopics() { + return this.subscribeTopics; + } + + public void setSubscribeTopics(Collection subscribeTopics) { + this.subscribeTopics = subscribeTopics; + } + + public Pattern getSubscribePattern() { + return this.subscribePattern; + } + + public void setSubscribePattern(Pattern subscribePattern) { + this.subscribePattern = subscribePattern; + } + + public Duration getCommitInterval() { + return this.commitInterval; + } + + public void setCommitInterval(Duration commitInterval) { + this.commitInterval = commitInterval; + } + + public int getCommitBatchSize() { + return this.commitBatchSize; + } + + public void setCommitBatchSize(int commitBatchSize) { + this.commitBatchSize = commitBatchSize; + } + + public int getAtmostOnceCommitAheadSize() { + return this.atmostOnceCommitAheadSize; + } + + public void setAtmostOnceCommitAheadSize(int atmostOnceCommitAheadSize) { + this.atmostOnceCommitAheadSize = atmostOnceCommitAheadSize; + } + + public int getMaxCommitAttempts() { + return this.maxCommitAttempts; + } + + public void setMaxCommitAttempts(int maxCommitAttempts) { + this.maxCommitAttempts = maxCommitAttempts; + } + + public int getMaxDeferredCommits() { + return this.maxDeferredCommits; + } + + public void setMaxDeferredCommits(int maxDeferredCommits) { + this.maxDeferredCommits = maxDeferredCommits; + } + + } + + public static class Sender { + + private final Map properties = new HashMap<>(); + + /** + * Configures the maximum number of in-flight records that are fetched from the + * outbound record publisher while acknowledgements are pending. + */ + private int maxInFlight; + + /** + * Configures error handling behaviour for the KafkaSender's send function. + */ + private boolean stopOnError; + + /** + * Configures the timeout for graceful shutdown of this sender. + */ + private Duration closeTimeout; + + public int getMaxInFlight() { + return this.maxInFlight; + } + + public void setMaxInFlight(int maxInFlight) { + this.maxInFlight = maxInFlight; + } + + public boolean isStopOnError() { + return this.stopOnError; + } + + public void setStopOnError(boolean stopOnError) { + this.stopOnError = stopOnError; + } + + public Duration getCloseTimeout() { + return this.closeTimeout; + } + + public void setCloseTimeout(Duration closeTimeout) { + this.closeTimeout = closeTimeout; + } + + public Map getProperties() { + return this.properties; + } + + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaAutoConfigurationTests.java new file mode 100644 index 000000000000..b77bb3d470dc --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ReactiveKafkaAutoConfigurationTests.java @@ -0,0 +1,96 @@ +/* + * Copyright 2012-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Objects; +import java.util.regex.Pattern; + +import org.junit.jupiter.api.Test; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.sender.SenderOptions; + +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link ReactiveKafkaAutoConfiguration}. + * + * @author Almog Tavor + */ +class ReactiveKafkaAutoConfigurationTests { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)) + .withConfiguration(AutoConfigurations.of(ReactiveKafkaAutoConfiguration.class)); + + @Test + void receiverProperties() { + this.contextRunner.withPropertyValues("spring.reactor.kafka.receiver.commit-interval=2000", + "spring.reactor.kafka.receiver.close-timeout=1500", + "spring.reactor.kafka.receiver.commit-batch-size=100", + "spring.reactor.kafka.receiver.poll-timeout=1000", + "spring.reactor.kafka.receiver.atmost-once-commit-ahead-size=42", + "spring.reactor.kafka.receiver.max-commit-attempts=3", + "spring.reactor.kafka.receiver.max-deferred-commits=5", + "spring.reactor.kafka.receiver.subscribe-topics=foo,bar").run((context) -> { + ReceiverOptions receiverOptions = context.getBean(ReceiverOptions.class); + assertThat(receiverOptions.commitInterval()).isEqualTo(Duration.ofSeconds(2)); + assertThat(receiverOptions.closeTimeout()).isEqualTo(Duration.ofMillis(1500)); + assertThat(receiverOptions.commitBatchSize()).isEqualTo(100); + assertThat(receiverOptions.pollTimeout()).isEqualTo(Duration.ofSeconds(1)); + assertThat(receiverOptions.atmostOnceCommitAheadSize()).isEqualTo(42); + assertThat(receiverOptions.maxCommitAttempts()).isEqualTo(3); + assertThat(receiverOptions.maxDeferredCommits()).isEqualTo(5); + assertThat(receiverOptions.subscriptionTopics()).containsAll(Arrays.asList("foo", "bar")); + }); + } + + @Test + void receiverPropertiesSubscribePattern() { + this.contextRunner.withPropertyValues("spring.reactor.kafka.receiver.subscribe-pattern=myTopic.+") + .run((context) -> { + ReceiverOptions receiverOptions = context.getBean(ReceiverOptions.class); + assertThat(receiverOptions.subscriptionPattern()) + .matches((p) -> Objects.equals(Pattern.compile("myTopic.+").pattern(), p.pattern())); + }); + } + + @Test + void receiverPropertiesDefaultValues() { + this.contextRunner.withPropertyValues().run((context) -> { + ReceiverOptions receiverOptions = context.getBean(ReceiverOptions.class); + assertThat(receiverOptions.commitInterval()).isEqualTo(Duration.ofSeconds(5)); + }); + } + + @Test + void producerProperties() { + this.contextRunner.withPropertyValues("spring.reactor.kafka.sender.max-in-flight=1500", + "spring.reactor.kafka.sender.stop-on-error=false", "spring.reactor.kafka.sender.close-timeout=500") + .run((context) -> { + SenderOptions senderOptions = context.getBean(SenderOptions.class); + assertThat(senderOptions.maxInFlight()).isEqualTo(1500); + assertThat(senderOptions.stopOnError()).isFalse(); + assertThat(senderOptions.closeTimeout()).isEqualTo(Duration.ofMillis(500)); + }); + } + +}