@@ -2201,14 +2201,6 @@ func (g *groupConsumer) getSetAssigns(setOffsets map[string]map[int32]EpochOffse
2201
2201
// offsets are always updated on calls to PollFetches.
2202
2202
//
2203
2203
// If there are no uncommitted offsets, this returns nil.
2204
- //
2205
- // Note that, if manually committing, you should be careful with committing
2206
- // during group rebalances. You must ensure you commit before the group's
2207
- // session timeout is reached, otherwise this client will be kicked from the
2208
- // group and the commit will fail.
2209
- //
2210
- // If using a cooperative balancer, commits while consuming during rebalancing
2211
- // may fail with REBALANCE_IN_PROGRESS.
2212
2204
func (cl * Client ) UncommittedOffsets () map [string ]map [int32 ]EpochOffset {
2213
2205
if g := cl .consumer .g ; g != nil {
2214
2206
return g .getUncommitted (true )
@@ -2218,27 +2210,14 @@ func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset {
2218
2210
2219
2211
// MarkedOffsets returns the latest marked offsets. When autocommitting, a
2220
2212
// marked offset is an offset that can be committed, in comparison to a dirty
2221
- // offset that cannot yet be committed. You usually see marked offsets with
2222
- // AutoCommitMarks and MarkCommitRecords, but you can also use this function to
2223
- // grab the current offsets that are candidates for committing from normal
2224
- // autocommitting.
2225
- //
2226
- // If you set a custom OnPartitionsRevoked, marked offsets are not committed
2227
- // when partitions are revoked. You can use this function to mark records and
2228
- // issue a commit inside your OnPartitionsRevoked.
2229
- //
2230
- // Note that, if manually committing, you should be careful with committing
2231
- // during group rebalances. You must ensure you commit before the group's
2232
- // session timeout is reached, otherwise this client will be kicked from the
2233
- // group and the commit will fail.
2234
- //
2235
- // If using a cooperative balancer, commits while consuming during rebalancing
2236
- // may fail with REBALANCE_IN_PROGRESS.
2213
+ // offset that cannot yet be committed. MarkedOffsets returns nil if you are
2214
+ // not using AutoCommitMarks.
2237
2215
func (cl * Client ) MarkedOffsets () map [string ]map [int32 ]EpochOffset {
2238
- if g := cl .consumer .g ; g != nil {
2239
- return g .getUncommitted (false )
2216
+ g := cl .consumer .g
2217
+ if g == nil || ! cl .cfg .autocommitMarks {
2218
+ return nil
2240
2219
}
2241
- return nil
2220
+ return g . getUncommitted ( false )
2242
2221
}
2243
2222
2244
2223
// CommittedOffsets returns the latest committed offsets. Committed offsets are
@@ -2464,13 +2443,13 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
2464
2443
return cl .commitOffsets (ctx , cl .UncommittedOffsets ())
2465
2444
}
2466
2445
2467
- // CommitMarkedOffsets issues a synchronous offset commit for any
2468
- // partition that has been consumed from that has marked offsets.
2469
- // Retryable errors are retried up to the configured retry limit, and any
2470
- // unretryable error is returned.
2446
+ // CommitMarkedOffsets issues a synchronous offset commit for any partition
2447
+ // that has been consumed from that has marked offsets. Retryable errors are
2448
+ // retried up to the configured retry limit, and any unretryable error is
2449
+ // returned.
2471
2450
//
2472
- // This function is useful if you have marked offsets with MarkCommitRecords
2473
- // when using AutoCommitMarks.
2451
+ // This function is only useful if you have marked offsets with
2452
+ // MarkCommitRecords when using AutoCommitMarks, otherwise this is a no-op .
2474
2453
//
2475
2454
// The recommended pattern for using this function is to have a poll / process
2476
2455
// / commit loop. First PollFetches, then process every record,
@@ -2480,7 +2459,11 @@ func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error {
2480
2459
// As an alternative if you want to commit specific records, see CommitRecords.
2481
2460
func (cl * Client ) CommitMarkedOffsets (ctx context.Context ) error {
2482
2461
// This function is just the tail end of CommitRecords just above.
2483
- return cl .commitOffsets (ctx , cl .MarkedOffsets ())
2462
+ marked := cl .MarkedOffsets ()
2463
+ if len (marked ) == 0 {
2464
+ return nil
2465
+ }
2466
+ return cl .commitOffsets (ctx , marked )
2484
2467
}
2485
2468
2486
2469
func (cl * Client ) commitOffsets (ctx context.Context , offsets map [string ]map [int32 ]EpochOffset ) error {
0 commit comments