Skip to content

Commit

Permalink
spring-projectsGH-2239: Replace PartitionPausingBackOffManager
Browse files Browse the repository at this point in the history
New back of manager (and factory) that uses a task scheduler to resume
the paused partitions.

Revert change to deprecated PartitionPausingBackoffManager.

Log resume.
  • Loading branch information
garyrussell committed Jul 6, 2022
1 parent d7b100b commit 9acdbde
Show file tree
Hide file tree
Showing 12 changed files with 286 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR

private final Map<String, MessageListenerContainer> unregisteredContainers = new ConcurrentHashMap<>();

private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();

private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
Expand Down Expand Up @@ -109,6 +111,17 @@ public MessageListenerContainer getListenerContainer(String id) {
return this.listenerContainers.get(id);
}

@Override
@Nullable
public MessageListenerContainer getUnregisteredListenerContainer(String id) {
MessageListenerContainer container = this.unregisteredContainers.get(id);
if (container == null) {
refreshContextContainers();
return this.unregisteredContainers.get(id);
}
return null;
}

/**
* By default, containers registered for endpoints after the context is refreshed
* are immediately started, regardless of their autoStartup property, to comply with
Expand Down Expand Up @@ -156,10 +169,17 @@ public Collection<MessageListenerContainer> getListenerContainers() {
public Collection<MessageListenerContainer> getAllListenerContainers() {
List<MessageListenerContainer> containers = new ArrayList<>();
containers.addAll(getListenerContainers());
containers.addAll(this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values());
refreshContextContainers();
containers.addAll(this.unregisteredContainers.values());
return containers;
}

private void refreshContextContainers() {
this.unregisteredContainers.clear();
this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values()
.forEach(container -> this.unregisteredContainers.put(container.getListenerId(), container));
}

/**
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

package org.springframework.kafka.listener;

import org.apache.kafka.common.TopicPartition;

import org.springframework.lang.Nullable;

/**
* Handler for the provided back off time, listener container and exception.
* Also supports back off for individual partitions.
*
* @author Jan Marincek
* @since 2.9
* @author Jan Marincek
* @author Gary Russell
* @since 2.9
*/
@FunctionalInterface
public interface BackOffHandler {

/**
Expand All @@ -35,4 +38,16 @@ public interface BackOffHandler {
*/
void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff);

/**
* Perform the next back off for a partition.
* @param container the container.
* @param partition the partition.
* @param nextBackOff the next back off.
*/
default void onNextBackOff(@Nullable MessageListenerContainer container, TopicPartition partition,
long nextBackOff) {

throw new UnsupportedOperationException();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;

/**
*
* A manager that backs off consumption for a given topic if the timestamp provided is not
* due. Use with {@link DefaultErrorHandler} to guarantee that the message is read
* again after partition consumption is resumed (or seek it manually by other means).
* Note that when a record backs off the partition consumption gets paused for
* approximately that amount of time, so you must have a fixed backoff value per partition.
*
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.9
* @see DefaultErrorHandler
*/
public class ContainerPartitionPausingBackOffManager implements KafkaConsumerBackoffManager {

private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));

private final ListenerContainerRegistry listenerContainerRegistry;

private final BackOffHandler backOffHandler;

/**
* Construct an instance with the provided registry and back off handler.
* @param listenerContainerRegistry
* @param backOffHandler
*/
public ContainerPartitionPausingBackOffManager(ListenerContainerRegistry listenerContainerRegistry,
BackOffHandler backOffHandler) {

this.listenerContainerRegistry = listenerContainerRegistry;
this.backOffHandler = backOffHandler;
}

/**
* Backs off if the current time is before the dueTimestamp provided
* in the {@link Context} object.
* @param context the back off context for this execution.
*/
@Override
public void backOffIfNecessary(Context context) {
long backoffTime = context.getDueTimestamp() - System.currentTimeMillis();
LOGGER.debug(() -> "Back off time: " + backoffTime + " Context: " + context);
if (backoffTime > 0) {
pauseConsumptionAndThrow(context, backoffTime);
}
}

private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException {
TopicPartition topicPartition = context.getTopicPartition();
getListenerContainerFromContext(context).pausePartition(topicPartition);
this.backOffHandler.onNextBackOff(getListenerContainerFromContext(context), topicPartition, backOffTime);
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
"backing off for approx. %s millis.", context.getTopicPartition().partition(),
context.getTopicPartition().topic(), backOffTime),
topicPartition, context.getListenerId(), context.getDueTimestamp());
}

