Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto-configuration for Reactor Kafka #30567

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions spring-boot-project/spring-boot-autoconfigure/build.gradle
Expand Up @@ -188,6 +188,7 @@ dependencies {
optional("org.thymeleaf.extras:thymeleaf-extras-java8time")
optional("org.thymeleaf.extras:thymeleaf-extras-springsecurity6")
optional("redis.clients:jedis")
optional("io.projectreactor.kafka:reactor-kafka")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] Wdyt about moving the dep to alpha-order by the other io.projectreactor or another option, closer to spring-kafka? It seems out of place here.


testImplementation(project(":spring-boot-project:spring-boot-tools:spring-boot-test-support"))
testImplementation(project(":spring-boot-project:spring-boot-test"))
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright 2012-2020 the original author or authors.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] Copyrights need updating.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.kafka;

import reactor.kafka.receiver.internals.DefaultKafkaReceiver;

/**
* Callback interface for customizing
* {@code reactor.kafka.receiver.internals.DefaultKafkaReceiver} beans.
*
* @author Almog Tavor
* @since 2.7.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] This ship has sailed :) - @since needs to be updated

*/
@FunctionalInterface
public interface DefaultKafkaReceiverCustomizer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are only auto-configuring the Receiver/SenderOptions I think we should drop these customizers out of this proposal. They also are not currently being called nor tested.

Instead we could add the Receiver/SenderOptionsCustomizer called out in spring-cloud/spring-cloud-stream#2296. We could also just add the options customizers in a separate PR under the SCS ticket.

If we do any customizers in this PR though they will need to be invoked in an order manner in the AC and have tests for this.


/**
* Customize the {@link DefaultKafkaReceiver}.
* @param receiverFactory the receiver factory to customize
*/
void customize(DefaultKafkaReceiver<?, ?> receiverFactory);

}
@@ -0,0 +1,37 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.kafka;

import reactor.kafka.sender.internals.DefaultKafkaSender;

/**
* Callback interface for customizing
* {@code reactor.kafka.sender.internals.DefaultKafkaSender} beans.
*
* @author Almog Tavor
* @since 2.7.0
*/
@FunctionalInterface
public interface DefaultKafkaSenderCustomizer {

/**
* Customize the {@link DefaultKafkaSender}.
* @param senderFactory the consumer factory to customize
*/
void customize(DefaultKafkaSender<?, ?> senderFactory);

}
@@ -0,0 +1,118 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.boot.autoconfigure.kafka;

import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.context.annotation.Bean;

/**
* {@link EnableAutoConfiguration Auto-configuration} for the Reactive client of Apache
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT] We have a bit of inconsistent description of the "thing" we are doing. We call it several things, "Reactive client of Apache Kafka" here and slightly different things elsewhere in this PR - maybe consolidate on one? I like ReactorKafka personally.

* Kafka.
*
* @author Almog Tavor
* @since 2.7.0
*/
@AutoConfiguration
@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more direct to guard on the thing we are auto-configuring (ReceiverOptions/SenderOptions) rather than something that happens to use them and is in the same library.

@ConditionalOnBean(KafkaProperties.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess something like @AutoConfigureAfter(KafkaAutoConfiguration.class) is better otherwise we cannot predict the order of auto-configurations.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea

@EnableConfigurationProperties(ReactiveKafkaProperties.class)
public class ReactiveKafkaAutoConfiguration {

private final KafkaProperties kafkaProperties;

private final ReactiveKafkaProperties reactiveKafkaProperties;

private static final PropertyMapper map = PropertyMapper.get();

public ReactiveKafkaAutoConfiguration(KafkaProperties kafkaProperties,
ReactiveKafkaProperties reactiveKafkaProperties) {
this.kafkaProperties = kafkaProperties;
this.reactiveKafkaProperties = reactiveKafkaProperties;
}

@Bean
@ConditionalOnMissingBean(ReceiverOptions.class)
public <K, V> ReceiverOptions<K, V> receiverOptions() {
Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties();
ReceiverOptions<K, V> receiverOptions = ReceiverOptions.create(properties);
ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver();
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getAtmostOnceCommitAheadSize(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[NIT-PREFERENCE] I would prefer to use the PropertyMapper API inline w/o the convenience methods below. There are many examples that do this same thing such as CassandraAutoConfiguration and JettyWebServerFactoryCustomizer

receiverOptions::atmostOnceCommitAheadSize, receiverOptions);
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getMaxDeferredCommits(),
receiverOptions::maxDeferredCommits, receiverOptions);
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getMaxCommitAttempts(),
receiverOptions::maxCommitAttempts, receiverOptions);
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getCommitBatchSize(),
receiverOptions::commitBatchSize, receiverOptions);
receiverOptions = setPropertyWhenNonNull(receiverProperties.getCloseTimeout(), receiverOptions::closeTimeout,
receiverOptions);
receiverOptions = setPropertyWhenNonNull(receiverProperties.getPollTimeout(), receiverOptions::pollTimeout,
receiverOptions);
receiverOptions = setPropertyWhenNonNull(receiverProperties.getCommitInterval(),
receiverOptions::commitInterval, receiverOptions);
receiverOptions = setPropertyWhenNonNull(receiverProperties.getSubscribeTopics(), receiverOptions::subscription,
receiverOptions);
if (Optional.ofNullable(receiverProperties.getSubscribeTopics()).isEmpty()) {
receiverOptions = setPropertyWhenNonNull(receiverProperties.getSubscribePattern(),
receiverOptions::subscription, receiverOptions);
}
return receiverOptions;
}

@Bean
@ConditionalOnMissingBean(SenderOptions.class)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bean type is redundant here as the return type of the method is the default.

public <K, V> SenderOptions<K, V> senderOptions() {
Map<String, Object> properties = this.kafkaProperties.buildProducerProperties();
SenderOptions<K, V> senderOptions = SenderOptions.create(properties);
ReactiveKafkaProperties.Sender senderProperties = this.reactiveKafkaProperties.getSender();
senderOptions = map.from(senderProperties.getCloseTimeout()).toInstance(senderOptions::closeTimeout);
senderOptions = setPropertyWhenGreaterThanZero(senderProperties.getMaxInFlight(), senderOptions::maxInFlight,
senderOptions);
senderOptions = map.from(senderProperties.isStopOnError()).toInstance(senderOptions::stopOnError);
return senderOptions;
}

private <T> T setPropertyWhenGreaterThanZero(Integer property, Function<Integer, T> function, T options) {
if (property <= 0) {
return options;
}
return map.from(property).when((i) -> i > 0).toInstance(function);
}

private <S, T> T setPropertyWhenNonNull(S property, Function<S, T> function, T options) {
if (Optional.ofNullable(property).isEmpty()) {
return options;
}
return map.from(property).whenNonNull().toInstance(function);
}

}