Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a sample app with Spring Kafka #364

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open

Conversation

dsyer
Copy link
Contributor

@dsyer dsyer commented Mar 26, 2021

If you use Spring Cloud Function 3.1.3 then Kafka should work
with Spring Cloud Streams out of the box already. This sample
adds support for vanilla Spring Kafka with @KafkaListener where
the listener can listen for and emit CloudEvent. Some issues
with existing messaging support came to light and these have been
ironed out in the process.

@dsyer
Copy link
Contributor Author

dsyer commented Mar 26, 2021

Closes #359

@dsyer
Copy link
Contributor Author

dsyer commented Mar 29, 2021

I don't know what's going on with Github actions. Doesn't look like anything I did?

@slinkydeveloper
Copy link
Member

@dsyer I'm looking at it, it seems related to this PR, given it doesn't fail here: https://github.com/cloudevents/sdk-java/pull/363/checks?check_run_id=2226508723#step:4:6464

The javadoc tool is failing because it can't find a class I guess? https://github.com/cloudevents/sdk-java/pull/364/checks?check_run_id=2216090212#step:4:2680

Can you repro locally running mvn verify?

@dsyer
Copy link
Contributor Author

dsyer commented Mar 30, 2021

Thanks for looking. It fails for me also locally, so I guess that's a problem, but I don't know how to solve it. I mean, the Kafka classes are on the classpath for sure, so what's the problem exactly?

@dsyer
Copy link
Contributor Author

dsyer commented Mar 30, 2021

I think it's a bug in the Maven javadoc plugin. I'll see if I can find a workaround, or maybe bump the plugin version or something.

If you use Spring Cloud Function 3.1.3 then Kafka should work
with Spring Cloud Streams out of the box already. This sample
adds support for vanilla Spring Kafka with `@KafkaListener` where
the listener can listen for and emit `CloudEvent`. Some issues
with existing messaging support came to light and these have been
ironed out in the process...

Signed-off-by: Dave Syer <dsyer@vmware.com>
@dsyer
Copy link
Contributor Author

dsyer commented Mar 30, 2021

Update: the workaround was to add <source>8</source> to the Javadoc plugin config.

@slinkydeveloper
Copy link
Member

@dsyer I have a super noob spring question here: why do we need this? We already implement serializers and deserializers for Kafka: https://github.com/cloudevents/sdk-java/tree/master/kafka. Isn't Spring using the Kafka Serializer/Deserializer interfaces already?

@dsyer
Copy link
Contributor Author

dsyer commented Apr 1, 2021

I guess there's nothing stopping you from using the existing serdes, but Spring is more flexible with listener method signatures so you won't get the full benefit. The sample wouldn't work if you used only cloudevents-kafka. Bits of it might, but, for example the DemoApplicationTests has a @KafkaListener that listens for Message<byte[]>, so that will fail because cloudevents-kafka only does conversion to/from CloudEvent.

I also made a design decision in the cloudevents-spring components that Message headers (in Spring) would always be "canonical", so the producers and consumers don't need to be aware of the fact that they are handling messages from Kafka. There's no logic like that in the existing cloudevents-kafka module. We could/should do the same thing with RabbitMQ (I'll send another PR if you want).

@slinkydeveloper
Copy link
Member

but Spring is more flexible with listener method signatures so you won't get the full benefit

@dsyer I see your point, but IMHO this is not a good reason, at least for me, to have this additional code to maintain that, at the end of the day, does the same thing of another part of this project. From my understanding (but I might be wrong here) you could just swap the serializers here https://github.com/cloudevents/sdk-java/pull/364/files#diff-e0e93e8e5ff2f3d2d6955abaa1a4bd104e93d4fbe49fae37798cbe96cd6e9fcaR2 with the ones provided by cloudevents-kafka and you should be good to go.

Bits of it might, but, for example the DemoApplicationTests has a @KafkaListener that listens for Message<byte[]>, so that will fail because cloudevents-kafka only does conversion to/from CloudEvent.

In that case, I think the test code has to be modified to use the byte deserializer in the listener instance.

I also made a design decision in the cloudevents-spring components that Message headers (in Spring) would always be "canonical", so the producers and consumers don't need to be aware of the fact that they are handling messages from Kafka.

I don't understand this particular point, which might be the reason why I'm confused to see this PR. Don't you have the Message interface exactly to avoid to develop this https://github.com/cloudevents/sdk-java/pull/364/files#diff-4afe376db76bf0c16a50ca0c7fe89ae37fb7c0ebba62c475157e1b6e59a61a20 for every transport? Isn't there something in spring that handles Message <-> ConsumerRecord so users will just use the Message <-> CloudEvent converters we already have?

@dsyer
Copy link
Contributor Author

dsyer commented Apr 6, 2021

In that case, I think the test code has to be modified to use the byte deserializer in the listener instance.

That's not the purpose of this test. The purpose is to get the raw Kafka message and look at its raw content to verify it independent of any custom serdes.

Isn't there something in spring that handles Message <-> ConsumerRecord so users will just use the Message <-> CloudEvent converters we already have?

Yes, probably, but then you'd have to build the knowledge of Kafka header name conventions into the converters we already have (and they so far didn't need to know about that). The same thing will happen if we want to use RabbitMQ with the amqp_ header prefix. I don't mind refactoring to do this, but I thought it was better to keep the Kafka knowledge in a separate class.

