Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: twmb/franz-go
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.16.0
Choose a base ref
...
head repository: twmb/franz-go
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.16.1
Choose a head ref
  • 9 commits
  • 6 files changed
  • 1 contributor

Commits on Jan 22, 2024

  1. kadm: add DeleteTopic,DeleteGroup,Error

    Adds Error for many responses, which may sometimes be helpful.
    twmb committed Jan 22, 2024
    Copy the full SHA
    0b89158 View commit details
  2. Merge pull request #666 from twmb/kadm

    kadm: add DeleteTopic,DeleteGroup,Error
    twmb authored Jan 22, 2024
    Copy the full SHA
    a2d69ce View commit details

Commits on Feb 7, 2024

  1. Copy the full SHA
    d40ac19 View commit details
  2. kgo: fix bug

    Previously, if CommitOffsetsSync was called during a rebalance, and the
    context being used is canceled while the rebalance was occurring, then
    the client would deadlock.
    
    Internally, committing is blocked if a rebalance is actively happening.
    There's some complex logic to have, effectively, a cancelable lock
    if the user wants to not wait for a rebalance to complete while trying
    to commit offsets.
    
    There was a bug, and since the fix is one line, it's easier to see than
    explain.
    
    Closes #668.
    twmb committed Feb 7, 2024
    Copy the full SHA
    cd65d77 View commit details
  3. Copy the full SHA
    20867cd View commit details
  4. Merge pull request #673 from twmb/patch

    v1.16.1 patches
    twmb authored Feb 7, 2024
    Copy the full SHA
    41f0269 View commit details
  5. kgo: further fix for cd65d77

    The prior commit was insufficient -- we left a dangling lock, and we
    had that same dangling lock in the cancelable commit offsets.
    twmb committed Feb 7, 2024
    Copy the full SHA
    99d6dfb View commit details
  6. Copy the full SHA
    e08d276 View commit details
  7. Merge pull request #674 from twmb/patches

    Patches
    twmb authored Feb 7, 2024
    Copy the full SHA
    c5207aa View commit details
Showing with 172 additions and 30 deletions.
  1. +17 −0 CHANGELOG.md
  2. +61 −0 pkg/kadm/groups.go
  3. +12 −0 pkg/kadm/partas.go
  4. +65 −18 pkg/kadm/topics.go
  5. +7 −9 pkg/kgo/consumer_group.go
  6. +10 −3 pkg/kgo/partitioner.go
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
v1.16.1
===

This patch release fixes one bug and un-deprecates SaramaHasher.

SaramaHasher, while not identical to Sarama's partitioner, actually _is_
identical to some other partitioners in the Kafka client ecosystem. So, the old
function is now un-deprecated, but the documentation correctly points you to
SaramaCompatHasher and mentions why you may still want to use SaramaHasher.

For the bug: if you tried using CommitOffsetsSync during a group rebalance, and
you canceled your context while the group was still rebalancing, then
CommitOffsetsSync would enter a deadlock and never return. That has been fixed.

