Skip to content

Commit

Permalink
spring-projectsGH-2280: Add ContainerProperties.pauseImmediate
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed Jul 6, 2022
1 parent ad48491 commit bb26e05
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 16 deletions.
8 changes: 8 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2578,6 +2578,10 @@ See `monitorInterval`.
|`false`
|Set to false to log the complete consumer record (in error, debug logs etc) instead of just `topic-partition@offset`.

|[[pauseImmediate]]<<pauseImmediate,`pauseImmediate`>>
|`false`
|When the container is paused, stop processing after the current record instead of after processing all the records from the previous poll; the remaining records are retained in memory and will be passed to the listener when the container is resumed.

|[[pollTimeout]]<<pollTimeout,`pollTimeout`>>
|5000
|The timeout passed into `Consumer.poll()`.
Expand Down Expand Up @@ -3825,6 +3829,10 @@ However, the consumers might not have actually paused yet.

In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.

Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed.
By default, the pause takes effect when all of the records from the previous poll have been processed.
See <<pauseImmediate>>.

The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events:

====
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public enum EOSMode {

private boolean asyncAcks;

private boolean pauseImmediate;

/**
* Create properties for a container that will subscribe to the specified topics.
* @param topics the topics.
Expand Down Expand Up @@ -873,6 +875,27 @@ public void setAsyncAcks(boolean asyncAcks) {
this.asyncAcks = asyncAcks;
}

/**
* When pausing the container with a record listener, whether the pause takes effect
* immediately, when the current record has been processed, or after all records from
* the previous poll have been processed. Default false.
* @return whether to pause immediately.
* @since 2.9
*/
public boolean isPauseImmediate() {
return this.pauseImmediate;
}

/**
* Set to true to pause the container after the current record has been processed, rather
* than after all the records from the previous poll have been processed.
* @param pauseImmediate true to pause immediately.
* @since 2.9
*/
public void setPauseImmediate(boolean pauseImmediate) {
this.pauseImmediate = pauseImmediate;
}

private void adviseListenerIfNeeded() {
if (!CollectionUtils.isEmpty(this.adviceChain)) {
if (AopUtils.isAopProxy(this.messageListener)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final Set<TopicPartition> pausedForNack = new HashSet<>();

private final boolean pauseImmediate = this.containerProperties.isPauseImmediate();

private Map<TopicPartition, OffsetMetadata> definedPartitions;

private int count;
Expand Down Expand Up @@ -782,7 +784,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private boolean receivedSome;

private ConsumerRecords<K, V> pendingRecordsAfterError;
private ConsumerRecords<K, V> remainingRecords;

private boolean pauseForPending;

Expand Down Expand Up @@ -1381,7 +1383,7 @@ protected void pollAndInvoke() {
debugRecords(records);

invokeIfHaveRecords(records);
if (this.pendingRecordsAfterError == null) {
if (this.remainingRecords == null) {
resumeConsumerIfNeccessary();
if (!this.consumerPaused) {
resumePartitionsIfNecessary();
Expand All @@ -1395,9 +1397,9 @@ private void doProcessCommits() {
processCommits();
}
catch (CommitFailedException cfe) {
if (this.pendingRecordsAfterError != null && !this.isBatchListener) {
ConsumerRecords<K, V> pending = this.pendingRecordsAfterError;
this.pendingRecordsAfterError = null;
if (this.remainingRecords != null && !this.isBatchListener) {
ConsumerRecords<K, V> pending = this.remainingRecords;
this.remainingRecords = null;
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -1563,19 +1565,19 @@ private ConsumerRecords<K, V> doPoll() {
}
else {
records = pollConsumer();
if (this.pendingRecordsAfterError != null) {
if (this.remainingRecords != null) {
int howManyRecords = records.count();
if (howManyRecords > 0) {
this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused "
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
KafkaMessageListenerContainer.this.emergencyStop.run();
}
TopicPartition firstPart = this.pendingRecordsAfterError.partitions().iterator().next();
TopicPartition firstPart = this.remainingRecords.partitions().iterator().next();
boolean isPaused = isPaused() || isPartitionPauseRequested(firstPart);
this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused);
if (!isPaused) {
records = this.pendingRecordsAfterError;
this.pendingRecordsAfterError = null;
records = this.remainingRecords;
this.remainingRecords = null;
}
}
captureOffsets(records);
Expand Down Expand Up @@ -2225,8 +2227,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
|| this.producer != null) {
if (this.pendingRecordsAfterError != null) {
ConsumerRecord<K, V> firstUncommitted = this.pendingRecordsAfterError.iterator().next();
if (this.remainingRecords != null) {
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
Iterator<ConsumerRecord<K, V>> it = records.iterator();
while (it.hasNext()) {
ConsumerRecord<K, V> next = it.next();
Expand Down Expand Up @@ -2392,7 +2394,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer,
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
if (!afterHandling.isEmpty()) {
this.pendingRecordsAfterError = afterHandling;
this.remainingRecords = afterHandling;
this.pauseForPending = true;
}
}
Expand Down Expand Up @@ -2444,7 +2446,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
handleNack(records, record);
break;
}

if (checkImmediatePause(iterator)) {
break;
}
}
}

Expand Down Expand Up @@ -2523,9 +2527,28 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
handleNack(records, record);
break;
}
if (checkImmediatePause(iterator)) {
break;
}
}
}

private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
if (isPaused() && this.pauseImmediate) {
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new HashMap<>();
while (iterator.hasNext()) {
ConsumerRecord<K, V> next = iterator.next();
remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
}
if (remaining.size() > 0) {
this.remainingRecords = new ConsumerRecords<>(remaining);
return true;
}
}
return false;
}

@Nullable
private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg) {
ConsumerRecords<K, V> next = nextArg;
Expand Down Expand Up @@ -2669,8 +2692,8 @@ private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> record) {
if (this.isManualAck) {
this.commitRecovered = true;
}
if (this.pendingRecordsAfterError == null
|| !record.equals(this.pendingRecordsAfterError.iterator().next())) {
if (this.remainingRecords == null
|| !record.equals(this.remainingRecords.iterator().next())) {
ackCurrent(record);
}
if (this.isManualAck) {
Expand Down Expand Up @@ -2787,7 +2810,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
}
if (records.size() > 0) {
this.pendingRecordsAfterError = new ConsumerRecords<>(records);
this.remainingRecords = new ConsumerRecords<>(records);
this.pauseForPending = true;
}
}
Expand Down

0 comments on commit bb26e05

Please sign in to comment.