@@ -433,6 +433,8 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
433
433
}()
434
434
}
435
435
436
+ paused := c .loadPaused ()
437
+
436
438
// A group can grab the consumer lock then the group mu and
437
439
// assign partitions. The group mu is grabbed to update its
438
440
// uncommitted map. Assigning partitions clears sources ready
@@ -451,13 +453,13 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
451
453
c .sourcesReadyMu .Lock ()
452
454
if maxPollRecords < 0 {
453
455
for _ , ready := range c .sourcesReadyForDraining {
454
- fetches = append (fetches , ready .takeBuffered ())
456
+ fetches = append (fetches , ready .takeBuffered (paused ))
455
457
}
456
458
c .sourcesReadyForDraining = nil
457
459
} else {
458
460
for len (c .sourcesReadyForDraining ) > 0 && maxPollRecords > 0 {
459
461
source := c .sourcesReadyForDraining [0 ]
460
- fetch , taken , drained := source .takeNBuffered (maxPollRecords )
462
+ fetch , taken , drained := source .takeNBuffered (paused , maxPollRecords )
461
463
if drained {
462
464
c .sourcesReadyForDraining = c .sourcesReadyForDraining [1 :]
463
465
}
@@ -555,9 +557,7 @@ func (cl *Client) UpdateFetchMaxBytes(maxBytes, maxPartBytes int32) {
555
557
// PauseFetchTopics sets the client to no longer fetch the given topics and
556
558
// returns all currently paused topics. Paused topics persist until resumed.
557
559
// You can call this function with no topics to simply receive the list of
558
- // currently paused topics. Pausing topics drops everything currently buffered
559
- // and kills any in flight fetch requests to ensure nothing that is paused
560
- // can be returned anymore from polling.
560
+ // currently paused topics.
561
561
//
562
562
// Pausing topics is independent from pausing individual partitions with the
563
563
// PauseFetchPartitions method. If you pause partitions for a topic with
@@ -569,15 +569,8 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
569
569
if len (topics ) == 0 {
570
570
return c .loadPaused ().pausedTopics ()
571
571
}
572
-
573
572
c .pausedMu .Lock ()
574
573
defer c .pausedMu .Unlock ()
575
- defer func () {
576
- c .mu .Lock ()
577
- defer c .mu .Unlock ()
578
- c .assignPartitions (nil , assignBumpSession , nil , fmt .Sprintf ("pausing fetch topics %v" , topics ))
579
- }()
580
-
581
574
paused := c .clonePaused ()
582
575
paused .addTopics (topics ... )
583
576
c .storePaused (paused )
@@ -587,9 +580,7 @@ func (cl *Client) PauseFetchTopics(topics ...string) []string {
587
580
// PauseFetchPartitions sets the client to no longer fetch the given partitions
588
581
// and returns all currently paused partitions. Paused partitions persist until
589
582
// resumed. You can call this function with no partitions to simply receive the
590
- // list of currently paused partitions. Pausing partitions drops everything
591
- // currently buffered and kills any in flight fetch requests to ensure nothing
592
- // that is paused can be returned anymore from polling.
583
+ // list of currently paused partitions.
593
584
//
594
585
// Pausing individual partitions is independent from pausing topics with the
595
586
// PauseFetchTopics method. If you pause partitions for a topic with
@@ -601,15 +592,8 @@ func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[s
601
592
if len (topicPartitions ) == 0 {
602
593
return c .loadPaused ().pausedPartitions ()
603
594
}
604
-
605
595
c .pausedMu .Lock ()
606
596
defer c .pausedMu .Unlock ()
607
- defer func () {
608
- c .mu .Lock ()
609
- defer c .mu .Unlock ()
610
- c .assignPartitions (nil , assignBumpSession , nil , fmt .Sprintf ("pausing fetch partitions %v" , topicPartitions ))
611
- }()
612
-
613
597
paused := c .clonePaused ()
614
598
paused .addPartitions (topicPartitions )
615
599
c .storePaused (paused )
@@ -884,10 +868,6 @@ const (
884
868
// The counterpart to assignInvalidateMatching, assignSetMatching
885
869
// resets all matching partitions to the specified offset / epoch.
886
870
assignSetMatching
887
-
888
- // For pausing, we want to drop anything inflight. We start a new
889
- // session with the old tps.
890
- assignBumpSession
891
871
)
892
872
893
873
func (h assignHow ) String () string {
@@ -902,8 +882,6 @@ func (h assignHow) String() string {
902
882
return "unassigning and purging any partition matching the input topics"
903
883
case assignSetMatching :
904
884
return "reassigning any currently assigned matching partition to the input"
905
- case assignBumpSession :
906
- return "bumping internal consumer session to drop anything currently in flight"
907
885
}
908
886
return ""
909
887
}
@@ -984,8 +962,6 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
984
962
// if we had no session before, which is why we need to pass in
985
963
// our topicPartitions.
986
964
session = c .guardSessionChange (tps )
987
- } else if how == assignBumpSession {
988
- loadOffsets , tps = c .stopSession ()
989
965
} else {
990
966
loadOffsets , _ = c .stopSession ()
991
967
@@ -1032,7 +1008,7 @@ func (c *consumer) assignPartitions(assignments map[string]map[int32]Offset, how
1032
1008
// assignment went straight to listing / epoch loading, and
1033
1009
// that list/epoch never finished.
1034
1010
switch how {
1035
- case assignWithoutInvalidating , assignBumpSession :
1011
+ case assignWithoutInvalidating :
1036
1012
// Nothing to do -- this is handled above.
1037
1013
case assignInvalidateAll :
1038
1014
loadOffsets = listOrEpochLoads {}
0 commit comments