private MessageListenerContainer getListenerContainerFromContext(Context context) {
MessageListenerContainer container = this.listenerContainerRegistry.getListenerContainer(context.getListenerId()); // NOSONAR
if (container == null) {
container = this.listenerContainerRegistry.getUnregisteredListenerContainer(context.getListenerId());
}
Assert.notNull(container, () -> "No container found with id: " + context.getListenerId());
return container;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.listener;

/**
* A factory for {@link ContainerPartitionPausingBackoffManager}.
*
* @author Gary Russell
* @since 2.9
*
*/
public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {

private BackOffHandler backOffHandler;

/**
* Construct an instance with the provided properties.
* @param listenerContainerRegistry the registry.
* @param backOffHandler the back off handler.
*/
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {

super(listenerContainerRegistry);
}

@Override
protected KafkaConsumerBackoffManager doCreateManager(ListenerContainerRegistry registry) {
return new ContainerPartitionPausingBackOffManager(getListenerContainerRegistry(), this.backOffHandler);
}

public void setBackOffHandler(BackOffHandler backOffHandler) {
this.backOffHandler = backOffHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.time.Duration;

import org.apache.kafka.common.TopicPartition;

import org.springframework.lang.Nullable;

/**
Expand Down Expand Up @@ -51,4 +53,9 @@ public void onNextBackOff(@Nullable MessageListenerContainer container, Exceptio
}
}

@Override
public void onNextBackOff(MessageListenerContainer container, TopicPartition partition, long nextBackOff) {
this.pauser.pausePartition(container, partition, Duration.ofMillis(nextBackOff));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ public void resume() {
}
}

@Override
public void resumePartition(TopicPartition topicPartition) {
super.resumePartition(topicPartition);
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
if (consumer != null) {
this.listenerConsumer.wakeIfNecessary();
}
}

@Override
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;

import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -81,13 +82,37 @@ public void pause(MessageListenerContainer messageListenerContainer, Duration pa
}
else {
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
LOGGER.debug(() -> "Pausing container " + messageListenerContainer + "resume scheduled for "
LOGGER.debug(() -> "Pausing container " + messageListenerContainer + ", resume scheduled for "
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
messageListenerContainer.pause();
this.scheduler.schedule(() -> resume(messageListenerContainer), resumeAt);
this.scheduler.schedule(() -> {
LOGGER.debug(() -> "Pausing container " + messageListenerContainer);
resume(messageListenerContainer);
}, resumeAt);
}
}

/**
* Pause consumption from a given partition for the duration.
* @param messageListenerContainer the container.
* @param partition the partition.
* @param pauseDuration the duration.
*/
public void pausePartition(MessageListenerContainer messageListenerContainer, TopicPartition partition,
Duration pauseDuration) {

Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
LOGGER.debug(() -> "Pausing container: " + messageListenerContainer + " partition: " + partition
+ ", resume scheduled for "
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
messageListenerContainer.pausePartition(partition);
this.scheduler.schedule(() -> {
LOGGER.debug(() -> "Resuming container: " + messageListenerContainer + " partition: " + partition);
messageListenerContainer.resumePartition(partition);
}, resumeAt);

}

/**
* Resume the listener container by given id.
* @param listenerId the id of the listener
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,17 @@ public interface ListenerContainerRegistry {
@Nullable
MessageListenerContainer getListenerContainer(String id);

/**
* Return the {@link MessageListenerContainer} with the specified id or {@code null}
* if no such container exists. Returns containers that are not registered with the
* registry, but exist in the application context.
* @param id the id of the container
* @return the container or {@code null} if no container with that id exists
* @see #getListenerContainerIds()
*/
@Nullable
MessageListenerContainer getUnregisteredListenerContainer(String id);

/**
* Return the ids of the managed {@link MessageListenerContainer} instance(s).
* @return the ids.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
* IMPORTANT: Since 2.9 this class doesn't create a {@link ThreadPoolTaskExecutor}
* by default. In order for the factory to create a {@link KafkaConsumerTimingAdjuster},
* such thread executor must be provided.
* @deprecated in favor of {@link ContainerPartitionPausingBackOffManager}.
*
* @author Tomaz Fernandes
* @since 2.7
*/
@Deprecated
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {

private boolean timingAdjustmentEnabled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@
*
* Note that when a record backs off the partition consumption gets paused for
* approximately that amount of time, so you must have a fixed backoff value per partition.
* @deprecated in favor of {@link ContainerPartitionPausingBackOffManager}.
*
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.7
* @see DefaultErrorHandler
*/
@Deprecated
public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManager,
ApplicationListener<ListenerContainerPartitionIdleEvent> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
* @return the instance.
*/
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
return new PartitionPausingBackOffManagerFactory(registry);
return new ContainerPartitionPausingBackOffManagerFactory(registry);
}

/**
Expand Down

0 comments on commit 9acdbde

Please sign in to comment.