Concurrency and Virtual Threads #2915
-
My service creates a I recently switched my service over to use virtual threads, and enabled them for Kafka as well: @Bean
public <V> ConcurrentKafkaListenerContainerFactory<String, V> kafkaListenerContainerFactory(
ObjectMapper objectMapper
) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, V>();
factory.setAutoStartup(true);
factory.setConsumerFactory(kafkaConsumerFactory(objectMapper));
factory.setContainerCustomizer(container -> {
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
container.getContainerProperties().setListenerTaskExecutor(createVirtualThreadAsyncTaskExecutor());
});
return factory;
}
private AsyncTaskExecutor createVirtualThreadAsyncTaskExecutor() {
var asyncTaskExecutor = new SimpleAsyncTaskExecutor();
asyncTaskExecutor.setConcurrencyLimit(SimpleAsyncTaskExecutor.UNBOUNDED_CONCURRENCY);
asyncTaskExecutor.setVirtualThreads(true);
return asyncTaskExecutor;
} With this pattern, should I be invoking |
Beta Was this translation helpful? Give feedback.
Answered by
garyrussell
Nov 27, 2023
Replies: 1 comment 1 reply
-
You still need to set concurrency if you want to consume multiple records from different partitions concurrently; each consumer will get its own virtual thread. |
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
jhg023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You still need to set concurrency if you want to consume multiple records from different partitions concurrently; each consumer will get its own virtual thread.