- [`cd65d77`](https://github.com/twmb/franz-go/commit/cd65d77) and [`99d6dfb`](https://github.com/twmb/franz-go/commit/99d6dfb) kgo: fix bug
- [`d40ac19`](https://github.com/twmb/franz-go/commit/d40ac19) kgo: un-deprecate SaramaHasher and add docs explaining why

v1.16.0
===

61 changes: 61 additions & 0 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
@@ -162,6 +162,17 @@ func (rs DescribedGroups) On(group string, fn func(*DescribedGroup) error) (Desc
return DescribedGroup{}, kerr.GroupIDNotFound
}

// Error iterates over all groups and returns the first error encountered, if
// any.
func (ds DescribedGroups) Error() error {
for _, d := range ds {
if d.Err != nil {
return d.Err
}
}
return nil
}

// Topics returns a sorted list of all group names.
func (ds DescribedGroups) Names() []string {
all := make([]string, 0, len(ds))
@@ -385,6 +396,32 @@ func (rs DeleteGroupResponses) On(group string, fn func(*DeleteGroupResponse) er
return DeleteGroupResponse{}, kerr.GroupIDNotFound
}

// Error iterates over all groups and returns the first error encountered, if
// any.
func (rs DeleteGroupResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// DeleteGroup deletes the specified group. This is similar to DeleteGroups,
// but returns the kerr.ErrorForCode(response.ErrorCode) if the request/response
// is successful.
func (cl *Client) DeleteGroup(ctx context.Context, group string) (DeleteGroupResponse, error) {
rs, err := cl.DeleteGroups(ctx, group)
if err != nil {
return DeleteGroupResponse{}, err
}
g, exists := rs[group]
if !exists {
return DeleteGroupResponse{}, errors.New("requested group was not part of the delete group response")
}
return g, g.Err
}

// DeleteGroups deletes all groups specified.
//
// The purpose of this request is to allow operators a way to delete groups
@@ -984,6 +1021,17 @@ func (rs FetchOffsetsResponses) On(group string, fn func(*FetchOffsetsResponse)
return FetchOffsetsResponse{}, kerr.GroupIDNotFound
}

// Error iterates over all responses and returns the first error encountered,
// if any.
func (rs FetchOffsetsResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// FetchManyOffsets issues a fetch offsets requests for each group specified.
//
// This function is a batch version of FetchOffsets. FetchOffsets and
@@ -1092,6 +1140,19 @@ func (ds DeleteOffsetsResponses) EachError(fn func(string, int32, error)) {
}
}

// Error iterates over all responses and returns the first error encountered,
// if any.
func (ds DeleteOffsetsResponses) Error() error {
for _, ps := range ds {
for _, err := range ps {
if err != nil {
return err
}
}
}
return nil
}

// DeleteOffsets deletes offsets for the given group.
//
// Originally, offset commits were persisted in Kafka for some retention time.
12 changes: 12 additions & 0 deletions pkg/kadm/partas.go
Original file line number Diff line number Diff line change
@@ -69,6 +69,18 @@ func (rs AlterPartitionAssignmentsResponses) Each(fn func(AlterPartitionAssignme
}
}

// Error returns the first error in the responses, if any.
func (rs AlterPartitionAssignmentsResponses) Error() error {
for _, ps := range rs {
for _, r := range ps {
if r.Err != nil {
return r.Err
}
}
}
return nil
}

// AlterPartitionAssignments alters partition assignments for the requested
// partitions, returning an error if the response could not be issued or if
// you do not have permissions.
83 changes: 65 additions & 18 deletions pkg/kadm/topics.go
Original file line number Diff line number Diff line change
@@ -91,24 +91,21 @@ func (rs CreateTopicResponses) On(topic string, fn func(*CreateTopicResponse) er
return CreateTopicResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs CreateTopicResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// CreateTopic issues a create topics request with the given partitions,
// replication factor, and (optional) configs for the given topic name. Under
// the hood, this uses the default 15s request timeout and lets Kafka choose
// where to place partitions. This function exists to complement CreateTopics,
// making the single-topic creation case easier to handle.
//
// Version 4 of the underlying create topic request was introduced in Kafka 2.4
// and brought client support for creation defaults. If talking to a 2.4+
// cluster, you can use -1 for partitions and replicationFactor to use broker
// defaults.
//
// This package includes a StringPtr function to aid in building config values.
//
// If the topic could not be created this function will return an error. An
// error may be returned due to authorization failure, a failed network
// request, a missing controller or other issues. If the request was successful
// but the CreateTopicResponse.Err is non-nil, this returns the error, so you
// do not need to additionally check the Err field.
// replication factor, and (optional) configs for the given topic name.
// This is similar to CreateTopics, but returns the kerr.ErrorForCode(response.ErrorCode)
// if the request/response is successful.
func (cl *Client) CreateTopic(
ctx context.Context,
partitions int32,
@@ -277,8 +274,34 @@ func (rs DeleteTopicResponses) On(topic string, fn func(*DeleteTopicResponse) er
return DeleteTopicResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs DeleteTopicResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// DeleteTopic issues a delete topic request for the given topic name with a
// (by default) 15s timeout. This is similar to DeleteTopics, but returns the
// kerr.ErrorForCode(response.ErrorCode) if the request/response is successful.
func (cl *Client) DeleteTopic(ctx context.Context, topic string) (DeleteTopicResponse, error) {
rs, err := cl.DeleteTopics(ctx, topic)
if err != nil {
return DeleteTopicResponse{}, err
}
r, exists := rs[topic]
if !exists {
return DeleteTopicResponse{}, errors.New("requested topic was not part of delete topic response")
}
return r, r.Err
}

// DeleteTopics issues a delete topics request for the given topic names with a
// 15s timeout.
// (by default) 15s timeout.
//
// This does not return an error on authorization failures, instead,
// authorization failures are included in the responses. This only returns an
@@ -402,6 +425,19 @@ func (rs DeleteRecordsResponses) On(topic string, partition int32, fn func(*Dele
return DeleteRecordsResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs DeleteRecordsResponses) Error() error {
for _, ps := range rs {
for _, r := range ps {
if r.Err != nil {
return r.Err
}
}
}
return nil
}

// DeleteRecords issues a delete records request for the given offsets. Per
// offset, only the Offset field needs to be set.
//
@@ -498,6 +534,17 @@ func (rs CreatePartitionsResponses) On(topic string, fn func(*CreatePartitionsRe
return CreatePartitionsResponse{}, kerr.UnknownTopicOrPartition
}

// Error iterates over all responses and returns the first error
// encountered, if any.
func (rs CreatePartitionsResponses) Error() error {
for _, r := range rs {
if r.Err != nil {
return r.Err
}
}
return nil
}

// CreatePartitions issues a create partitions request for the given topics,
// adding "add" partitions to each topic. This request lets Kafka choose where
// the new partitions should be.
16 changes: 7 additions & 9 deletions pkg/kgo/consumer_group.go
Original file line number Diff line number Diff line change
@@ -2580,12 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync(
onDone = func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error) {}
}

g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
close(done)
return
}

g.syncCommitMu.Lock() // block all other concurrent commits until our OnDone is done.
unblockCommits := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
defer close(done)
@@ -2662,19 +2663,16 @@ func (cl *Client) CommitOffsets(
return
}

g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
unblockSyncCommit := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
defer g.syncCommitMu.RUnlock()
onDone(cl, req, resp, err)
}

if err := g.waitJoinSyncMu(ctx); err != nil {
onDone(g.cl, kmsg.NewPtrOffsetCommitRequest(), kmsg.NewPtrOffsetCommitResponse(), err)
return
}

g.syncCommitMu.RLock() // block sync commit, but allow other concurrent Commit to cancel us
unblockJoinSync := func(cl *Client, req *kmsg.OffsetCommitRequest, resp *kmsg.OffsetCommitResponse, err error) {
g.noCommitDuringJoinAndSync.RUnlock()
unblockSyncCommit(cl, req, resp, err)
defer g.syncCommitMu.RUnlock()
onDone(cl, req, resp, err)
}

g.mu.Lock()
13 changes: 10 additions & 3 deletions pkg/kgo/partitioner.go
Original file line number Diff line number Diff line change
@@ -487,9 +487,16 @@ func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher {
}
}

// Deprecated: SaramaHasher is not compatible with Sarama's default partitioner
// and only remains to avoid re-keying records for existing users of this API. See
// [SaramaCompatHasher] for a correct partitioner.
// SaramaHasher is a historical misnamed partitioner. This library's original
// implementation of the SaramaHasher was incorrect, if you want an exact
// match for the Sarama partitioner, use the [SaramaCompatHasher].
//
// This partitioner remains because as it turns out, other ecosystems provide
// a similar partitioner and this partitioner is useful for compatibility.
//
// In particular, using this function with a crc32.ChecksumIEEE hasher makes
// this partitioner match librdkafka's consistent partitioner, or the
// zendesk/ruby-kafka partitioner.
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher {
return func(key []byte, n int) int {
p := int(hashFn(key)) % n