Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: twmb/franz-go
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.15.0
Choose a base ref
...
head repository: twmb/franz-go
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.15.1
Choose a head ref

Commits on Sep 21, 2023

  1. kadm: bump deps

    twmb committed Sep 21, 2023
    Copy the full SHA
    b713611 View commit details
  2. Merge pull request #576 from twmb/kadm-deps

    kadm: bump deps
    twmb authored Sep 21, 2023
    Copy the full SHA
    bfe638e View commit details
  3. pkg/sr: use ptr type for SchemaMetadata and SchemaRuleSet (#577)

    * pkg/sr: use ptr type for SchemaMetadata and SchemaRuleSet
    shubham030 authored Sep 21, 2023
    Copy the full SHA
    c9d2351 View commit details

Commits on Sep 29, 2023

  1. Copy the full SHA
    4039539 View commit details
  2. Merge pull request #583 from twmb/sr-omitempty

    sr: make SetCompatibility fields optional
    twmb authored Sep 29, 2023
    Copy the full SHA
    4449480 View commit details
  3. kadm: populate GroupMemberLag.{Topic,Partition} always

    This was missing in one spot.
    twmb committed Sep 29, 2023
    Copy the full SHA
    1c2ccf9 View commit details
  4. Merge pull request #584 from twmb/kadm_lag_patch

    kadm: populate GroupMemberLag.{Topic,Partition} always
    twmb authored Sep 29, 2023
    Copy the full SHA
    9569753 View commit details

Commits on Oct 11, 2023

  1. build(deps): bump golang.org/x/net in /examples/bench/compare/segment

    Bumps [golang.org/x/net](https://github.com/golang/net) from 0.10.0 to 0.17.0.
    - [Commits](golang/net@v0.10.0...v0.17.0)
    
    ---
    updated-dependencies:
    - dependency-name: golang.org/x/net
      dependency-type: indirect
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    dependabot[bot] authored Oct 11, 2023
    Copy the full SHA
    57d5636 View commit details
  2. build(deps): bump golang.org/x/net in /examples/bench/compare/sarama

    Bumps [golang.org/x/net](https://github.com/golang/net) from 0.13.0 to 0.17.0.
    - [Commits](golang/net@v0.13.0...v0.17.0)
    
    ---
    updated-dependencies:
    - dependency-name: golang.org/x/net
      dependency-type: indirect
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    dependabot[bot] authored Oct 11, 2023
    Copy the full SHA
    5452514 View commit details
  3. build(deps): bump golang.org/x/net in /pkg/sasl/kerberos

    Bumps [golang.org/x/net](https://github.com/golang/net) from 0.13.0 to 0.17.0.
    - [Commits](golang/net@v0.13.0...v0.17.0)
    
    ---
    updated-dependencies:
    - dependency-name: golang.org/x/net
      dependency-type: indirect
    ...
    
    Signed-off-by: dependabot[bot] <support@github.com>
    dependabot[bot] authored Oct 11, 2023
    Copy the full SHA
    2d7e919 View commit details

Commits on Oct 12, 2023

  1. Merge pull request #595 from twmb/dependabot/go_modules/pkg/sasl/kerb…

    …eros/golang.org/x/net-0.17.0
    
    build(deps): bump golang.org/x/net from 0.13.0 to 0.17.0 in /pkg/sasl/kerberos
    twmb authored Oct 12, 2023
    Copy the full SHA
    f2c4281 View commit details
  2. Merge pull request #594 from twmb/dependabot/go_modules/examples/benc…

    …h/compare/sarama/golang.org/x/net-0.17.0
    
    build(deps): bump golang.org/x/net from 0.13.0 to 0.17.0 in /examples/bench/compare/sarama
    twmb authored Oct 12, 2023
    Copy the full SHA
    0249a71 View commit details
  3. Merge pull request #593 from twmb/dependabot/go_modules/examples/benc…

    …h/compare/segment/golang.org/x/net-0.17.0
    
    build(deps): bump golang.org/x/net from 0.10.0 to 0.17.0 in /examples/bench/compare/segment
    twmb authored Oct 12, 2023
    Copy the full SHA
    98f38a7 View commit details
  4. Copy the full SHA
    66974e8 View commit details
  5. Merge pull request #592 from r-vasquez/add-error-message

    kadm: include ErrorMessage in topic response
    twmb authored Oct 12, 2023
    Copy the full SHA
    ae169a1 View commit details

Commits on Oct 19, 2023

  1. KIP-890 proto

    twmb committed Oct 19, 2023
    Copy the full SHA
    11b0f91 View commit details

Commits on Oct 21, 2023

  1. GHA: use Kafka 3.5 for now

    Kafka 3.6 is a bit wonky (KAFKA-15653)
    twmb committed Oct 21, 2023
    Copy the full SHA
    d520096 View commit details
  2. kgo tests: add timestamps

    twmb committed Oct 21, 2023
    Copy the full SHA
    5cfb6a5 View commit details
  3. Copy the full SHA
    7e83e62 View commit details
  4. Merge pull request #599 from twmb/kip-890-proto

    KIP-890 proto
    twmb authored Oct 21, 2023
    Copy the full SHA
    6a961da View commit details
  5. go.{mod,sum}: pin to kmsg v1.7.0

    twmb committed Oct 21, 2023
    Copy the full SHA
    3273585 View commit details
  6. sticky balancer: try for better topic distribution among members

    The sticky balancer currently strives for ultimate stickiness, with no
    regard to trying to balance topic partitions among members equally. When
    adding a member, it is often the case that an entire topic's partitions
    shifts to the other member, while the first member has the other topic.
    
    By sorting by partition number before balancing, when the algorithm
    steals partitions from the end of an existing member to give to the new
    member, we ensure that we divvy up the topics equally to both members
    while still ensuring stickiness.
    
    This is likely not perfect but it goes a long way.
    twmb committed Oct 21, 2023
    Copy the full SHA
    1429d47 View commit details
  7. kgo: do not rotate the consumer session when pausing topics/partitions

    Issue #489 asked to stop returning data after a partition was paused --
    the original implementation of pausing kept returning any data that was
    in flight or already buffered, and simply stopped fetching new data.
    
    489 was dealt with by bumping the consumer session, which kills all
    in flight fetch requests. This was easy, but can cause a lot of
    connection churn if pausing and resuming a lot -- which is #585.
    
    The new implementation allows fetches to complete, but strips data
    from fetches based on what is paused at the moment the fetches are being
    returned to the client. This does make polling paused fetches very
    slightly slower (a map lookup per partition), but there's only so much
    that's possible. If a partition is paused, we drop the data and do not
    advance the internal offset. If a partition is not paused, we keep the
    data and return it -- same as before.
    twmb committed Oct 21, 2023
    Copy the full SHA
    0ecb52b View commit details
  8. kgo: avoid rare panic

    Scenario is:
    * Metadata update is actively running and has stopped an active session,
      returning all topicPartitions that were actively in list/epoch. These
      list/epoch loads are stored in reloadOffsets. Metadata grabs the
      session change mutex.
    * Client.Close is now called, stores client.consumer.kill(true). The
      Close is blocked briefly because Close calls assignPartitions which
      tries to lock to stop the session. Close is now paused -- however,
      importantly, the consumer.kill atomic is set to true.
    * Metadata tries to start a new session. startNewSession returns
      noConsumerSession because consumer.kill is now true.
    * Metadata calls reloadOffsets.loadWithSession, which panics once
      the session tries to access the client variable c.
    
    This panic can only happen if all of the following are true:
    * Client.Close is being called
    * Metadata is updating
    * Metadata response is moving a partition from one broker to another
    * The timing is perfect
    
    The fix to this is to check in listOrEpoch if the consumerSession is
    noConsumerSession. If so, return early.
    
    Note that doOnMetadataUpdate, incWorker, and decWorker already checked
    noConsumerSession. The other methods do not need to check:
    * mapLoadsToBrokers is called in listOrEpochs on a valid session
    * handleListOrEpochResults is the same
    * desireFetch is only called in source after noConsumerSession is
      checked, and manageFetchConcurrency is called only in desireFetch
    
    Closes redpanda-data/redpanda#13791.
    twmb committed Oct 21, 2023
    Copy the full SHA
    c013050 View commit details
  9. kadm: do not reuse ApiVersions in many concurrent requests

    The client calls SetVersion internally per request, so doing this
    concurrently leads to races.
    twmb committed Oct 21, 2023
    Copy the full SHA
    1955938 View commit details
  10. kgo group: switch memberID and generation to atomics

    Previously these required a mutex on write and read because of the rare
    (and mostly erroneous)  chance that a person is committing during a
    rebalance.
    
    Well, that makes the next transactional commit harder and is overkill --
    switching to atomics doesn't change any correctness bit but allows us to
    worry about deadlocks just a bit less.
    twmb committed Oct 21, 2023
    Copy the full SHA
    39e28c0 View commit details
  11. kgo: allow PreTxnCommitFnContext to modify empty offsets

    This builds the TxnOffsetCommitRequest early so that the hook can modify
    it. If the modified request has no topics to commit, then we abort as
    though uncommitted was empty.
    iamnoah authored and twmb committed Oct 21, 2023
    Copy the full SHA
    54a7418 View commit details

Commits on Oct 22, 2023

  1. kgo: reintroduce random broker iteration

    Random iteration was removed with 1e5c11d
    We can reintroduce random iteration easily enough, while still keeping
    the behavior of try-a-seed-occasionally.
    
    Closes #579.
    twmb committed Oct 22, 2023
    Copy the full SHA
    b2ccc2f View commit details
  2. kgo: add sharding for AddPartitionsToTxn for KIP-890

    This is more of a forward looking commit, in that kadm will eventually
    introduce support for this. We now basically handle v4 properly, even
    though KIP-890 dictates that v4 isn't meant to be sent by clients, it is
    indeed still necessary and not sending it results in INVALID_TXN_STATE
    errors.
    
    Also properly adds the WriteTxnMarkers sharder to the switch, though
    nothing really should send that request so that doesn't really fix any
    bugs.
    
    Kafka 3.6 has an NPE handling produce requests frequently, see
    KAFKA-15653, so tests may fail against 3.6 occasionally.
    twmb committed Oct 22, 2023
    Copy the full SHA
    fe5a660 View commit details
  3. pkg/kversion: detect 3.6

    This does not yet add V3_6_0 since that will require a minor.
    twmb committed Oct 22, 2023
    Copy the full SHA
    2a3b6bd View commit details
  4. Merge pull request #600 from twmb/kip-890

    KIP-890 (part 1)
    twmb authored Oct 22, 2023
    Copy the full SHA
    a905bf0 View commit details
  5. Merge pull request #601 from twmb/589

    kgo: do not rotate the consumer session when pausing topics/partitions
    twmb authored Oct 22, 2023
    Copy the full SHA
    3d115f1 View commit details
  6. Merge pull request #602 from twmb/sticky-move

    sticky balancer: try for better topic distribution among members
    twmb authored Oct 22, 2023
    Copy the full SHA
    e94230f View commit details
  7. Merge pull request #603 from twmb/rp-13791

    kgo: avoid rare panic
    twmb authored Oct 22, 2023
    Copy the full SHA
    067ec8e View commit details
  8. Merge pull request #604 from twmb/597

    kadm: do not reuse ApiVersions in many concurrent requests
    twmb authored Oct 22, 2023
    Copy the full SHA
    d156322 View commit details
  9. Merge pull request #605 from twmb/580

    kgo: allow PreTxnCommitFnContext to modify empty offsets
    twmb authored Oct 22, 2023
    Copy the full SHA
    ec02fac View commit details
  10. Merge pull request #606 from twmb/579

    kgo: reintroduce random broker iteration
    twmb authored Oct 22, 2023
    Copy the full SHA
    913b4b0 View commit details
  11. CHANGELOG: note v1.15.1

    twmb committed Oct 22, 2023
    Copy the full SHA
    fff4001 View commit details
  12. Merge pull request #607 from twmb/v1.15.1-changelog

    CHANGELOG: note v1.15.1
    twmb authored Oct 22, 2023
    Copy the full SHA
    019f134 View commit details
2 changes: 1 addition & 1 deletion .github/workflows/lint-and-test.yml
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ jobs:
container: golang:1.20.3
services:
kafka:
image: bitnami/kafka:latest
image: bitnami/kafka:3.5
ports:
- 9092:9092
env:
7 changes: 3 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# We may as well allow multiple golangci-lint invocations at once.
run:
allow-parallel-runners: true
go: "1.19"
go: "1.21"

# golangci-lint by default ignores some staticcheck and vet raised issues that
# are actually important to catch. The following ensures that we do not ignore
@@ -68,15 +68,14 @@ linters-settings:
#
# https://github.com/mvdan/gofumpt/issues/137
gofumpt:
lang-version: "1.19"
lang-version: "1.21"
extra-rules: true

gosec:
excludes:
- G104 # unhandled errors, we exclude for the same reason we do not use errcheck
- G404 # we want math/rand


# Gocritic is a meta linter that has very good lints, and most of the
# experimental ones are very good too. We opt into everything, which helps
# us when we upgrade golangci-lint, and we specifically opt out of a batch.
@@ -164,5 +163,5 @@ linters-settings:
# contexts for beneficial reasons, and we disable the SSLv3 deprecation
# warning because this is solely for a debug log.
staticcheck:
go: "1.19"
go: "1.21"
checks: ["all", "-SA1012", "-SA1019"]
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,34 @@
v1.15.1
===

This patch release contains a bunch of internal improvements to kgo and
includes a bugfix for a very hard to encounter logic race. Each improvement
is a bit focused on a specific use case, so I recommend reading any relevant-to-you
commit message below.

As well, the kversion package now detects Kafka 3.6, and the kgo package now
handles AddPartitionsToTxn v4 (however, you will probably not be issuing this
request).

Lastly, this release is paired with a minor kadm release, which adds the
ErrMessage field CreateTopicsResponse and DeleteTopicsResponse, and,
importantly, fixes a data race in the ApiVersions request.

#### franz-go

- [`2a3b6bd`](https://github.com/twmb/franz-go/commit/2a3b6bd) **improvement** kversion: detect 3.6
- [`fe5a660`](https://github.com/twmb/franz-go/commit/fe5a660) **improvement** kgo: add sharding for AddPartitionsToTxn for KIP-890
- [`b2ccc2f`](https://github.com/twmb/franz-go/commit/b2ccc2f) **improvement** kgo: reintroduce random broker iteration
- [`54a7418`](https://github.com/twmb/franz-go/commit/54a7418) **improvement** kgo: allow PreTxnCommitFnContext to modify empty offsets
- [`c013050`](https://github.com/twmb/franz-go/commit/c013050) **bugfix** kgo: avoid rare panic
- [`0ecb52b`](https://github.com/twmb/franz-go/commit/0ecb52b) **improvement** kgo: do not rotate the consumer session when pausing topics/partitions
- [`1429d47`](https://github.com/twmb/franz-go/commit/1429d47) **improvement** sticky balancer: try for better topic distribution among members

#### kadm

- [`1955938`](https://github.com/twmb/franz-go/commit/1955938) **bugfix** kadm: do not reuse ApiVersions in many concurrent requests
- [`66974e8`](https://github.com/twmb/franz-go/commit/66974e8) **feature** kadm: include ErrMessage in topic response

v1.15.0
===

4 changes: 2 additions & 2 deletions examples/bench/compare/sarama/go.mod
Original file line number Diff line number Diff line change
@@ -21,6 +21,6 @@ require (
github.com/klauspost/compress v1.16.7 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/net v0.13.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
)
8 changes: 4 additions & 4 deletions examples/bench/compare/sarama/go.sum
Original file line number Diff line number Diff line change
@@ -55,17 +55,17 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.13.0 h1:Nvo8UFsZ8X3BhAC9699Z1j7XQ3rsZnUUm7jfBEk1ueY=
golang.org/x/net v0.13.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
4 changes: 2 additions & 2 deletions examples/bench/compare/segment/go.mod
Original file line number Diff line number Diff line change
@@ -10,6 +10,6 @@ require (
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/text v0.13.0 // indirect
)
8 changes: 4 additions & 4 deletions examples/bench/compare/segment/go.sum
Original file line number Diff line number Diff line change
@@ -30,8 +30,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -48,8 +48,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.11.0 h1:LAntKIrcmeSKERyiOh0XMV39LXS8IE9UL2yP7+f5ij4=
golang.org/x/text v0.11.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
44 changes: 38 additions & 6 deletions generate/definitions/24_add_partitions_to_txn
Original file line number Diff line number Diff line change
@@ -2,28 +2,60 @@
// partitions in the request. Before producing any records to a partition in
// the transaction, that partition must have been added to the transaction with
// this request.
AddPartitionsToTxnRequest => key 24, max version 3, flexible v3+, txn coordinator
//
// Versions 3 and below are exclusively used by clients and versions 4 and
// above are used by brokers.
//
// Version 4 adds VerifyOnly field to check if partitions are already in
// transaction and adds support to batch multiple transactions.
AddPartitionsToTxnRequest => key 24, max version 4, flexible v3+, txn coordinator
// TransactionalID is the transactional ID to use for this request.
TransactionalID: string
TransactionalID: string // v0-v3
// ProducerID is the producer ID of the client for this transactional ID
// as received from InitProducerID.
ProducerID: int64
ProducerID: int64 // v0-v3
// ProducerEpoch is the producer epoch of the client for this transactional ID
// as received from InitProducerID.
ProducerEpoch: int16
ProducerEpoch: int16 // v0-v3
// Topics are topics to add as part of the producer side of a transaction.
Topics: [=>]
Topics: [=>] // v0-v3
// Topic is a topic name.
Topic: string
// Partitions are partitions within a topic to add as part of the producer
// side of a transaction.
Partitions: [int32]
// The list of transactions to add partitions to, for v4+, for brokers only.
// The fields in this are batch broker requests that duplicate the above fields
// and thus are undocumented (except VerifyOnly, which is new).
Transactions: [=>] // v4+
TransactionalID: string
ProducerID: int64
ProducerEpoch: int16
// VerifyOnly signifies if we want to check if the partition is in the
// transaction rather than add it.
VerifyOnly: bool
Topics: [=>]
Topic: string
Partitions: [int32]

// AddPartitionsToTxnResponse is a response to an AddPartitionsToTxnRequest.
AddPartitionsToTxnResponse =>
ThrottleMillis(1)
// The response top level error code.
ErrorCode: int16 // v4+
// Results categorized by transactional ID, v4+ only, for brokers only.
// The fields duplicate v3 and below fields (except TransactionalID) and
// are left undocumented.
Transactions: [=>] // v4+
// The transactional id corresponding to the transaction.
TransactionalID: string
Topics: [=>]
Topic: string
Partitions: [=>]
Partition: int32
ErrorCode: int16
// Topics are responses to topics in the request.
Topics: [=>]
Topics: [=>] // v0-v3
// Topic is a topic being responded to.
Topic: string
// Partitions are responses to partitions in the request.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ go 1.18
require (
github.com/klauspost/compress v1.16.7
github.com/pierrec/lz4/v4 v4.1.18
github.com/twmb/franz-go/pkg/kmsg v1.6.1
github.com/twmb/franz-go/pkg/kmsg v1.7.0
golang.org/x/crypto v0.11.0
)

4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGC
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM=
github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
2 changes: 1 addition & 1 deletion pkg/kadm/acls.go
Original file line number Diff line number Diff line change
@@ -924,7 +924,7 @@ func (cl *Client) DescribeACLs(ctx context.Context, b *ACLBuilder) (DescribeACLs
)
defer cancel()
for i := range descs {
req := descs[i]
req := descs[i] // each req is unique per loop, we are not reusing req, this is safe
myIdx := i
wg.Add(1)
go func() {
6 changes: 3 additions & 3 deletions pkg/kadm/go.mod
Original file line number Diff line number Diff line change
@@ -3,12 +3,12 @@ module github.com/twmb/franz-go/pkg/kadm
go 1.19

require (
github.com/twmb/franz-go v1.14.3
github.com/twmb/franz-go v1.15.0
github.com/twmb/franz-go/pkg/kmsg v1.6.1
golang.org/x/crypto v0.11.0
golang.org/x/crypto v0.13.0
)

require (
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
)
12 changes: 6 additions & 6 deletions pkg/kadm/go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/twmb/franz-go v1.14.3 h1:cq8rxAnVYU1uF3SRVn8eEaUf+AaXKWlB0Cl3Ca7JSa4=
github.com/twmb/franz-go v1.14.3/go.mod h1:nMAvTC2kHtK+ceaSHeHm4dlxC78389M/1DjpOswEgu4=
github.com/twmb/franz-go v1.15.0 h1:bw5n1COKJzWpkCXG/kMtHrurcS9HSWV6e3If5CUdc+M=
github.com/twmb/franz-go v1.15.0/go.mod h1:nMAvTC2kHtK+ceaSHeHm4dlxC78389M/1DjpOswEgu4=
github.com/twmb/franz-go/pkg/kmsg v1.6.1 h1:tm6hXPv5antMHLasTfKv9R+X03AjHSkSkXhQo2c5ALM=
github.com/twmb/franz-go/pkg/kmsg v1.6.1/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
12 changes: 7 additions & 5 deletions pkg/kadm/groups.go
Original file line number Diff line number Diff line change
@@ -1618,9 +1618,9 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa
}

lt[p] = GroupMemberLag{
Commit: pcommit.Offset,
Topic: t,
Partition: p,
Commit: pcommit.Offset,
End: pend,
Lag: lag,
Err: perr,
@@ -1650,10 +1650,12 @@ func calculateEmptyLag(commit OffsetResponses, endOffsets ListedOffsets) GroupLa
lag = pend.Offset
}
lt[p] = GroupMemberLag{
Commit: pcommit,
End: pend,
Lag: lag,
Err: perr,
Topic: t,
Partition: p,
Commit: pcommit,
End: pend,
Lag: lag,
Err: perr,
}
}
}
7 changes: 3 additions & 4 deletions pkg/kadm/misc.go
Original file line number Diff line number Diff line change
@@ -244,10 +244,6 @@ func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) {
return nil, err
}

req := kmsg.NewPtrApiVersionsRequest()
req.ClientSoftwareName = "kadm"
req.ClientSoftwareVersion = softwareVersion()

var mu sync.Mutex
var wg sync.WaitGroup
vs := make(BrokersApiVersions, len(m.Brokers))
@@ -256,6 +252,9 @@ func (cl *Client) ApiVersions(ctx context.Context) (BrokersApiVersions, error) {
wg.Add(1)
go func() {
defer wg.Done()
req := kmsg.NewPtrApiVersionsRequest()
req.ClientSoftwareName = "kadm"
req.ClientSoftwareVersion = softwareVersion()
v := BrokerApiVersions{NodeID: n, keyVersions: make(map[int16]minmax)}
v.raw, v.Err = req.RequestWith(ctx, cl.cl.Broker(int(n)))

24 changes: 14 additions & 10 deletions pkg/kadm/topics.go
Original file line number Diff line number Diff line change
@@ -231,9 +231,10 @@ func (cl *Client) createTopics(ctx context.Context, dry bool, p int32, rf int16,

// DeleteTopicResponse contains the response for an individual deleted topic.
type DeleteTopicResponse struct {
Topic string // Topic is the topic that was deleted, if not using topic IDs.
ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+ and using topic IDs.
Err error // Err is any error preventing this topic from being deleted.
Topic string // Topic is the topic that was deleted, if not using topic IDs.
ID TopicID // ID is the topic ID for this topic, if talking to Kafka v2.8+ and using topic IDs.
Err error // Err is any error preventing this topic from being deleted.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// DeleteTopicResponses contains per-topic responses for deleted topics.
@@ -311,9 +312,10 @@ func (cl *Client) DeleteTopics(ctx context.Context, topics ...string) (DeleteTop
topic = *t.Topic
}
rs[topic] = DeleteTopicResponse{
Topic: topic,
ID: t.TopicID,
Err: kerr.ErrorForCode(t.ErrorCode),
Topic: topic,
ID: t.TopicID,
Err: kerr.ErrorForCode(t.ErrorCode),
ErrMessage: unptrStr(t.ErrorMessage),
}
}
return rs, nil
@@ -457,8 +459,9 @@ func (cl *Client) DeleteRecords(ctx context.Context, os Offsets) (DeleteRecordsR
// CreatePartitionsResponse contains the response for an individual topic from
// a create partitions request.
type CreatePartitionsResponse struct {
Topic string // Topic is the topic this response is for.
Err error // Err is non-nil if partitions were unable to be added to this topic.
Topic string // Topic is the topic this response is for.
Err error // Err is non-nil if partitions were unable to be added to this topic.
ErrMessage string // ErrMessage a potential extra message describing any error.
}

// CreatePartitionsResponses contains per-topic responses for a create
@@ -584,8 +587,9 @@ func (cl *Client) createPartitions(ctx context.Context, dry bool, add, set int,
rs := make(CreatePartitionsResponses)
for _, t := range resp.Topics {
rs[t.Topic] = CreatePartitionsResponse{
Topic: t.Topic,
Err: kerr.ErrorForCode(t.ErrorCode),
Topic: t.Topic,
Err: kerr.ErrorForCode(t.ErrorCode),
ErrMessage: unptrStr(t.ErrorMessage),
}
}
return rs, nil
5 changes: 4 additions & 1 deletion pkg/kgo/broker.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net"
"os"
"strconv"
@@ -948,7 +949,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
if latencyMillis > minPessimismMillis {
minPessimismMillis = latencyMillis
}
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
var random float64
cxn.b.cl.rng(func(r *rand.Rand) { random = r.Float64() })
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*random) // 95 to 98% of lifetime (pessimism 2% to 5%)

// Our minimum lifetime is always 1s (or latency, if larger).
// When our max pessimism becomes more than min pessimism,
Loading