Skip to content

Commit

Permalink
kgo: no-op mark functions when not using AutoCommitMarks
Browse files Browse the repository at this point in the history
Closes #598.
  • Loading branch information
twmb committed Nov 1, 2023
1 parent 6ebcb43 commit 72778cb
Showing 1 changed file with 17 additions and 34 deletions.
51 changes: 17 additions & 34 deletions pkg/kgo/consumer_group.go
Expand Up @@ -2201,14 +2201,6 @@ func (g *groupConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffse
// offsets are always updated on calls to PollFetches.
//
// If there are no uncommitted offsets, this returns nil.
//
// Note that, if manually committing, you should be careful with committing
// during group rebalances. You must ensure you commit before the group's
// session timeout is reached, otherwise this client will be kicked from the
// group and the commit will fail.
//
// If using a cooperative balancer, commits while consuming during rebalancing
// may fail with REBALANCE_IN_PROGRESS.
func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset {
if g := cl.consumer.g; g != nil {
return g.getUncommitted(true)
Expand All @@ -2218,27 +2210,14 @@ func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset {

// MarkedOffsets returns the latest marked offsets. When autocommitting, a
// marked offset is an offset that can be committed, in comparison to a dirty
// offset that cannot yet be committed. You usually see marked offsets with
// AutoCommitMarks and MarkCommitRecords, but you can also use this function to
// grab the current offsets that are candidates for committing from normal
// autocommitting.
//
// If you set a custom OnPartitionsRevoked, marked offsets are not committed
// when partitions are revoked. You can use this function to mark records and
// issue a commit inside your OnPartitionsRevoked.
//
// Note that, if manually committing, you should be careful with committing
// during group rebalances. You must ensure you commit before the group's
// session timeout is reached, otherwise this client will be kicked from the
// group and the commit will fail.
//
// If using a cooperative balancer, commits while consuming during rebalancing
// may fail with REBALANCE_IN_PROGRESS.
// offset that cannot yet be committed. MarkedOffsets returns nil if you are
// not using AutoCommitMarks.
func (cl *Client) MarkedOffsets() map[string]map[int32]EpochOffset {
if g := cl.consumer.g; g != nil {
return g.getUncommitted(false)
g := cl.consumer.g
if g == nil || !cl.cfg.autocommitMarks {
return nil
}
return nil
return g.getUncommitted(false)
}

// CommittedOffsets returns the latest committed offsets. Committed offsets are
Expand Down Expand Up @@ -2464,13 +2443,13 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
return cl.commitOffsets(ctx, cl.UncommittedOffsets())
}

// CommitMarkedOffsets issues a synchronous offset commit for any
// partition that has been consumed from that has marked offsets.
// Retryable errors are retried up to the configured retry limit, and any
// unretryable error is returned.
// CommitMarkedOffsets issues a synchronous offset commit for any partition
// that has been consumed from that has marked offsets. Retryable errors are
// retried up to the configured retry limit, and any unretryable error is
// returned.
//
// This function is useful if you have marked offsets with MarkCommitRecords
// when using AutoCommitMarks.
// This function is only useful if you have marked offsets with
// MarkCommitRecords when using AutoCommitMarks, otherwise this is a no-op.
//
// The recommended pattern for using this function is to have a poll / process
// / commit loop. First PollFetches, then process every record,
Expand All @@ -2480,7 +2459,11 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
// As an alternative if you want to commit specific records, see CommitRecords.
func (cl *Client) CommitMarkedOffsets(ctx context.Context) error {
// This function is just the tail end of CommitRecords just above.
return cl.commitOffsets(ctx, cl.MarkedOffsets())
marked := cl.MarkedOffsets()
if len(marked) == 0 {
return nil
}
return cl.commitOffsets(ctx, marked)
}

func (cl *Client) commitOffsets(ctx context.Context, offsets map[string]map[int32]EpochOffset) error {
Expand Down

0 comments on commit 72778cb

Please sign in to comment.