New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add auto-configuration for Reactor Kafka #30567
base: main
Are you sure you want to change the base?
Changes from all commits
5af4e0f
a73dc3d
518e7c2
091b4d5
ff8f265
e8d2547
e2a3023
abe59b4
ce390b0
4da3c69
c644529
37059a5
9fb8bfd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright 2012-2020 the original author or authors. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [NIT] Copyrights need updating. |
||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.boot.autoconfigure.kafka; | ||
|
||
import reactor.kafka.receiver.internals.DefaultKafkaReceiver; | ||
|
||
/** | ||
* Callback interface for customizing | ||
* {@code reactor.kafka.receiver.internals.DefaultKafkaReceiver} beans. | ||
* | ||
* @author Almog Tavor | ||
* @since 2.7.0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [NIT] This ship has sailed :) - |
||
*/ | ||
@FunctionalInterface | ||
public interface DefaultKafkaReceiverCustomizer { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we are only auto-configuring the Instead we could add the If we do any customizers in this PR though they will need to be invoked in an order manner in the AC and have tests for this. |
||
|
||
/** | ||
* Customize the {@link DefaultKafkaReceiver}. | ||
* @param receiverFactory the receiver factory to customize | ||
*/ | ||
void customize(DefaultKafkaReceiver<?, ?> receiverFactory); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright 2012-2020 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.boot.autoconfigure.kafka; | ||
|
||
import reactor.kafka.sender.internals.DefaultKafkaSender; | ||
|
||
/** | ||
* Callback interface for customizing | ||
* {@code reactor.kafka.sender.internals.DefaultKafkaSender} beans. | ||
* | ||
* @author Almog Tavor | ||
* @since 2.7.0 | ||
*/ | ||
@FunctionalInterface | ||
public interface DefaultKafkaSenderCustomizer { | ||
|
||
/** | ||
* Customize the {@link DefaultKafkaSender}. | ||
* @param senderFactory the consumer factory to customize | ||
*/ | ||
void customize(DefaultKafkaSender<?, ?> senderFactory); | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* Copyright 2012-2022 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.springframework.boot.autoconfigure.kafka; | ||
|
||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.function.Function; | ||
|
||
import reactor.kafka.receiver.KafkaReceiver; | ||
import reactor.kafka.receiver.ReceiverOptions; | ||
import reactor.kafka.sender.KafkaSender; | ||
import reactor.kafka.sender.SenderOptions; | ||
|
||
import org.springframework.boot.autoconfigure.AutoConfiguration; | ||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; | ||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; | ||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; | ||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; | ||
import org.springframework.boot.context.properties.EnableConfigurationProperties; | ||
import org.springframework.boot.context.properties.PropertyMapper; | ||
import org.springframework.context.annotation.Bean; | ||
|
||
/** | ||
* {@link EnableAutoConfiguration Auto-configuration} for the Reactive client of Apache | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [NIT] We have a bit of inconsistent description of the "thing" we are doing. We call it several things, "Reactive client of Apache Kafka" here and slightly different things elsewhere in this PR - maybe consolidate on one? I like |
||
* Kafka. | ||
* | ||
* @author Almog Tavor | ||
* @since 2.7.0 | ||
*/ | ||
@AutoConfiguration | ||
@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class }) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be more direct to guard on the thing we are auto-configuring ( |
||
@ConditionalOnBean(KafkaProperties.class) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess something like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good idea |
||
@EnableConfigurationProperties(ReactiveKafkaProperties.class) | ||
public class ReactiveKafkaAutoConfiguration { | ||
|
||
private final KafkaProperties kafkaProperties; | ||
|
||
private final ReactiveKafkaProperties reactiveKafkaProperties; | ||
|
||
private static final PropertyMapper map = PropertyMapper.get(); | ||
|
||
public ReactiveKafkaAutoConfiguration(KafkaProperties kafkaProperties, | ||
ReactiveKafkaProperties reactiveKafkaProperties) { | ||
this.kafkaProperties = kafkaProperties; | ||
this.reactiveKafkaProperties = reactiveKafkaProperties; | ||
} | ||
|
||
@Bean | ||
@ConditionalOnMissingBean(ReceiverOptions.class) | ||
public <K, V> ReceiverOptions<K, V> receiverOptions() { | ||
Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties(); | ||
ReceiverOptions<K, V> receiverOptions = ReceiverOptions.create(properties); | ||
ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver(); | ||
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getAtmostOnceCommitAheadSize(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [NIT-PREFERENCE] I would prefer to use the PropertyMapper API inline w/o the convenience methods below. There are many examples that do this same thing such as CassandraAutoConfiguration and JettyWebServerFactoryCustomizer |
||
receiverOptions::atmostOnceCommitAheadSize, receiverOptions); | ||
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getMaxDeferredCommits(), | ||
receiverOptions::maxDeferredCommits, receiverOptions); | ||
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getMaxCommitAttempts(), | ||
receiverOptions::maxCommitAttempts, receiverOptions); | ||
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getCommitBatchSize(), | ||
receiverOptions::commitBatchSize, receiverOptions); | ||
receiverOptions = setPropertyWhenNonNull(receiverProperties.getCloseTimeout(), receiverOptions::closeTimeout, | ||
receiverOptions); | ||
receiverOptions = setPropertyWhenNonNull(receiverProperties.getPollTimeout(), receiverOptions::pollTimeout, | ||
receiverOptions); | ||
receiverOptions = setPropertyWhenNonNull(receiverProperties.getCommitInterval(), | ||
receiverOptions::commitInterval, receiverOptions); | ||
receiverOptions = setPropertyWhenNonNull(receiverProperties.getSubscribeTopics(), receiverOptions::subscription, | ||
receiverOptions); | ||
if (Optional.ofNullable(receiverProperties.getSubscribeTopics()).isEmpty()) { | ||
receiverOptions = setPropertyWhenNonNull(receiverProperties.getSubscribePattern(), | ||
receiverOptions::subscription, receiverOptions); | ||
} | ||
return receiverOptions; | ||
} | ||
|
||
@Bean | ||
@ConditionalOnMissingBean(SenderOptions.class) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bean type is redundant here as the return type of the method is the default. |
||
public <K, V> SenderOptions<K, V> senderOptions() { | ||
Map<String, Object> properties = this.kafkaProperties.buildProducerProperties(); | ||
SenderOptions<K, V> senderOptions = SenderOptions.create(properties); | ||
ReactiveKafkaProperties.Sender senderProperties = this.reactiveKafkaProperties.getSender(); | ||
senderOptions = map.from(senderProperties.getCloseTimeout()).toInstance(senderOptions::closeTimeout); | ||
senderOptions = setPropertyWhenGreaterThanZero(senderProperties.getMaxInFlight(), senderOptions::maxInFlight, | ||
senderOptions); | ||
senderOptions = map.from(senderProperties.isStopOnError()).toInstance(senderOptions::stopOnError); | ||
return senderOptions; | ||
} | ||
|
||
private <T> T setPropertyWhenGreaterThanZero(Integer property, Function<Integer, T> function, T options) { | ||
if (property <= 0) { | ||
return options; | ||
} | ||
return map.from(property).when((i) -> i > 0).toInstance(function); | ||
} | ||
|
||
private <S, T> T setPropertyWhenNonNull(S property, Function<S, T> function, T options) { | ||
if (Optional.ofNullable(property).isEmpty()) { | ||
return options; | ||
} | ||
return map.from(property).whenNonNull().toInstance(function); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NIT] Wdyt about moving the dep to alpha-order by the other
io.projectreactor
or another option, closer tospring-kafka
? It seems out of place here.