Avro Schema registry kafka pubsub implementation #3292

Merged · 6 commits · Jan 10, 2024
Changes from 5 commits
3 changes: 1 addition & 2 deletions bindings/kafka/kafka.go
@@ -21,11 +21,10 @@ import (
"sync"
"sync/atomic"

"github.com/dapr/kit/logger"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/common/component/kafka"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)

const (
29 changes: 28 additions & 1 deletion bindings/kafka/metadata.yaml
@@ -268,4 +268,31 @@ metadata:
This is potentially insecure and not recommended for use in production.
example: "true"
default: "false"
type: bool
type: bool
- name: schemaRegistryURL
type: string
description: |
The Schema Registry URL.
example: '"http://localhost:8081"'
- name: schemaRegistryAPIKey
type: string
description: |
The Schema Registry credentials API Key.
example: '"XYAXXAZ"'
- name: schemaRegistryAPISecret
type: string
description: |
The Schema Registry credentials API Secret.
example: '"ABCDEFGMEADFF"'
- name: schemaCachingEnabled
type: bool
description: |
Enables caching for schemas.
example: '"true"'
default: '"true"'
- name: latestSchemaCacheTTL
type: duration
description: |
The TTL for the latest schema cache, used when publishing a message with the latest available schema.
example: '"5min"'
default: '"5min"'
17 changes: 15 additions & 2 deletions common/component/kafka/consumer.go
@@ -142,9 +142,17 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
for i, message := range messages {
if message != nil {
metadata := GetEventMetadata(message)
handlerConfig, err := consumer.k.GetTopicHandlerConfig(message.Topic)
if err != nil {
return err
}
messageVal, err := consumer.k.DeserializeValue(message, handlerConfig)
if err != nil {
return err
}
childMessage := KafkaBulkMessageEntry{
EntryId: strconv.Itoa(i),
Event: message.Value,
Event: messageVal,
Metadata: metadata,
}
messageValues[i] = childMessage
@@ -184,9 +192,14 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
if !handlerConfig.IsBulkSubscribe && handlerConfig.Handler == nil {
return errors.New("invalid handler config for subscribe call")
}

messageVal, err := consumer.k.DeserializeValue(message, handlerConfig)
if err != nil {
return err
}
event := NewEvent{
Topic: message.Topic,
Data: message.Value,
Data: messageVal,
}
event.Metadata = GetEventMetadata(message)

180 changes: 180 additions & 0 deletions common/component/kafka/kafka.go
@@ -15,11 +15,16 @@ package kafka

import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/linkedin/goavro/v2"
"github.com/riferrei/srclient"

"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
@@ -42,6 +47,13 @@ type Kafka struct {
subscribeTopics TopicHandlerConfig
subscribeLock sync.Mutex

// schema registry settings
srClient srclient.ISchemaRegistryClient
schemaCachingEnabled bool
latestSchemaCache map[string]SchemaCacheEntry
latestSchemaCacheTTL time.Duration
latestSchemaCacheLock sync.RWMutex

// used for background logic that cannot use the context passed to the Init function
internalContext context.Context
internalContextCancel func()
@@ -55,6 +67,39 @@ type Kafka struct {
consumeRetryInterval time.Duration
}

type SchemaType int

const (
None SchemaType = iota
Avro
)

type SchemaCacheEntry struct {
schema *srclient.Schema
codec *goavro.Codec
expirationTime time.Time
}

func GetValueSchemaType(metadata map[string]string) (SchemaType, error) {
schemaTypeStr, ok := metadata[valueSchemaType]
if ok {
v, err := parseSchemaType(schemaTypeStr)
return v, err
}
return None, nil
}

func parseSchemaType(sVal string) (SchemaType, error) {
switch strings.ToLower(sVal) {
case "avro":
return Avro, nil
case "none":
return None, nil
default:
return None, fmt.Errorf("error parsing schema type. '%s' is not a supported value", sVal)
}
}
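// Illustrative usage (not part of this diff; assumes the valueSchemaType
// constant holds the metadata key "valueSchemaType"):
//
//	st, _ := GetValueSchemaType(map[string]string{"valueSchemaType": "avro"}) // Avro
//	st, _ = GetValueSchemaType(nil)                                           // None (no key set)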

func NewKafka(logger logger.Logger) *Kafka {
return &Kafka{
logger: logger,
@@ -146,6 +191,18 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
k.consumeRetryInterval = meta.ConsumeRetryInterval

if meta.SchemaRegistryURL != "" {
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
// Empty password is a possibility
if meta.SchemaRegistryAPIKey != "" {
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
}
k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
if meta.SchemaCachingEnabled {
k.latestSchemaCache = make(map[string]SchemaCacheEntry)
k.latestSchemaCacheTTL = meta.LatestSchemaCacheTTL
}
}
k.logger.Debug("Kafka message bus initialization complete")

return nil
@@ -166,6 +223,128 @@ func (k *Kafka) Close() (err error) {
return err
}

func getSchemaSubject(topic string) string {
// For now, assume the subject is named after the topic (e.g. `my-topic-value`)
return fmt.Sprintf("%s-value", topic)
}

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
if config.ValueSchemaType == Avro {
srClient, err := k.getSchemaRegistryClient()
if err != nil {
return nil, err
}

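// Confluent wire format: one magic byte (0x0), a 4-byte big-endian schema ID,
// then the Avro-encoded payload; hence the 5-byte minimum checked below.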
if len(message.Value) < 5 {
return nil, fmt.Errorf("value is too short")
}
schemaID := binary.BigEndian.Uint32(message.Value[1:5])
schema, err := srClient.GetSchema(int(schemaID))
if err != nil {
return nil, err
}
// The data coming through is standard JSON, which the codec created by srclient doesn't support yet.
// Use this specific codec instead.
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
if err != nil {
return nil, err
}
native, _, err := codec.NativeFromBinary(message.Value[5:])
if err != nil {
return nil, err
}
value, err := codec.TextualFromNative(nil, native)
if err != nil {
return nil, err
}

return value, nil
}
return message.Value, nil
}

func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, error) {
srClient, err := k.getSchemaRegistryClient()
if err != nil {
return nil, nil, err
}

subject := getSchemaSubject(topic)
if k.schemaCachingEnabled {
k.latestSchemaCacheLock.RLock()
cacheEntry, ok := k.latestSchemaCache[subject]
k.latestSchemaCacheLock.RUnlock()
// Cache entry present and not expired
if ok && cacheEntry.expirationTime.After(time.Now()) {
return cacheEntry.schema, cacheEntry.codec, nil
}
schema, errSchema := srClient.GetLatestSchema(subject)
if errSchema != nil {
return nil, nil, errSchema
}
// Standard-JSON serialization/deserialization is not integrated in srclient yet.
// Since Dapr passes standard JSON through, this codec is needed.
codec, errCodec := goavro.NewCodecForStandardJSONFull(schema.Schema())
if errCodec != nil {
return nil, nil, errCodec
}
k.latestSchemaCacheLock.Lock()
k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
k.latestSchemaCacheLock.Unlock()
return schema, codec, nil
}
schema, err := srClient.GetLatestSchema(subject)
if err != nil {
return nil, nil, err
}
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
if err != nil {
return nil, nil, err
}

return schema, codec, nil
}

func (k *Kafka) getSchemaRegistryClient() (srclient.ISchemaRegistryClient, error) {
if k.srClient == nil {
return nil, errors.New("schema registry details not set")
}

return k.srClient, nil
}

func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
valueSchemaType, err := GetValueSchemaType(metadata)
if err != nil {
return nil, err
}

if valueSchemaType == Avro {
schema, codec, err := k.getLatestSchema(topic)
if err != nil {
return nil, err
}

native, _, err := codec.NativeFromTextual(data)
if err != nil {
return nil, err
}

valueBytes, err := codec.BinaryFromNative(nil, native)
if err != nil {
return nil, err
}
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

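// Assemble the Confluent wire format: magic byte 0x0, the 4-byte schema ID,
// then the Avro binary payload.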
var recordValue []byte
recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, valueBytes...)
return recordValue, nil
}

return data, nil
}

// EventHandler is the handler used to handle the subscribed event.
type EventHandler func(ctx context.Context, msg *NewEvent) error

@@ -178,6 +357,7 @@ type SubscriptionHandlerConfig struct {
SubscribeConfig pubsub.BulkSubscribeConfig
BulkHandler BulkEventHandler
Handler EventHandler
ValueSchemaType SchemaType
}

// NewEvent is an event arriving from a message bus instance.
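For context, publishers opt into Avro per message through metadata. A minimal publisher sketch using the Dapr Go SDK, assuming a pubsub component named kafka-pubsub configured as above, a schema already registered under the my-topic-value subject, and the Go SDK's PublishEventWithMetadata option (names are illustrative):

package main

import (
	"context"
	"log"

	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	client, err := dapr.NewClient()
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Standard JSON in; the component looks up the latest my-topic-value schema,
	// Avro-encodes the payload, and prepends the Confluent wire-format header.
	err = client.PublishEvent(context.Background(), "kafka-pubsub", "my-topic",
		[]byte(`{"name":"foo"}`),
		dapr.PublishEventWithMetadata(map[string]string{"valueSchemaType": "avro"}))
	if err != nil {
		log.Fatal(err)
	}
}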