@slinkydeveloper
Copy link
Member

Yes, probably, but then you'd have to build the knowledge of Kafka header name conventions into the converters we already have (and they so far didn't need to know about that). The same thing will happen if we want to use RabbitMQ with the amqp_ header prefix. I don't mind refactoring to do this, but I thought it was better to keep the Kafka knowledge in a separate class.

To me this prefix problem sounds either:

  • a problem that has to be solved at spring level
  • a problem that has to be solved on the cloudevent message converter where, given the protocol source, allows you to infer what's the separator to expect for cloudevent headers

But for sure having a single encoder/decoder for each transport doesn't sound like a good approach to me, given it's also not scalable...

@dsyer
Copy link
Contributor Author

dsyer commented Apr 6, 2021

So where are we going with this? Isn't cloudevents-kafka already basically a "single encoder/decoder for each transport"?

Anyway, here's a branch using the serdes from cloudevents-kafka: https://github.com/dsyer/sdk-java/tree/kafka-only. If you prefer that approach I can live with it (but it drops the idea of a "canonical" header).

@olegz
Copy link
Contributor

olegz commented Apr 6, 2021

FWIW, attribute prefixes are only relevant on the edges while in the application space they carry no value, hence having canonical or no-prefix (which is a form of canonical) would address consistency in dealing with CE. This is especially relevant when sources and targets are different (e.g., from HTTP to Kafka). In any case user must NOT care where the CE came from or where it goes when it comes to application code. That should be framework's responsibility
So in the native Spring Message-based support for CE we already take care of that while using canonical ce- prefix primarily to not interfere with few native Spring Message headers (i.e., id and timestamp), otherwise we could have drop canonical prefix all together.

@slinkydeveloper
Copy link
Member

slinkydeveloper commented Apr 7, 2021

Isn't cloudevents-kafka already basically a "single encoder/decoder for each transport"?

It is, but my understanding of the reasoning behind cloudevents-spring was to originally implement just the Message encoder/decoder to have a single encoder/decoder for the whole spring framework. But what I see instead is a growing number of encoders for every single transport/framework component, and this PR also add another encoder for a specific transport we already support and spring can support. That's why I'm puzzled to see this PR, and that's why I'm worried about maintain all this code 😄.

I wonder if we can somehow (and maybe this requires some changes on the spring side) use always the https://github.com/cloudevents/sdk-java/blob/master/spring/src/main/java/io/cloudevents/spring/messaging/CloudEventMessageConverter.java, where the fromMessage and toMessage signatures contains also the indication of the transport where the message is going. This way you can easily find out the attribute prefixes when encoding/decoding, asking users to just register a single encoder/decoder and then, as @olegz sad, let the framework figure out the rest.

I understand how the solution of using cloudevents-kafka is not very Spring idiomatic, but the solution of adding encoders/decoders for each transport is not scalable nor maintainable either, nor user friendly, because for every transport the user has to add the proper encoder/decoder of the very same type, so I strongly suggest we find out how to reuse CloudEventMessageConverter for every transport and deal with the attributes prefix issue.

@dsyer
Copy link
Contributor Author

dsyer commented Apr 20, 2021

I don't think we're on the same page here yet. I dislike the idea of adding encoders/decoders for every new transport, but that's the current design of Cloud Events and the sdk (cloudevents-kafka being the example in question here). I would love to use one CloudEventMessageConverter for everything, but it would have to know about all the transports would it not?

Did you look at the "kafka-only" branch? Is that closer to what you like (using the cloudevents-kafka serdes)? The CE headers end up inside the CloudEvent so the user can ignore the non-canonical transport-specific headers. That seems to be what you were asking for, so I don't know what to do to move this forward if that's not it.

@slinkydeveloper
Copy link
Member

slinkydeveloper commented Apr 21, 2021

I would love to use one CloudEventMessageConverter for everything, but it would have to know about all the transports would it not?

Yes, but you don't need to know the transport details at all. You just need to map "protocol xyz" to its prefix, if any (some protocol bindings don't even have prefix). You could just provide in the MessageConverter a string identifying the id of the protocol, like:

public Object fromMessage(Protocol proto, Message<?> message, Class<?> targetClass) {
  if (proto.getName() == "http") // then use ce-
  if (proto.getName() == "kafka") // then use ce_
}

Alternatively, you could contain the protocol information inside the Message interface.

Because CloudEvents spec supports only a bunch of protocol bindings, the mantainance burden of such converter is very low, and at the same time is more Spring idiomatic and easier for the end user.

I think this approach is the best one we can follow IMO, is it somehow feasible?

If not, then your other kafka branch is fine for me, although i would really love to try this approach using always CloudEventMessageConverter.

@olegz
Copy link
Contributor

olegz commented Apr 21, 2021

@dsyer perhaps we can standardize on target_protocol header as we are already relying on it here

Fix typo

Co-authored-by: Eddú Meléndez Gonzales <eddu.melendez@gmail.com>
@pierDipi pierDipi added this to To do in SDK Java Nov 14, 2021
@pierDipi pierDipi moved this from To do to In progress in SDK Java Nov 14, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
SDK Java
In progress
Development

Successfully merging this pull request may close these issues.

None yet

4 participants