Skip to content

Commit

Permalink
Merge pull request #988 from yanmxa/br_kafka_confluent
Browse files Browse the repository at this point in the history
  • Loading branch information
embano1 committed Mar 29, 2024
2 parents e118aba + 5cd87ea commit e6a74ef
Show file tree
Hide file tree
Showing 20 changed files with 1,499 additions and 5 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ jobs:
- 9091:9091
- 9092:9092

kafka_confluent:
image: confluentinc/confluent-local:7.6.0
ports:
- "9192:9192"
env:
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:29192,PLAINTEXT_HOST://localhost:9192'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@localhost:29193'
KAFKA_LISTENERS: 'PLAINTEXT://localhost:29192,CONTROLLER://localhost:29193,PLAINTEXT_HOST://0.0.0.0:9192'

natss:
image: nats-streaming:0.22.1
ports:
Expand Down
3 changes: 2 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ err := json.Unmarshal(bytes, &event)
| AVRO Event Format | :x: | :x: |
| [HTTP Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/http) | :heavy_check_mark: | :heavy_check_mark: |
| [JSON Event Format](event_data_structure.md#marshalunmarshal-event-to-json) | :heavy_check_mark: | :heavy_check_mark: |
| [Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
| [Sarama Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka) | :heavy_check_mark: | :heavy_check_mark: |
| [Confluent Kafka Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/kafka_confluent) | :heavy_check_mark: | :heavy_check_mark: |
| MQTT Protocol Binding | :x: | :x: |
| [NATS Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/nats) | :heavy_check_mark: | :heavy_check_mark: |
| [STAN Protocol Binding](https://github.com/cloudevents/sdk-go/tree/main/samples/stan) | :heavy_check_mark: | :heavy_check_mark: |
Expand Down
25 changes: 25 additions & 0 deletions protocol/kafka_confluent/v2/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
module github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2

go 1.18

replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
68 changes: 68 additions & 0 deletions protocol/kafka_confluent/v2/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
156 changes: 156 additions & 0 deletions protocol/kafka_confluent/v2/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2023 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package kafka_confluent

import (
"bytes"
"context"
"strconv"
"strings"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

const (
prefix = "ce-"
contentTypeKey = "content-type"
)

const (
KafkaOffsetKey = "kafkaoffset"
KafkaPartitionKey = "kafkapartition"
KafkaTopicKey = "kafkatopic"
KafkaMessageKey = "kafkamessagekey"
)

var specs = spec.WithPrefix(prefix)

// Message represents a Kafka message.
// This message *can* be read several times safely
type Message struct {
internal *kafka.Message
properties map[string][]byte
format format.Format
version spec.Version
}

// Check if Message implements binding.Message
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)

// NewMessage returns a binding.Message that holds the provided kafka.Message.
// The returned binding.Message *can* be read several times safely
// This function *doesn't* guarantee that the returned binding.Message is always a kafka_sarama.Message instance
func NewMessage(msg *kafka.Message) *Message {
if msg == nil {
panic("the kafka.Message shouldn't be nil")
}
if msg.TopicPartition.Topic == nil {
panic("the topic of kafka.Message shouldn't be nil")
}
if msg.TopicPartition.Partition < 0 || msg.TopicPartition.Offset < 0 {
panic("the partition or offset of the kafka.Message must be non-negative")
}

var contentType, contentVersion string
properties := make(map[string][]byte, len(msg.Headers)+3)
for _, header := range msg.Headers {
k := strings.ToLower(string(header.Key))
if k == strings.ToLower(contentTypeKey) {
contentType = string(header.Value)
}
if k == specs.PrefixedSpecVersionName() {
contentVersion = string(header.Value)
}
properties[k] = header.Value
}

// add the kafka message key, topic, partition and partition key to the properties
properties[prefix+KafkaOffsetKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Offset), 10))
properties[prefix+KafkaPartitionKey] = []byte(strconv.FormatInt(int64(msg.TopicPartition.Partition), 10))
properties[prefix+KafkaTopicKey] = []byte(*msg.TopicPartition.Topic)
if msg.Key != nil {
properties[prefix+KafkaMessageKey] = msg.Key
}

message := &Message{
internal: msg,
properties: properties,
}
if ft := format.Lookup(contentType); ft != nil {
message.format = ft
} else if v := specs.Version(contentVersion); v != nil {
message.version = v
}

return message
}

func (m *Message) ReadEncoding() binding.Encoding {
if m.version != nil {
return binding.EncodingBinary
}
if m.format != nil {
return binding.EncodingStructured
}
return binding.EncodingUnknown
}

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.format != nil {
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.internal.Value))
}
return binding.ErrNotStructured
}

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
if m.version == nil {
return binding.ErrNotBinary
}

var err error
for k, v := range m.properties {
if strings.HasPrefix(k, prefix) {
attr := m.version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, string(v))
} else {
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), string(v))
}
} else if k == strings.ToLower(contentTypeKey) {
err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(v))
}
if err != nil {
return err
}
}

if m.internal.Value != nil {
err = encoder.SetData(bytes.NewBuffer(m.internal.Value))
}
return err
}

func (m *Message) Finish(error) error {
return nil
}

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{}) {
attr := m.version.AttributeFromKind(k)
if attr == nil {
return nil, nil
}
return attr, m.properties[attr.PrefixedName()]
}

func (m *Message) GetExtension(name string) interface{} {
return m.properties[prefix+name]
}

0 comments on commit e6a74ef

Please sign in to comment.