Skip to content

Commit 72778cb

Browse files
committedNov 1, 2023
kgo: no-op mark functions when not using AutoCommitMarks
Closes #598.
1 parent 6ebcb43 commit 72778cb

File tree

1 file changed

+17
-34
lines changed

1 file changed

+17
-34
lines changed
 

‎pkg/kgo/consumer_group.go

+17-34
Original file line numberDiff line numberDiff line change
@@ -2201,14 +2201,6 @@ func (g *groupConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffse
22012201
// offsets are always updated on calls to PollFetches.
22022202
//
22032203
// If there are no uncommitted offsets, this returns nil.
2204-
//
2205-
// Note that, if manually committing, you should be careful with committing
2206-
// during group rebalances. You must ensure you commit before the group's
2207-
// session timeout is reached, otherwise this client will be kicked from the
2208-
// group and the commit will fail.
2209-
//
2210-
// If using a cooperative balancer, commits while consuming during rebalancing
2211-
// may fail with REBALANCE_IN_PROGRESS.
22122204
func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset {
22132205
if g := cl.consumer.g; g != nil {
22142206
return g.getUncommitted(true)
@@ -2218,27 +2210,14 @@ func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset {
22182210

22192211
// MarkedOffsets returns the latest marked offsets. When autocommitting, a
22202212
// marked offset is an offset that can be committed, in comparison to a dirty
2221-
// offset that cannot yet be committed. You usually see marked offsets with
2222-
// AutoCommitMarks and MarkCommitRecords, but you can also use this function to
2223-
// grab the current offsets that are candidates for committing from normal
2224-
// autocommitting.
2225-
//
2226-
// If you set a custom OnPartitionsRevoked, marked offsets are not committed
2227-
// when partitions are revoked. You can use this function to mark records and
2228-
// issue a commit inside your OnPartitionsRevoked.
2229-
//
2230-
// Note that, if manually committing, you should be careful with committing
2231-
// during group rebalances. You must ensure you commit before the group's
2232-
// session timeout is reached, otherwise this client will be kicked from the
2233-
// group and the commit will fail.
2234-
//
2235-
// If using a cooperative balancer, commits while consuming during rebalancing
2236-
// may fail with REBALANCE_IN_PROGRESS.
2213+
// offset that cannot yet be committed. MarkedOffsets returns nil if you are
2214+
// not using AutoCommitMarks.
22372215
func (cl *Client) MarkedOffsets() map[string]map[int32]EpochOffset {
2238-
if g := cl.consumer.g; g != nil {
2239-
return g.getUncommitted(false)
2216+
g := cl.consumer.g
2217+
if g == nil || !cl.cfg.autocommitMarks {
2218+
return nil
22402219
}
2241-
return nil
2220+
return g.getUncommitted(false)
22422221
}
22432222

22442223
// CommittedOffsets returns the latest committed offsets. Committed offsets are
@@ -2464,13 +2443,13 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
24642443
return cl.commitOffsets(ctx, cl.UncommittedOffsets())
24652444
}
24662445

2467-
// CommitMarkedOffsets issues a synchronous offset commit for any
2468-
// partition that has been consumed from that has marked offsets.
2469-
// Retryable errors are retried up to the configured retry limit, and any
2470-
// unretryable error is returned.
2446+
// CommitMarkedOffsets issues a synchronous offset commit for any partition
2447+
// that has been consumed from that has marked offsets. Retryable errors are
2448+
// retried up to the configured retry limit, and any unretryable error is
2449+
// returned.
24712450
//
2472-
// This function is useful if you have marked offsets with MarkCommitRecords
2473-
// when using AutoCommitMarks.
2451+
// This function is only useful if you have marked offsets with
2452+
// MarkCommitRecords when using AutoCommitMarks, otherwise this is a no-op.
24742453
//
24752454
// The recommended pattern for using this function is to have a poll / process
24762455
// / commit loop. First PollFetches, then process every record,
@@ -2480,7 +2459,11 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
24802459
// As an alternative if you want to commit specific records, see CommitRecords.
24812460
func (cl *Client) CommitMarkedOffsets(ctx context.Context) error {
24822461
// This function is just the tail end of CommitRecords just above.
2483-
return cl.commitOffsets(ctx, cl.MarkedOffsets())
2462+
marked := cl.MarkedOffsets()
2463+
if len(marked) == 0 {
2464+
return nil
2465+
}
2466+
return cl.commitOffsets(ctx, marked)
24842467
}
24852468

24862469
func (cl *Client) commitOffsets(ctx context.Context, offsets map[string]map[int32]EpochOffset) error {

0 commit comments

Comments
 (0)
Please sign in to comment.