Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: segmentio/kafka-go
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v0.4.42
Choose a base ref
...
head repository: segmentio/kafka-go
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v0.4.43
Choose a head ref
  • 6 commits
  • 26 files changed
  • 4 contributors

Commits on Jul 10, 2023

  1. run tests against unsafe tag as well (#1159)

    * run tests against unsafe tag as well
    
    * differentiate test step name
    
    * use random transactional id
    rhansen2 authored Jul 10, 2023
    Copy the full SHA
    c293a8c View commit details

Commits on Jul 18, 2023

  1. offsetfetch request topics are now nullable (#1162)

    * offsetfetch request topics are now nullable
    
    * new unit tests for offsetfetch was added
    amortezaei authored Jul 18, 2023

    Verified

    This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
    Copy the full SHA
    8ceaf94 View commit details

Commits on Jul 21, 2023

  1. Adds attempt number to log message when failing to write messages. Th…

    …is is done to make it easier to write better error logger implementations. For instance, when using IAM auth for MSK, the first write will fail after credentials rotate. The connection is re-negotiated and the retry is successful, but the logs are filled with errors. This lets us ignore the error of the first attempt, but log subsequent ones. (#1165)
    jcarter3 authored Jul 21, 2023

    Verified

    This commit was signed with the committer’s verified signature.
    SimenB Simen Bekkhus
    Copy the full SHA
    861e102 View commit details

Commits on Jul 28, 2023

  1. support userscramcredentials apis (#1168)

    * userscramcredentials protocols
    
    * alteruserscramcredentials working
    
    * describeuserscramcredentials working
    
    * gofmt -s -w alteruserscramcredentials_test.go
    
    * fix typo
    
    * add tests for deletion
    
    * gofmt
    
    * improve test
    
    * separate alteruserscramcredentials_test and describeuserscramcredentials_test
    
    * add protocol tests
    
    * remove unused v1 constant
    
    * change iterations from int32 to int
    
    * keep errors with results
    petedannemann authored Jul 28, 2023
    Copy the full SHA
    6193fa9 View commit details
  2. Support describeacls (#1166)

    * Support describeacls
    
    * gofmt -s -w createacl_test.go
    
    * make test diff smaller and fix protocl api key
    
    * fix another protocol api key
    
    * improve test name
    
    * protocol fixes
    
    * add missing patterntype
    
    * fix createacls protocol
    
    * fix tags and add tagged fields back in
    
    * bump createacls version to v3
    
    * wip
    
    * just one filter, not a list of filters
    
    * add missing patterntype in test
    
    * fix patterntype location
    
    * add prototests
    
    * createacl_test.go -> createacls_test.go
    
    * seperate createacls_test and describeacls_test
    
    * fix describeaclstest
    
    * add comment for ResourcePatternTypeFilter
    petedannemann authored Jul 28, 2023
    Copy the full SHA
    f4ca0b4 View commit details

Commits on Sep 13, 2023

  1. Deleteacls support (#1174)

    * support deleteacls
    
    * add deleteacls_test
    
    * add protocol test
    
    * test that acl was deleted
    
    * trigger build
    petedannemann authored Sep 13, 2023

    Verified

    This commit was signed with the committer’s verified signature.
    SimenB Simen Bekkhus
    Copy the full SHA
    9ecb9d2 View commit details
3 changes: 3 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -51,6 +51,9 @@ jobs:
- run:
name: Test kafka-go
command: go test -race -cover ./...
- run:
name: Test kafka-go unsafe
command: go test -tags=unsafe -race -cover ./...
- run:
name: Test kafka-go/sasl/aws_msk_iam
working_directory: ./sasl/aws_msk_iam
107 changes: 107 additions & 0 deletions alteruserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alteruserscramcredentials"
)

// AlterUserScramCredentialsRequest represents a request sent to a kafka broker to
// alter user scram credentials.
type AlterUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of credentials to delete.
Deletions []UserScramCredentialsDeletion

// List of credentials to upsert.
Upsertions []UserScramCredentialsUpsertion
}

type ScramMechanism int8

const (
ScramMechanismUnknown ScramMechanism = iota // 0
ScramMechanismSha256 // 1
ScramMechanismSha512 // 2
)

type UserScramCredentialsDeletion struct {
Name string
Mechanism ScramMechanism
}

type UserScramCredentialsUpsertion struct {
Name string
Mechanism ScramMechanism
Iterations int
Salt []byte
SaltedPassword []byte
}

// AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user
// credentials request.
type AlterUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of altered user scram credentials.
Results []AlterUserScramCredentialsResponseUser
}

type AlterUserScramCredentialsResponseUser struct {
User string
Error error
}

// AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions))
upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions))

for deletionIdx, deletion := range req.Deletions {
deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{
Name: deletion.Name,
Mechanism: int8(deletion.Mechanism),
}
}

for upsertionIdx, upsertion := range req.Upsertions {
upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{
Name: upsertion.Name,
Mechanism: int8(upsertion.Mechanism),
Iterations: int32(upsertion.Iterations),
Salt: upsertion.Salt,
SaltedPassword: upsertion.SaltedPassword,
}
}

m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{
Deletions: deletions,
Upsertions: upsertions,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err)
}

res := m.(*alteruserscramcredentials.Response)
responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results))

for responseIdx, responseResult := range res.Results {
responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{
User: responseResult.User,
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
}
}
ret := &AlterUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Results: responseEntries,
}

