
Add auto-configuration for Reactor Kafka #30567

Open · wants to merge 13 commits into base: main

Conversation


@almogtavor almogtavor commented Apr 6, 2022

Some notes:

  • This PR resolves Add support for Reactor Kafka #29080 and Provide auto-configuration for Spring for Apache Kafka's reactive support #18804.
  • I've put the Reactor Kafka auto-configuration under the kafka hierarchy rather than reactor.kafka, since no other component keeps its reactive equivalent in a separate folder. This should also make it easier to introduce a so-called "common Kafka auto-configuration superclass" later, and easier to maintain when changes are needed, since both auto-configurations sit in the same place.
  • The common Kafka auto-configuration superclass has not been implemented in this PR, so ReactiveKafkaProperties has a static Properties class of its own.
  • This PR does not remove the need for Spring Kafka when using Reactor Kafka, i.e. this issue. To resolve that, we would need to extract all of the common properties that both clients use and expose them to each.
  • Since basic fields like bootstrapServers are defined in KafkaProperties, ReactiveKafkaProperties is annotated with @ConditionalOnBean(KafkaProperties.class).
  • The auto-configuration functions use the ReceiverOptions & SenderOptions setters with null checks / greater-than-zero checks, rather than building a class like ImmutableReceiverOptions directly, since that class is internal and not public.
  • I've created customizer interfaces, but I wasn't sure which classes to customize, since DefaultKafkaReceiver & DefaultKafkaSender are classified as internal.

There is no autoconfiguration for the scheduler, assignListeners, revokeListeners and schedulerSupplier fields.

  • Since reactor.kafka.sender.ImmutableSenderOptions is not public, it is currently hard to pass fields like Scheduler: the value is read from the configuration file as a field of type Class<?>, which cannot easily be passed to senderOptions.scheduler().
  • assignListeners and revokeListeners seemed unnecessary to configure from YAML, and even if we wanted to, I think it would be pretty hard to accomplish.
  • schedulerSupplier also seemed unnecessary to me for auto-configuration.
  • I couldn't bind assignTopicPartitions due to problems with binding a String & int KV pair from the YAML file as a TopicPartition class.
  • I thought of throwing an error when no topics have been configured, but I gave up on this since it could break applications that have the auto-configuration on the classpath but don't use it.
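A hypothetical workaround for the TopicPartition binding problem above (plain Java, names illustrative, not part of this PR) would be to bind the values as plain "topic:partition" strings and convert them afterwards, rather than binding directly to a TopicPartition-shaped class:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map;

// Parse "topic:partition" strings bound from YAML into (topic, partition)
// pairs; a real implementation would then build TopicPartition instances.
class TopicPartitionParser {

    static Map.Entry<String, Integer> parse(String value) {
        int idx = value.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected topic:partition, got: " + value);
        }
        return new SimpleEntry<>(value.substring(0, idx),
                Integer.parseInt(value.substring(idx + 1)));
    }

    public static void main(String[] args) {
        for (String raw : List.of("events:0", "audit-log:3")) {
            Map.Entry<String, Integer> tp = parse(raw);
            System.out.println(tp.getKey() + " -> partition " + tp.getValue());
        }
    }
}
```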

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Apr 6, 2022
Member

@artembilan artembilan left a comment

I see you auto-configure only options. Why are there no auto-configured beans for KafkaReceiver and KafkaSender? Or even their respective Spring for Apache Kafka wrappers, ReactiveKafkaConsumerTemplate and ReactiveKafkaProducerTemplate?

Thanks for the contribution anyway!

else {
Pattern subscribePattern = this.reactiveKafkaProperties.getReceiver().getSubscribePattern();
if (subscribePattern != null) {
receiverOptions = receiverOptions.subscription(subscribePattern);
Member

Consider using a PropertyMapper for this kind of complex configuration property mapping.
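The idea can be sketched in plain Java, with a hypothetical immutable Options record standing in for ReceiverOptions (no Spring dependency; names illustrative): a setter is applied only when the configured value is non-null, and the immutable result is threaded through, mirroring PropertyMapper's whenNonNull() behaviour.

```java
import java.time.Duration;
import java.util.function.BiFunction;

// Hypothetical immutable options type standing in for ReceiverOptions;
// each "wither" returns a new instance, as the real options classes do.
record Options(Duration pollTimeout, Integer commitBatchSize) {
    Options withPollTimeout(Duration d) { return new Options(d, commitBatchSize); }
    Options withCommitBatchSize(Integer n) { return new Options(pollTimeout, n); }
}

class NullSafeMapper {

    // Skip unset properties instead of forwarding null into the setter.
    static <O, T> O whenNonNull(O target, T value, BiFunction<O, T, O> setter) {
        return (value != null) ? setter.apply(target, value) : target;
    }

    public static void main(String[] args) {
        Options opts = new Options(null, null);
        Duration configuredPoll = Duration.ofMillis(100); // set by the user
        Integer configuredBatch = null;                   // left unset
        opts = whenNonNull(opts, configuredPoll, Options::withPollTimeout);
        opts = whenNonNull(opts, configuredBatch, Options::withCommitBatchSize);
        System.out.println(opts);
    }
}
```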

Author

@artembilan Great idea. As an alternative, I can perform something similar to:

ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver();
return receiverOptions
		.atmostOnceCommitAheadSize(receiverProperties.getAtmostOnceCommitAheadSize())
		.maxDeferredCommits(receiverProperties.getMaxDeferredCommits())
		.maxCommitAttempts(receiverProperties.getMaxCommitAttempts())
		.commitBatchSize(receiverProperties.getCommitBatchSize())
		.closeTimeout(receiverProperties.getCloseTimeout())
		.pollTimeout(receiverProperties.getPollTimeout())
		.subscription(receiverProperties.getSubscribePattern());

But this means I'd no longer check for nulls, and I'd set the ReceiverOptions unconditionally. Do you think the null (and integer sanity value) checks are really necessary here? I actually think we can give up on them.

Member

Sorry for the delay.

I don't think it is OK to propagate them blindly.
I might not be interested in a subscription, so I won't provide receiverProperties.getSubscribePattern(), and we will fail here with the Objects.requireNonNull(pattern).
Therefore PropertyMapper is a better way to accept only those props which are expected and valid.
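The failure mode described here can be sketched in plain Java; subscription below is a hypothetical stand-in for the real ReceiverOptions.subscription, which likewise rejects null arguments:

```java
import java.util.Objects;
import java.util.regex.Pattern;

// Sketch of the failure mode: an options setter that rejects null, so
// blindly forwarding an unset configuration property throws instead of
// simply being skipped.
class BlindPropagationDemo {

    static Pattern subscription(Pattern pattern) {
        return Objects.requireNonNull(pattern, "pattern must not be null");
    }

    public static void main(String[] args) {
        Pattern unsetProperty = null; // user never configured a subscribe pattern
        try {
            subscription(unsetProperty);
        } catch (NullPointerException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```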

Author

👍

* the underlying Kafka consumer is not thread-safe, long poll intervals may delay
* commits and other operations invoked using
* {@link KafkaReceiver#doOnConsumer(java.util.function.Function)}. Very short
* timeouts may reduce batching and increase load on the broker.
Member

The description of these configuration properties must be short and simple.
It cannot be like a method Javadoc: we must not use any Javadoc tags or style in these descriptions.
This property indeed doesn't set anything.
See the docs for more info: https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#features.developing-auto-configuration.custom-starter.configuration-keys

@almogtavor
Author

@artembilan That's actually a great idea. I auto-configured the options basically because that's what we agreed on in the issue. It should be pretty easy to migrate this to an auto-configuration for the templates. Would you suggest still keeping the auto-configuration for the options (in case people would like to customize things)?

@artembilan
Member

Sure! I will leave that decision to @garyrussell, since it looks like it was he who advised auto-configuring only the options.
But the feature doesn't look complete without something we can already use for sending to or receiving from Kafka.
I just had in mind an auto-configured KafkaTemplate and some infrastructure for the consumer...

@garyrussell
Contributor

I suggested configuring just the options because that is the biggest pain point: Boot already has mechanisms to configure the properties, which are identical to spring-kafka's. I don't see much benefit in creating the sender and receiver because they can't be reused, and it only takes one more line for the user to write them, so it doesn't save a lot of boilerplate.

In most cases

KafkaReceiver.create(options)
    .receive()
    . ...
    .subscribe()

The idea here was to get the minimal auto config and have the community request more as time goes by.

Those Reactive....Template are very lightweight wrappers and don't currently add much value; I hope to work on tighter integration between reactor-kafka and spring-kafka later this year; and then the auto configuration can evolve further.

Contributor

@garyrussell garyrussell left a comment

We really need to pull Producer, Consumer etc in KafkaProperties to their own class(es) and use them in both places; these represent native Kafka Properties.

@ConditionalOnMissingBean(ReceiverOptions.class)
public <K, V> ReceiverOptions<K, V> receiverOptions() {
Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties();
properties.putAll(this.reactiveKafkaProperties.buildReceiverProperties());
Contributor

What is the purpose of this? It is adding reactor-kafka properties to the Kafka receiver properties; Kafka knows nothing about these properties.

@ConditionalOnMissingBean(SenderOptions.class)
public <K, V> SenderOptions<K, V> senderOptions() {
Map<String, Object> properties = this.kafkaProperties.buildProducerProperties();
properties.putAll(this.reactiveKafkaProperties.buildSenderProperties());
Contributor

Ditto - we shouldn't be adding these; they are reactor-kafka specific properties.

this.maxDeferredCommits = maxDeferredCommits;
}

public Map<String, Object> buildProperties() {
Contributor

We don't need any of these build*Properties() methods; all the kafka specific properties are already handled by parts of KafkaProperties.

@almogtavor
Author

We really need to pull Producer, Consumer etc in KafkaProperties to their own class(es) and use them in both places; these represent native Kafka Properties.

Would you like to do this in this PR or in another one? I think it would be better done in a separate PR, since it changes KafkaProperties and other classes that are unrelated to this PR.

Also, @artembilan & @garyrussell, I've addressed your notes; you're welcome to review the PR once more.

@artembilan
Member

We really need to pull Producer, Consumer etc in KafkaProperties to their own class

Isn't that going to be a breaking change? If so, such a fix cannot be applied to the current Spring Boot 2.7.x and can only be aimed at the next major version, 3.0.
Yes, given the number of changes, it really might be better to do that in a separate PR.

Am I missing anything else, @garyrussell?

Member

@artembilan artembilan left a comment

Otherwise this is OK with me.

Thanks

*/
@AutoConfiguration
@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class })
@ConditionalOnBean(KafkaProperties.class)
Member

I guess something like @AutoConfigureAfter(KafkaAutoConfiguration.class) is better; otherwise we cannot predict the order of the auto-configurations.

Author

Good idea

@garyrussell
Contributor

Why would it be a breaking change?

The Boot team does not consider KafkaProperties to be a public API; however, all I am suggesting is putting these classes in their own files so they can be used for both the spring-kafka and reactor-kafka producer and consumer properties.

@almogtavor
Author

almogtavor commented Apr 20, 2022

@garyrussell the only change needed is to move KafkaProperties.Consumer and KafkaProperties.Producer into separate classes & adjust accordingly, right?

I wonder if it also means moving KafkaProperties.Ssl and KafkaProperties.Security, which are used by the consumer & producer. KafkaProperties.Jaas & the bootstrapServers property also seem to be common to both reactive & non-reactive. Should all of what I've mentioned be moved into separate common classes?

@garyrussell
Contributor

I would say so, yes.

But, I don't know enough about Boot's property documentation as to whether such a change would break things, so we need input from the Boot team.

@almogtavor
Author

almogtavor commented Apr 20, 2022

@garyrussell I think this would only break native usage of the KafkaProperties, which shouldn't really happen. Anyway, would you prefer to do this in another PR?

@garyrussell
Contributor

As I said

But, I don't know enough about Boot's property documentation as to whether such a change would break things, so we need input from the Boot team.

This has missed the 2.7.x train anyway so there is now plenty of time to resolve this.

@almogtavor
Author

almogtavor commented Apr 26, 2022

@garyrussell Ok. I'll add the commits for the class split to this PR in the following days.

@wilkinsona
Member

Thanks for the PR, @almogtavor. A few general comments:

  • I'm a bit confused by the apparent overlap or at least similarity between some of the existing properties and those that are being proposed here. For example spring.kafka.consumer.auto-commit-interval and spring.reactor.kafka.receiver.commit-interval, spring.kafka.properties.* and spring.reactor.kafka.properties.*
  • Given that there is no spring.reactor.kafka.consumer.properties.*, there seems to be an imbalance between spring.reactor.kafka.properties.* and spring.reactor.kafka.sender.properties.*
  • Given the reuse of the spring.kafka.producer.* and spring.kafka.consumer.* properties to configure Reactor Kafka, would spring.kafka.reactor be a better prefix as it brings the property names closer together?

I think things would benefit from taking a small step back and considering exactly how we want the receiver and sender options to be configured. Once that's been established, we can then map that into properties classes and auto-configuration.

@wilkinsona wilkinsona added the status: waiting-for-feedback We need additional information before we can continue label May 4, 2022
@onobc
Contributor

onobc commented May 9, 2022

I would say so, yes.

But, I don't know enough about Boot's property documentation as to whether such a change would break things, so we need input from the Boot team.

If we decided to "share" the consumer/producer properties, they would be moved out to separate classes, and each Consumer/Producer instance var would be marked with @NestedConfigurationProperty; the annotation processor should handle it well. A good example is ServerProperties.java.

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels May 9, 2022
@onobc
Contributor

onobc commented May 9, 2022

Thanks for the PR, @almogtavor. A few general comments:

  • I'm a bit confused by the apparent overlap or at least similarity between some of the existing properties and those that are being proposed here. For example spring.kafka.consumer.auto-commit-interval and spring.reactor.kafka.receiver.commit-interval, spring.kafka.properties.* and spring.reactor.kafka.properties.*
  • Given that there is no spring.reactor.kafka.consumer.properties.*, there seems to be an imbalance between spring.reactor.kafka.properties.* and spring.reactor.kafka.sender.properties.*
  • Given the reuse of the spring.kafka.producer.* and spring.kafka.consumer.* properties to configure Reactor Kafka, would spring.kafka.reactor be a better prefix as it brings the property names closer together?

I think things would benefit from taking a small step back and considering exactly how we want the receiver and sender options to be configured. Once that's been established, we can then map that into properties classes and auto-configuration.

@wilkinsona I will reply with what I see regarding the overlap of the properties, and also step back and think about how we want to configure this thing...

ℹ️ I talk mostly of Receiver but the Sender follows the same vein of thought...

So, the pattern is that all of the properties in the "ReceiverOptions.properties" map, plus the key and value deserializer properties, are passed into the KafkaConsumer.

public <K, V> Consumer<K, V> createConsumer(ReceiverOptions<K, V> config) {
    return new KafkaConsumer<>(config.consumerProperties(),
                               config.keyDeserializer(),
                               config.valueDeserializer());
}

These are the only "true" overlapping ones. All of the other options are used outside of the KafkaConsumer in the reactive control layer. You can see the usage in ConsumerEventLoop.

I am guessing they chose not to keep the key/value entries in the consumer property map and the first-class key/value serde properties in sync, since both are passed into the KafkaConsumer constructor. We could do the same, or sync them if we want.

I remember that when I used ReactorKafka in the past, it was confusing which "levers" to configure (the native Kafka consumer ones or the ReactorKafka ones) to get the behavior you want. The ref docs for ReactorKafka do talk about this, IIRC. I think we should stay out of that business and simply let the first-class options properties exist where they do, and not try to consolidate/reconcile, for example, spring.kafka.consumer.auto-commit-interval and spring.reactor.kafka.receiver.commit-interval.

Possible direction

To summarize what I think I am hearing and also what I think would be a good direction:

  • Prefix the RKP as 'spring.kafka.reactor'
  • Do not leverage the KP class in the RKP any longer
  • Break out the Consumer/Producer properties into their own classes (leveraging NestedConfigurationProperty) in KP and RKP
  • Add buildConsumer/ProducerProperties in RKP to get common props such as bootstrap etc.
  • Mirror the consumer/producer properties directly as done in the receiver/sender options (aka don't try to reconcile similar properties)

A tradeoff of the split approach is that the consumer properties need to be mapped multiple times if someone is using both normal and reactive Kafka. I would think the "levers" would be slightly different between them anyway, though.

An example yaml would look like:

spring:
  kafka:
    bootstrap-servers: localhost:9092   
    consumer:
      auto-offset-reset: earliest
      auto-commit-interval: 5s
    reactive:
      consumer:
        auto-offset-reset: latest
        auto-commit-interval: 3s

Thoughts?

@@ -188,6 +188,7 @@ dependencies {
optional("org.thymeleaf.extras:thymeleaf-extras-java8time")
optional("org.thymeleaf.extras:thymeleaf-extras-springsecurity6")
optional("redis.clients:jedis")
optional("io.projectreactor.kafka:reactor-kafka")
Contributor

[NIT] Wdyt about moving the dep into alphabetical order next to the other io.projectreactor entries, or, another option, closer to spring-kafka? It seems out of place here.

@@ -0,0 +1,37 @@
/*
* Copyright 2012-2020 the original author or authors.
Contributor

[NIT] Copyrights need updating.

* {@code reactor.kafka.receiver.internals.DefaultKafkaReceiver} beans.
*
* @author Almog Tavor
* @since 2.7.0
Contributor

[NIT] This ship has sailed :) - @since needs to be updated

import org.springframework.context.annotation.Bean;

/**
* {@link EnableAutoConfiguration Auto-configuration} for the Reactive client of Apache
Contributor

[NIT] We have somewhat inconsistent descriptions of the "thing" we are doing. We call it several things: "Reactive client of Apache Kafka" here and slightly different things elsewhere in this PR. Maybe consolidate on one? I like ReactorKafka personally.

class ReactiveKafkaAutoConfigurationTests {

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class))
Contributor

This can be simplified into a single withConfiguration call by passing both auto-configs into AutoConfigurations.of. That is the typical pattern I see in the project.

* @since 2.7.0
*/
@AutoConfiguration
@ConditionalOnClass({ KafkaReceiver.class, KafkaSender.class })
Contributor

I think it would be more direct to guard on the thing we are auto-configuring (ReceiverOptions/SenderOptions) rather than something that happens to use them and is in the same library.

* @since 2.7.0
*/
@FunctionalInterface
public interface DefaultKafkaReceiverCustomizer {
Contributor

Since we are only auto-configuring the Receiver/SenderOptions, I think we should drop these customizers from this proposal. They are also not currently being called or tested.

Instead we could add the Receiver/SenderOptionsCustomizer called out in spring-cloud/spring-cloud-stream#2296. We could also just add the options customizers in a separate PR under the SCS ticket.

If we do include any customizers in this PR, though, they will need to be invoked in an ordered manner in the AC and have tests for this.
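The ordered invocation could look roughly like the following sketch (plain Java; OptionsCustomizer and CustomizerChain are hypothetical names, and a real auto-configuration would obtain the ordered list via Spring's ObjectProvider):

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical options-customizer shape: each customizer receives the
// options produced so far and returns the (possibly replaced) instance;
// the auto-configuration would invoke them in a well-defined order.
interface OptionsCustomizer<O> extends UnaryOperator<O> { }

class CustomizerChain {

    static <O> O applyAll(O options, List<? extends OptionsCustomizer<O>> customizers) {
        for (OptionsCustomizer<O> customizer : customizers) {
            options = customizer.apply(options);
        }
        return options;
    }

    public static void main(String[] args) {
        // Strings stand in for an immutable options type; the invocation
        // order is visible in the result.
        OptionsCustomizer<String> first = o -> o + "+clientId";
        OptionsCustomizer<String> second = o -> o + "+metrics";
        System.out.println(applyAll("base", List.of(first, second)));
        // prints: base+clientId+metrics
    }
}
```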

* Additional properties, common to producers and consumers, used to configure the
* client.
*/
private final Map<String, String> properties = new HashMap<>();
Contributor

These extra "properties" are not needed as we do not do anything with them.

Map<String, Object> properties = this.kafkaProperties.buildConsumerProperties();
ReceiverOptions<K, V> receiverOptions = ReceiverOptions.create(properties);
ReactiveKafkaProperties.Receiver receiverProperties = this.reactiveKafkaProperties.getReceiver();
receiverOptions = setPropertyWhenGreaterThanZero(receiverProperties.getAtmostOnceCommitAheadSize(),
Contributor

[NIT-PREFERENCE] I would prefer to use the PropertyMapper API inline, without the convenience methods below. There are many examples that do this same thing, such as CassandraAutoConfiguration and JettyWebServerFactoryCustomizer.

}

@Bean
@ConditionalOnMissingBean(SenderOptions.class)
Contributor

The bean type is redundant here as the return type of the method is the default.

@wilkinsona
Member

Thanks for sharing your thoughts, @onobc.

and not try to consolidate/reconcile for example w/ the
spring.kafka.consumer.auto-commit-interval and spring.reactor.kafka.receiver.commit-interval.

I'm not too comfortable with that. Once Boot surfaces the properties, IDEs will offer auto-completion for them which increases the chances of two similarly named properties causing confusion for users and being difficult to use as a result. If the two properties serve the same purpose, I think they should be consolidated into a single property. If the two properties serve different purposes, I think they should be renamed. In either case, I suspect some changes in Spring Kafka and/or Reactor Kafka will be necessary so that the property or properties map onto similar settings.

@onobc
Contributor

onobc commented May 10, 2022

If the two properties serve the same purpose, I think they should be consolidated into a single property. If the two properties serve different purposes, I think they should be renamed

@wilkinsona that makes sense. So a deeper understanding of these "overlapping" properties is required to decide what to do for each one (consolidate, rename, leave alone, etc.). I will try to take a scan today and get a list of these properties so we can start discussing.

Other than the few questionable properties, wdyt of the other points:

  • Prefix the RKP as 'spring.kafka.reactor'
  • Do not leverage the KP class in the RKP any longer
  • Break out the Consumer/Producer properties into their own classes (leveraging NestedConfigurationProperty) in KP and RKP
  • Add buildConsumer/ProducerProperties in RKP to get common props such as bootstrap etc.
  • Mirror the consumer/producer properties directly as done in the receiver/sender options (aka don't try to reconcile similar properties)

@wilkinsona
Member

wilkinsona commented May 10, 2022

I think spring.kafka.reactor makes sense as the property prefix. For the others, I think it's too soon to say. I think we need to know what we want the auto-configuration to offer before we spend any more time thinking about the precise details. What's offered should be defined as the beans that will be auto-configured and the properties that will be available to control their configuration. Once we know that, we can figure out how to implement it.
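Under the spring.kafka.reactor prefix, a configuration file might look roughly like this (the property names below are illustrative only, not final):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092   # shared by both clients
    consumer:
      auto-offset-reset: earliest       # regular (non-reactive) consumer
    reactor:
      receiver:
        poll-timeout: 100ms             # illustrative reactor-kafka options
        commit-interval: 3s
```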

Let's continue the discussion on #29080 rather than here please. Once it's reached a conclusion we can decide if this PR is roughly applicable or if a different approach is needed.

@wilkinsona wilkinsona added status: on-hold We can't start working on this issue yet and removed status: feedback-provided Feedback has been provided status: waiting-for-triage An issue we've not yet triaged labels May 10, 2022
@garyrussell
Contributor

Just for another data point - see #17420 and the discussion on the linked PR.

The number of Kafka properties is overwhelming; when we first added auto-config, we picked a subset of properties to be first class, as discussed here: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties

Specifically,

Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

We have added others over time at user request (such as isolation level on that linked PR).

Some kind of code generation of the properties from the kafka-clients *Config classes would be ideal.
