Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Related to spring-projects#2195

- check `isPartitionPauseRequested()` instead of `isPartitionPaused()`
- add diagnostic debug logging
- prevent re-pausing the entire consumer each time a poll exits but the first
  partition is still paused.

**cherry-pick to 2.9.x**

* Checkstyle fix.
  • Loading branch information
garyrussell committed Jul 5, 2022
1 parent e9b5e69 commit 36665e9
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private ConsumerRecords<K, V> pendingRecordsAfterError;

private boolean pauseForPending;

private volatile boolean consumerPaused;

private volatile Thread consumerThread;
Expand Down Expand Up @@ -1566,7 +1568,10 @@ private ConsumerRecords<K, V> doPoll() {
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
KafkaMessageListenerContainer.this.emergencyStop.run();
}
if (!isPartitionPaused(this.pendingRecordsAfterError.partitions().iterator().next())) {
TopicPartition firstPart = this.pendingRecordsAfterError.partitions().iterator().next();
boolean isPaused = isPartitionPauseRequested(firstPart);
this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused);
if (!isPaused) {
records = this.pendingRecordsAfterError;
this.pendingRecordsAfterError = null;
}
Expand Down Expand Up @@ -1682,10 +1687,11 @@ private void doPauseConsumerIfNecessary() {
this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
}
if (!this.consumerPaused && (isPaused() || this.pausedForAsyncAcks)
|| this.pendingRecordsAfterError != null) {
|| this.pauseForPending) {

this.consumer.pause(this.consumer.assignment());
this.consumerPaused = true;
this.pauseForPending = false;
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
publishConsumerPausedEvent(this.consumer.assignment());
}
Expand Down Expand Up @@ -2385,6 +2391,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
if (!afterHandling.isEmpty()) {
this.pendingRecordsAfterError = afterHandling;
this.pauseForPending = true;
}
}
}
Expand Down Expand Up @@ -2778,6 +2785,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
}
if (records.size() > 0) {
this.pendingRecordsAfterError = new ConsumerRecords<>(records);
this.pauseForPending = true;
}
}
}
Expand Down

0 comments on commit 36665e9

Please sign in to comment.