Add support for Reactor Kafka #29080

Open
thepaep opened this issue Dec 16, 2021 · 47 comments · May be fixed by #30567
Labels
status: pending-design-work Needs design work before any code can be developed type: enhancement A general enhancement

@thepaep

thepaep commented Dec 16, 2021

As said in reactor/reactor-kafka#100 (comment), native support for Reactor Kafka would be useful to lots of users. Spring Boot currently supports WebFlux, but auto-configuration and other capabilities aren't yet supported for Reactor Kafka. As others have said in different issues, I also think the connection between Reactor Kafka and Spring Boot should be better documented and supported, since it's the most basic usage of Reactor Kafka. There is currently no official way of using Reactor Kafka with Spring Boot, which is pretty odd.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Dec 16, 2021
@snicoll snicoll changed the title Reactor Kafka support Add support for Reactor Kafka Dec 16, 2021
@lynch19

This comment has been minimized.

@lynch19

lynch19 commented Dec 18, 2021

@snicoll @simonbasle @garyrussell any help with this?

Can someone describe what features are currently missing on this subject and how they should be implemented (what functionality should be added and where), so it'll be easier for contributors to help with this?

@roger751

This comment has been minimized.

@snicoll
Member

snicoll commented Dec 18, 2021

@lynch19 @roger751 please use the reaction on the original description rather than +1 comments like this.

@garyrussell
Contributor

@lynch19 reactor-kafka has two fundamental properties objects:

https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/receiver/ReceiverOptions.java

https://github.com/reactor/reactor-kafka/blob/main/src/main/java/reactor/kafka/sender/SenderOptions.java

Each with a number of properties.

In addition, creation of these takes a Map<String, Object> of regular Kafka Properties (similar to the ...properties extension for the spring-kafka auto configuration).

There are several levels of Boot auto configuration that would be useful, with the MVP being the auto-configuration of these two beans using application properties.

@AntonLGVS

I'm using Reactor Kafka in my applications. Reactor Kafka provides everything you need to work. Or does it mean support in the form of annotations?
For example:

@ReactiveKafkaListener(topic = "smth", autoCommit=true)
public Mono<Void> consumer(DTO dto) {
    log.info("received dto: " + dto);
    return service.call(dto).then();
}

@garyrussell
Contributor

No; not at all; this is just about configuring the sender and receiver options via Boot properties.

Such a mechanism would belong in Spring for Apache Kafka (spring-kafka), but there are currently no plans to do so.

@garyrussell
Contributor

I would suggest something like package org.springframework.boot.autoconfigure.reactor.kafka in the spring-boot-autoconfigure project.

We can leverage the existing KafkaProperties for the consumer and producer properties passed into the receiver and sender options.

@wilkinsona
Member

wilkinsona commented Jan 4, 2022

This has been requested previously (#18751) but was declined as the Reactor team advised us against adding support. IIRC, this was due to the status of Reactor's Kafka support at the time and things may well have moved on since then.

@simonbasle, what's your take on this now please? Has this moved on sufficiently in the last couple of years that this is now worth considering again?

@simonbasle

@wilkinsona with the broad changes in 1.3.x in late 2020, reactor-kafka has been made more stable and maintainable. Thanks to @garyrussell and @artembilan, the project is more actively maintained. Thus I would defer to Gary and Artem regarding that decision, but it definitely looks better than back in 2019.

@garyrussell
Contributor

I concur; it is in much better shape now, thanks to some significant work by Sergie back then. Also, due to community requests, the Spring Cloud Stream team are likely to incorporate it in the next major release and basic auto configuration of the sender and receiver properties will make things easier there too.

@almogtavor

@garyrussell what exactly will it require? Only the ReactiveKafkaAutoConfiguration, ReactiveKafkaAnnotationDrivenConfiguration (that will use KafkaProperties / ReactiveKafkaProperties), ReceiverOptionsCustomizer and SenderOptionsCustomizer classes? Or is there anything else that needs to be noted?

We can leverage the existing KafkaProperties for the consumer and producer properties passed into the receiver and sender options.

You mean that we should use the existing KafkaProperties class, right?

@garyrussell
Contributor

@almogtavor There would be no ReactiveKafkaAnnotationDrivenConfiguration - there are no annotations in that project.

As I said above, I'd say the MVP would be a ReactiveKafkaAutoConfiguration creating 2 beans SenderOptions and ReceiverOptions.

For example

@Bean
ReceiverOptions<Object, Object> receiverOptions(KafkaProperties kp, ReactiveKafkaProperties rkp) {
    // Seed the options with the regular Kafka consumer properties known to Boot.
    ReceiverOptions<Object, Object> opts = ReceiverOptions.create(kp.buildConsumerProperties());
    // apply rkp.getReceiver() properties here
    return opts;
}

Calling options customizers (if configured) before returning might be a nice addition, but it is not really a requirement, because the user can further customize the options where they are used to create a receiver/sender. Since the options are immutable, the base options can be altered in different ways for each usage; a new object is created each time a property is added.

@almogtavor

almogtavor commented Jan 8, 2022

@garyrussell Seems great. Is there any need for using KafkaProperties? Seems that we can easily auto-configure all of the ReceiverOptions and SenderOptions parameters.

@garyrussell
Contributor

garyrussell commented Jan 10, 2022

KafkaProperties.Consumer and ....Producer encompass common Kafka consumer and producer properties, as well as a general .properties node, (passed into the create() methods as in my example above). We don't want to duplicate all of those properties here.

@almogtavor

almogtavor commented Jan 10, 2022

Notice that there are not a lot of common parameters between the two (KafkaProperties.Consumer and KafkaProperties.Receiver have only 3 parameters in common). There are also some caveats I'd like you to take a look at:

  • Provide auto-configuration for Reactor Kafka #18751 won't get solved (since we still need to use KafkaProperties).
  • If some of the properties will be of KafkaProperties and some of ReactiveKafkaProperties, users will have to auto-configure some parameters with spring.kafka.whatever, and some with spring.reactor.kafka.whatever. I don't see here any other option.

@garyrussell
Contributor

Notice that there are not a lot of common parameters between the two (KafkaProperties.Consumer and KafkaProperties.Receiver have only 3 parameters in common). There are also some caveats I'd like you to take a look at:

I am not talking about those properties; I am specifically talking about the ConsumerConfig and ProducerConfig kafka-clients properties (which are handled within KafkaProperties, some being first class properties that Boot knows about and others being generic properties; see https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties).

Ideally, that part of KafkaProperties will be pulled out into a common super class.

@almogtavor

Agree about the common super class. Is this a thing you'd want later on or at the very first PR?

@almogtavor almogtavor linked a pull request Apr 7, 2022 that will close this issue
@almogtavor

@garyrussell I've raised a PR.

@wilkinsona wilkinsona added type: enhancement A general enhancement status: pending-design-work Needs design work before any code can be developed and removed status: waiting-for-triage An issue we've not yet triaged labels May 10, 2022
@wilkinsona wilkinsona added this to the 3.x milestone May 10, 2022
@onobc
Contributor

onobc commented May 10, 2022

As a continuation of the discussion in the PR...

For the others, I think it's too soon to say. I think we need to know what we want the auto-configuration to offer before we spend any more time thinking about the precise details. What's offered should be defined as the beans that will be auto-configured and the properties that will be available to control their configuration

I believe we want to auto-configure only the ReceiverOptions and SenderOptions in the initial offering.

@garyrussell @artembilan @almogtavor is this your understanding as well?

and the properties that will be available to control their configuration

The properties would be the 1st class properties offered by the Receiver/Sender options as well as the ability to specify the normal consumer/producer properties currently available in KafkaProperties.Consumer/Producer. It does sound like we may only want to offer a subset of those. And there is also a question of "overlapping" properties to figure out.

@onobc
Contributor

onobc commented May 10, 2022

As a continuation from this comment on the PR...

The number of Kafka properties is overwhelming; when we first added auto config, we picked a subset of properties to be first class, as discussed here: https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties

Specifically,

Spring Boot auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW properties, and any properties that do not have a default value.

We have added others over time at user request (such as isolation level on that linked PR).

@garyrussell are you suggesting we should drop some of the properties in the current PR? I think the number of primitive properties on the ReceiverOptions (the ones chosen in the associated PR) is not too overwhelming. The more complicated props can be adjusted as needed in the options customizer that will be added shortly after.

Some kind of code generation of the properties from the kafka-clients *Config classes would be ideal.

Yes, this would be nice.

@garyrussell
Contributor

garyrussell commented May 10, 2022

I believe we want to auto-configure only the ReceiverOptions and SenderOptions in the initial offering.

That was my proposal for the MVP above, yes:

#29080 (comment)

For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties).

  1. Top level generic spring.kafka.properties
  2. First class boot-recognized properties
  3. Consumer generic ...consumer.properties

i.e. with:

spring.kafka.properties.auto.offset.reset=...
spring.kafka.consumer.auto-offset-reset=...
spring.kafka.consumer.properties.auto.offset.reset=...

the last one would win.
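
The precedence above can be modeled as three maps merged in ascending order of priority, with later entries overwriting earlier ones. This is only a sketch using plain JDK maps; `ConsumerPropertyMerger` and its `merge` method are hypothetical names for illustration and are not how Spring Boot's KafkaProperties is actually implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration; not Spring Boot's KafkaProperties.
final class ConsumerPropertyMerger {

    // Merges in ascending precedence: each later map overrides the earlier ones.
    static Map<String, Object> merge(Map<String, Object> topLevelGeneric,
                                     Map<String, Object> firstClass,
                                     Map<String, Object> consumerGeneric) {
        Map<String, Object> merged = new LinkedHashMap<>(topLevelGeneric);
        merged.putAll(firstClass);      // first class Boot properties override the top level
        merged.putAll(consumerGeneric); // consumer generic properties override everything
        return merged;
    }
}
```

With all three levels setting `auto.offset.reset`, the value from the consumer generic map is the one that ends up in the merged result.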

drop some of the properties in the current PR?

I am not sure what you mean; I am suggesting using the Producer and Consumer (from KafkaProperties) to build the map for ReceiverOptions.create() and expose the other methods that take simple values as Boot properties.

For complex types, for example, users can grab the auto configured ReceiverOptions and add listeners to it.

Given that the default implementation of ReceiverOptions is immutable, each use can "modify" the auto wired bean without affecting other uses.

@Autowired
ReceiverOptions ro;

...

ReceiverOptions one = ro.addAssignListener(...).subscription(List.of("topic1"));
ReceiverOptions two = ro.addAssignListener(...).subscription(List.of("topic2"));

one and two will be different objects.
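
The copy-on-write behaviour described here can be illustrated with a small stand-in class. This is a sketch under assumptions: `ImmutableOptions` is a hypothetical simplification (listeners are plain strings) and is not reactor-kafka's ReceiverOptions; it only shows why deriving two variants from one shared base leaves the base untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an immutable options object; not reactor-kafka's ReceiverOptions.
final class ImmutableOptions {
    private final List<String> topics;
    private final List<String> listeners;

    private ImmutableOptions(List<String> topics, List<String> listeners) {
        this.topics = List.copyOf(topics);
        this.listeners = List.copyOf(listeners);
    }

    static ImmutableOptions create() {
        return new ImmutableOptions(List.of(), List.of());
    }

    // Each "mutator" copies the current state into a new instance.
    ImmutableOptions addListener(String listener) {
        List<String> copy = new ArrayList<>(listeners);
        copy.add(listener);
        return new ImmutableOptions(topics, copy);
    }

    ImmutableOptions subscription(List<String> newTopics) {
        return new ImmutableOptions(newTopics, listeners);
    }

    List<String> topics() { return topics; }

    List<String> listeners() { return listeners; }
}
```

Calling `base.addListener("a").subscription(List.of("topic1"))` and `base.addListener("b").subscription(List.of("topic2"))` yields two distinct objects, while `base` still has no topics and no listeners.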

@onobc
Contributor

onobc commented May 10, 2022

For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties)

@garyrussell

I agree w/ the property hierarchy and precedence as you outlined above. The "overlapping" ones I was referring to are the ones that are named similarly but may or may not be the same thing (e.g. consumer.auto-commit-interval and the receiver option's commit-interval, which is not passed into the consumer).

I am not sure what you mean; I am suggesting using the Producer and Consumer (from KafkaProperties) to build the map for ReceiverOptions.create() and expose the other methods that take simple values as Boot properties.

I am suggesting the same thing. With my "drop some of the properties" comment, I was referring to the 1st class Boot properties in the current proposal. I was not sure if you were suggesting there were currently too many of them and we should pick the high priority ones. It sounds like that is not what you are saying though. Sorry for the confusion.

For complex types, for example, users can grab the auto configured ReceiverOptions and add listeners to it.

Given that the default implementation of ReceiverOptions is immutable, each use can "modify" the auto wired bean without affecting other uses.

Are you suggesting we do not add the Sender/ReceiverOptionCustomizers or that we can use this technique until we do add the customizers?

@garyrussell
Contributor

On second thought, I suppose customizers would still make sense - e.g. for an app that creates multiple receivers but wants to add the same assignment listener to them all.
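
One way to picture such customizers is as an ordered chain of functions applied to the base options before they are handed out, so that every receiver gets the same additions. The sketch below is an assumption for illustration only: `CustomizerChain` is a hypothetical name, the customizers are modeled as `UnaryOperator`s, and the eventual Boot customizer interfaces had not been designed at this point in the discussion.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical sketch; the real customizer interfaces were still under discussion.
final class CustomizerChain<T> {
    private final List<UnaryOperator<T>> customizers;

    CustomizerChain(List<UnaryOperator<T>> customizers) {
        this.customizers = List.copyOf(customizers);
    }

    // Applies every customizer in order; each one returns a (possibly new) options object.
    T apply(T base) {
        T result = base;
        for (UnaryOperator<T> customizer : customizers) {
            result = customizer.apply(result);
        }
        return result;
    }
}
```

Because immutable options return a new instance from each call, a customizer written as `opts -> opts.addAssignListener(...)` fits this shape without modifying the shared base.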

@onobc
Contributor

onobc commented May 12, 2022

  • RKP = ReactorKafkaProperties
  • KP = KafkaProperties

So I dug into each ReactorKafka ReceiverOptions property. Here is what I found:

TL;DR

The only consumer properties that need to be considered as duplicate/overlapping at this point are:

  • ReceiverOptions.commitInterval
  • KafkaProperties.autoCommitInterval
  • and possibly KafkaProperties.enableAutoCommit

They are not the same exact property but could be used to control the same underlying concept. More details below.

Details

ReceiverOptions properties

ignore (we are not surfacing these - customizer only)

  • assignTopicPartitions
  • schedulerSupplier
  • assignListeners
  • revokeListeners
  • Deserializers
    • keyDeserializer
    • valueDeserializer
      These are passed to the KafkaConsumer but are expected to already be configured, and configure()
      will not be called on them. So they are additive to the KP ones and not a problem here.
      We are not surfacing these in the RKP anyway but I wanted to detail what I found.

Unique to RK only

  • subscribeTopics
  • subscribePattern
  • pollTimeout - the timeout for each KafkaConsumer#poll(long) operation. Closest setting is
    spring.kafka.listener.pollTimeout
  • closeTimeout - timeout for graceful shutdown of the KafkaConsumer. No equivalent in KP

Auto-commit related

So these were the ones that seemed likely overlapping

  • commitInterval - commit interval for automatic commits
  • commitBatchSize - commit batch size for automatic commits

ℹ️ If commit interval and commit batch size are configured, a commit operation is scheduled when either the interval or batch size is reached
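
The "whichever is reached first" rule can be sketched as a small state machine. This is a simplified model under stated assumptions, not reactor-kafka's implementation: `CommitTrigger` is a hypothetical class, time is passed in explicitly, and the interval is only checked when a record is acknowledged (a real implementation would also use a timer).

```java
import java.time.Duration;

// Hypothetical model of the "interval or batch size" rule; not reactor-kafka's implementation.
final class CommitTrigger {
    private final Duration commitInterval;
    private final int commitBatchSize;
    private long lastCommitMillis;
    private int uncommitted;

    CommitTrigger(Duration commitInterval, int commitBatchSize, long startMillis) {
        this.commitInterval = commitInterval;
        this.commitBatchSize = commitBatchSize;
        this.lastCommitMillis = startMillis;
    }

    // Records one acknowledged record and reports whether a commit is due at nowMillis.
    boolean acknowledge(long nowMillis) {
        uncommitted++;
        boolean batchReached = uncommitted >= commitBatchSize;
        boolean intervalReached = nowMillis - lastCommitMillis >= commitInterval.toMillis();
        if (batchReached || intervalReached) {
            // Either condition triggers the commit and resets both counters.
            uncommitted = 0;
            lastCommitMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```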

ℹ️ The KP has the following similar properties, but Reactor Kafka always sets enableAutoCommit to false and controls commits in the Reactive API

  • enableAutoCommit
  • autoCommitInterval - frequency to commit if above is set to true

Unique to RK only

While these are commit related, I believe these have no equivalent in KP

  • maxCommitAttempts - max num consecutive non-fatal commit failures tolerated
  • commitRetryInterval - how long to wait before retry
  • maxDeferredCommits - max out-of-order commits

Needs more digging

I think this is RKP only but need to dig a bit more to see what KP options are available for out-of-order-commits

  • atmostOnceCommitAheadSize - commit ahead size per partition for at-most-once delivery

@garyrussell
Contributor

The only consumer properties that need to be considered as duplicate/overlapping at this point are:

ReceiverOptions.commitInterval
KafkaProperties.autoCommitInterval
and possibly KafkaProperties.enableAutoCommit

This "overlap" is by name only; unfortunate, but true.

The latter two control whether the kafka-clients automatically commits the offsets on a schedule.

The first one is the interval used by reactor kafka to automatically commit offsets.

Users should not enable both mechanisms; in spring-kafka, we disable enable.auto.commit by default, because it takes care of the commits in a more deterministic fashion.

reactor-kafka also disables it - see ImmutableReceiverOptions (ctor).

this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
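
The effect of that constructor line is that whatever the user supplies for `enable.auto.commit` is overwritten. A minimal sketch of the same idea with plain JDK maps follows; `AutoCommitGuard` is a hypothetical helper for illustration, and the string key is the value of `ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG` written out so the example does not need kafka-clients on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of reactor-kafka.
final class AutoCommitGuard {
    // Same key as ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG in kafka-clients.
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";

    // Returns a copy of the user's properties with client-side auto commit forced off.
    static Map<String, Object> withAutoCommitDisabled(Map<String, Object> userProperties) {
        Map<String, Object> copy = new HashMap<>(userProperties);
        copy.put(ENABLE_AUTO_COMMIT, "false");
        return copy;
    }
}
```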

@garyrussell
Contributor

We should consider removing those properties from KafkaProperties - spring-kafka prefers them not to be set and reactor-kafka ignores them.

@onobc
Contributor

onobc commented May 12, 2022

Users should not enable both mechanisms; in spring-kafka, we disable enable.auto.commit by default, because it takes care of the commits in a more deterministic fashion.

reactor-kafka also disables it - see ImmutableReceiverOptions (ctor).

this.properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Agreed @garyrussell - that is what I was alluding to here as well with these comments:

They are not the same exact property but could be used to control the same underlying concept. More details below.

ℹ️ The KP has the following similar properties, but Reactor Kafka always sets enableAutoCommit to false and controls commits in the Reactive API

If they are not desired in spring-kafka either, it seems like a great thing to remove (as you suggested).

@onobc
Contributor

onobc commented May 13, 2022

  • RKP = ReactorKafkaProperties
  • KP = KafkaProperties

So I dug into each ReactorKafka SenderOptions property. Here is what I found:

TL;DR

These are much more straightforward than the consumer/receiver properties.

Properties unique to RKP

  • closeTimeout - timeout for graceful shutdown of sender
  • maxInFlight - max num in-flight records fetched from the outbound record publisher while acks are pending
  • stopOnError - indicates if a send op should be terminated when error encountered

ignore (not surfacing via props - customizer only)

  • scheduler - the scheduler used for publishing send results
  • Serializers
    • keySerializer
    • valueSerializer
      These are passed to the KafkaProducer directly and are expected to already be configured, and configure()
      will not be called on them. So they are additive to the KP ones and not a problem here.
      We are not surfacing these in the RKP anyway but I wanted to detail what I found.

@onobc
Contributor

onobc commented May 13, 2022

Based on the above analysis of the KafkaProperties and the Sender/ReceiverOptions of ReactorKafka, here is a suggested list of properties as well as a suggested "layout" of them. I think we can use this as a starting point of how to map them out.

spring:
  kafka:
    reactor:

      # common properties (just like KafkaProperties)
      bootstrap-servers: localhost:9093
      ssl: ...
      security: ...
      properties: ...
      client-id: fooDemoApp

      # Producer class - pass to Kafka producer props (aka SenderOptions.properties)
      producer:
        key-serializer: org.apache.kafka.common.serialization.LongSerializer
        value-serializer: com.example.FooSerializer
        buffer-memory: 32MB # used in tandem w/ max-in-flight
        ...

      # SenderOptions props
      sender:
        close-timeout: 5m
        max-in-flight: 256
        stop-on-error: false

      # Consumer class - pass to Kafka consumer props (aka ReceiverOptions.properties)
      consumer:
        auto-offset-reset: earliest
        key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
        value-deserializer: com.example.FooDeserializer
        ...
        # DELETE enable-auto-commit:
        # DELETE auto-commit-interval:

      # ReceiverOptions props
      receiver:
        subscribe-topics:
        subscribe-pattern:
        poll-timeout:
        close-timeout:
        commit-interval:
        commit-batch-size:
        max-commit-attempts:
        commit-retry-interval:
        max-deferred-commits:
        atmost-once-commit-ahead-size:

This assumes that no properties will be used for ReactorKafka directly under spring.kafka.*, but rather that the properties for RK start under spring.kafka.reactor.*. Do we want to allow regular and reactor spring-kafka to share properties under spring.kafka.*? I could see possibly wanting to do that for the list of current common properties:

  • bootstrap-servers: localhost:9093
  • ssl: ...
  • security: ...
  • properties: ...
  • client-id: fooDemoApp

A benefit of not sharing the KP producer/consumer props (the ones under spring.kafka.producer|consumer) and mapping them instead separately under spring.kafka.reactor.producer|consumer w/ RK is that both regular and reactor variants can co-exist and be configured independently and not worry about one change affecting the other one.
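
To illustrate why separate prefixes let the two variants be configured independently, the sketch below extracts the entries under a given prefix from a flat property map. It is a simplified model using plain JDK maps, not Spring Boot's binder; `PrefixedProperties` is a hypothetical helper, and the `spring.kafka.reactor.consumer.` prefix is the one proposed in this thread rather than an existing Boot namespace.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; Spring Boot's real binding works differently.
final class PrefixedProperties {

    // Returns the entries under the given prefix, with the prefix stripped from each key.
    static Map<String, String> under(String prefix, Map<String, String> all) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : all.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                result.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }
}
```

With `spring.kafka.consumer.auto-offset-reset=latest` and `spring.kafka.reactor.consumer.auto-offset-reset=earliest` both present, each prefix resolves to its own value, so changing one does not affect the other.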

@garyrussell
Contributor

I would vote for keeping them separate.

@almogtavor

@onobc @garyrussell Any concrete decision taken?

@onobc do you suggest implementing one CommonKafkaProperties and just auto-configuring it with both aliases? Or would you prefer to actually copy-paste it?

@onobc
Contributor

onobc commented Jun 19, 2022

Hi @almogtavor ,

Due to competing priorities we have not had a chance to come to an agreement on the approach. More than likely we will close this PR and revisit it in a subsequent proposal. I will make sure we tag you when that happens to keep you in the loop. Thank you for this initial contribution.

@almogtavor

almogtavor commented Jun 19, 2022

@onobc Don't you think we can lean on the current PR in the meantime, since it addresses the issue, and just migrate to the ideal solution when the time comes? The current PR doesn't include changes to Spring-Kafka's auto-configurations, so it'll only be an additional feature. It just feels like it's going to take lots of time for some pretty semantic changes. After all, merging an initial and basic version will still help to tidy up the usage of Reactor Kafka with Spring Boot. Wdyt?

@wilkinsona
Member

wilkinsona commented Jun 20, 2022

@almogtavor Unfortunately, I don't think we should do that. Until we know exactly what direction we want to take, merging something may set us off in the wrong direction. If we don't have a chance to course correct before 3.0 is released later this year, backwards compatibility will then limit our options. We want to avoid that by giving ourselves enough time to maximise our chances of getting it right first time round.

@almogtavor

almogtavor commented Jun 22, 2022

@wilkinsona makes sense.

@influence160

Will methods annotated with @ReactiveKafkaListener support @RetryableTopic?

@garyrussell
Contributor

There is no such thing as @ReactiveKafkaListener.

Also @RetryableTopic is a feature of spring-kafka, not reactor-kafka so, no.

@ashish-8f

ashish-8f commented Sep 22, 2022

Hi @garyrussell, is there any plan for when we'll be getting @Listener support for Reactor-Kafka and other annotations similar to Spring-Kafka? If it's not ready yet, is there any future plan for the same?

Thanks

@garyrussell
Contributor

There are no such plans; an evaluation resulted in there not being much value add that spring-kafka can provide over reactor-kafka.

@ashish-8f

There are no such plans; an evaluation resulted in there not being much value add that spring-kafka can provide over reactor-kafka.

So, if I want to use reactor-kafka, the only reference doc I have as of now is the one below?

https://projectreactor.io/docs/kafka/release/reference/

@garyrussell
Contributor

Yes.

@ahmedalnuaimi

There are no such plans; an evaluation resulted in there not being much value add that spring-kafka can provide over reactor-kafka.

Why do you think there is no value in that? Isn't the difference similar to the one between WebMVC and WebFlux (i.e. blocking vs non-blocking?)

@garyrussell
Contributor

That is not a useful comparison. If you feel spring-kafka can provide some useful value add over reactor-kafka, please start a discussion with your thoughts over there. https://github.com/spring-projects/spring-kafka/discussions

Let's not continue here.

I am open to ideas, but nobody has provided a compelling reason to do anything so far.

@almogtavor

@wilkinsona @garyrussell any reason for this to not get solved for so long? There are already 3 working PRs on the subject

@wilkinsona
Member

@almogtavor Thanks for your patience. That there are 3 working PRs is largely why nothing's happened. We know we want to do something here but as of yet we haven't managed to identify exactly what we want to do. We have a number of competing priorities at the moment and adding support for Reactor Kafka hasn't yet made it to the top of the list. Unfortunately, that's unlikely to change until we're able to carve out a block of time to work on this and figure out exactly what needs to be done.