return ret, nil
}
73 changes: 73 additions & 0 deletions alteruserscramcredentials_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestAlterUserScramCredentials(t *testing.T) {
// https://issues.apache.org/jira/browse/KAFKA-10259
if !ktesting.KafkaIsAtLeast("2.7.0") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

name := makeTopic()

createRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Upsertions: []UserScramCredentialsUpsertion{
{
Name: name,
Mechanism: ScramMechanismSha512,
Iterations: 15000,
Salt: []byte("my-salt"),
SaltedPassword: []byte("my-salted-password"),
},
},
})

if err != nil {
t.Fatal(err)
}

if len(createRes.Results) != 1 {
t.Fatalf("expected 1 createResult; got %d", len(createRes.Results))
}

if createRes.Results[0].User != name {
t.Fatalf("expected createResult with user: %s, got %s", name, createRes.Results[0].User)
}

if createRes.Results[0].Error != nil {
t.Fatalf("didn't expect an error in createResult, got %v", createRes.Results[0].Error)
}

deleteRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Deletions: []UserScramCredentialsDeletion{
{
Name: name,
Mechanism: ScramMechanismSha512,
},
},
})

if err != nil {
t.Fatal(err)
}

if len(deleteRes.Results) != 1 {
t.Fatalf("expected 1 deleteResult; got %d", len(deleteRes.Results))
}

if deleteRes.Results[0].User != name {
t.Fatalf("expected deleteResult with user: %s, got %s", name, deleteRes.Results[0].User)
}

if deleteRes.Results[0].Error != nil {
t.Fatalf("didn't expect an error in deleteResult, got %v", deleteRes.Results[0].Error)
}
}
11 changes: 7 additions & 4 deletions createacl_test.go → createacls_test.go
Original file line number Diff line number Diff line change
@@ -15,15 +15,18 @@ func TestClientCreateACLs(t *testing.T) {
client, shutdown := newLocalClient()
defer shutdown()

res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
topic := makeTopic()
group := makeGroupID()

createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{
ACLs: []ACLEntry{
{
Principal: "User:alice",
PermissionType: ACLPermissionTypeAllow,
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeTopic,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-topic-for-alice",
ResourceName: topic,
Host: "*",
},
{
@@ -32,7 +35,7 @@ func TestClientCreateACLs(t *testing.T) {
Operation: ACLOperationTypeRead,
ResourceType: ResourceTypeGroup,
ResourcePatternType: PatternTypeLiteral,
ResourceName: "fake-group-for-bob",
ResourceName: group,
Host: "*",
},
},
@@ -41,7 +44,7 @@ func TestClientCreateACLs(t *testing.T) {
t.Fatal(err)
}

for _, err := range res.Errors {
for _, err := range createRes.Errors {
if err != nil {
t.Error(err)
}
114 changes: 114 additions & 0 deletions deleteacls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/deleteacls"
)

// DeleteACLsRequest represents a request sent to a kafka broker to delete
// ACLs.
type DeleteACLsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of ACL filters to use for deletion.
Filters []DeleteACLsFilter
}

type DeleteACLsFilter struct {
ResourceTypeFilter ResourceType
ResourceNameFilter string
ResourcePatternTypeFilter PatternType
PrincipalFilter string
HostFilter string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// DeleteACLsResponse represents a response from a kafka broker to an ACL
// deletion request.
type DeleteACLsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of the results from the deletion request.
Results []DeleteACLsResult
}

type DeleteACLsResult struct {
Error error
MatchingACLs []DeleteACLsMatchingACLs
}

type DeleteACLsMatchingACLs struct {
Error error
ResourceType ResourceType
ResourceName string
ResourcePatternType PatternType
Principal string
Host string
Operation ACLOperationType
PermissionType ACLPermissionType
}

// DeleteACLs sends ACLs deletion request to a kafka broker and returns the
// response.
func (c *Client) DeleteACLs(ctx context.Context, req *DeleteACLsRequest) (*DeleteACLsResponse, error) {
filters := make([]deleteacls.RequestFilter, 0, len(req.Filters))

for _, filter := range req.Filters {
filters = append(filters, deleteacls.RequestFilter{
ResourceTypeFilter: int8(filter.ResourceTypeFilter),
ResourceNameFilter: filter.ResourceNameFilter,
ResourcePatternTypeFilter: int8(filter.ResourcePatternTypeFilter),
PrincipalFilter: filter.PrincipalFilter,
HostFilter: filter.HostFilter,
Operation: int8(filter.Operation),
PermissionType: int8(filter.PermissionType),
})
}

m, err := c.roundTrip(ctx, req.Addr, &deleteacls.Request{
Filters: filters,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DeleteACLs: %w", err)
}

res := m.(*deleteacls.Response)

results := make([]DeleteACLsResult, 0, len(res.FilterResults))

for _, result := range res.FilterResults {
matchingACLs := make([]DeleteACLsMatchingACLs, 0, len(result.MatchingACLs))

for _, matchingACL := range result.MatchingACLs {
matchingACLs = append(matchingACLs, DeleteACLsMatchingACLs{
Error: makeError(matchingACL.ErrorCode, matchingACL.ErrorMessage),
ResourceType: ResourceType(matchingACL.ResourceType),
ResourceName: matchingACL.ResourceName,
ResourcePatternType: PatternType(matchingACL.ResourcePatternType),
Principal: matchingACL.Principal,
Host: matchingACL.Host,
Operation: ACLOperationType(matchingACL.Operation),
PermissionType: ACLPermissionType(matchingACL.PermissionType),
})
}

results = append(results, DeleteACLsResult{
Error: makeError(result.ErrorCode, result.ErrorMessage),
MatchingACLs: matchingACLs,
})
}

ret := &DeleteACLsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Results: results,
}

return ret, nil
}
Loading