Avro Schema Registry implementation
- consumer
- producer
- includes configurable caching
- Overrides the srclient codec with goavro's newer standard-JSON codec, since standard JSON is the format used by Dapr.
- unit tests

Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
passuied committed Jan 4, 2024
1 parent 94c5618 commit 45d86a7
Showing 14 changed files with 915 additions and 16 deletions.
3 changes: 1 addition & 2 deletions bindings/kafka/kafka.go
@@ -21,11 +21,10 @@ import (
"sync"
"sync/atomic"

"github.com/dapr/kit/logger"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/common/component/kafka"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)

const (
29 changes: 28 additions & 1 deletion bindings/kafka/metadata.yaml
@@ -268,4 +268,31 @@ metadata:
This is potentially insecure and not recommended for use in production.
example: "true"
default: "false"
type: bool
- name: schemaRegistryURL
type: string
description: |
The Schema Registry URL.
example: '"http://localhost:8081"'
- name: schemaRegistryAPIKey
type: string
description: |
The Schema Registry credentials API Key.
example: '"XYAXXAZ"'
- name: schemaRegistryAPISecret
type: string
description: |
The Schema Registry credentials API Secret.
example: '"ABCDEFGMEADFF"'
- name: schemaCachingEnabled
type: bool
description: |
Enables caching for schemas.
example: '"true"'
default: '"true"'
- name: latestSchemaCacheTTL
type: duration
description: |
The TTL for the schema cache when publishing a message with the latest available schema.
example: '"15m"'
default: '"15m"'
17 changes: 15 additions & 2 deletions common/component/kafka/consumer.go
@@ -142,9 +142,17 @@ func (consumer *consumer) doBulkCallback(session sarama.ConsumerGroupSession,
for i, message := range messages {
if message != nil {
metadata := GetEventMetadata(message)
handlerConfig, err := consumer.k.GetTopicHandlerConfig(message.Topic)
if err != nil {
return err
}
messageVal, err := consumer.k.DeserializeValue(message, handlerConfig)
if err != nil {
return err
}
childMessage := KafkaBulkMessageEntry{
EntryId: strconv.Itoa(i),
Event: messageVal,
Metadata: metadata,
}
messageValues[i] = childMessage
@@ -184,9 +192,14 @@ func (consumer *consumer) doCallback(session sarama.ConsumerGroupSession, messag
if !handlerConfig.IsBulkSubscribe && handlerConfig.Handler == nil {
return errors.New("invalid handler config for subscribe call")
}

messageVal, err := consumer.k.DeserializeValue(message, handlerConfig)
if err != nil {
return err
}
event := NewEvent{
Topic: message.Topic,
Data: messageVal,
}
event.Metadata = GetEventMetadata(message)

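With this wiring, subscription handlers receive standard JSON in msg.Data whether or not the topic carries Avro. A hypothetical handler built against the EventHandler and NewEvent types defined in kafka.go below; the snippet is illustrative, not part of the commit, and assumes encoding/json and fmt are imported:

var handler EventHandler = func(ctx context.Context, msg *NewEvent) error {
	var payload map[string]any
	// msg.Data is standard JSON once DeserializeValue has run
	if err := json.Unmarshal(msg.Data, &payload); err != nil {
		return err
	}
	fmt.Printf("event on %s: %v\n", msg.Topic, payload)
	return nil
}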
180 changes: 179 additions & 1 deletion common/component/kafka/kafka.go
@@ -15,11 +15,16 @@ package kafka

import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/linkedin/goavro/v2"
"github.com/riferrei/srclient"

"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
@@ -42,7 +47,12 @@ type Kafka struct {
subscribeTopics TopicHandlerConfig
subscribeLock sync.Mutex

backOffConfig retry.Config
srClient srclient.ISchemaRegistryClient
schemaCachingEnabled bool
latestSchemaCache map[string]SchemaCacheEntry
latestSchemaCacheTTL time.Duration
latestSchemaCacheLock sync.RWMutex

// The default value should be true for kafka pubsub component and false for kafka binding component
// This default value can be overridden by metadata consumeRetryEnabled
@@ -51,6 +61,39 @@
consumeRetryInterval time.Duration
}

type SchemaType int

const (
None SchemaType = iota
Avro
)

type SchemaCacheEntry struct {
schema *srclient.Schema
codec *goavro.Codec
expirationTime time.Time
}

func GetValueSchemaType(metadata map[string]string) (SchemaType, error) {
schemaTypeStr, ok := metadata[valueSchemaType]
if ok {
return parseSchemaType(schemaTypeStr)
}
return None, nil
}

func parseSchemaType(sVal string) (SchemaType, error) {
switch strings.ToLower(sVal) {
case "avro":
return Avro, nil
case "none":
return None, nil
default:
return None, fmt.Errorf("error parsing schema type. '%s' is not a supported value", sVal)
}
}
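
A quick illustration of the two helpers above. This is a hypothetical snippet, and it assumes the valueSchemaType metadata key constant (defined outside this diff) resolves to the string "valueSchemaType":

func ExampleGetValueSchemaType() {
	st, err := GetValueSchemaType(map[string]string{"valueSchemaType": "avro"})
	fmt.Println(st == Avro, err)
	// Output: true <nil>
}

An unrecognized value such as "protobuf" falls through to the error branch of parseSchemaType.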

func NewKafka(logger logger.Logger) *Kafka {
return &Kafka{
logger: logger,
@@ -139,6 +182,18 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
k.consumeRetryEnabled = meta.ConsumeRetryEnabled
k.consumeRetryInterval = meta.ConsumeRetryInterval

if meta.SchemaRegistryURL != "" {
k.srClient = srclient.CreateSchemaRegistryClient(meta.SchemaRegistryURL)
// Empty password is a possibility
if meta.SchemaRegistryAPIKey != "" {
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
}
k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
if meta.SchemaCachingEnabled {
k.latestSchemaCache = make(map[string]SchemaCacheEntry)
k.latestSchemaCacheTTL = meta.LatestSchemaCacheTTL
}
}
k.logger.Debug("Kafka message bus initialization complete")

return nil
@@ -155,6 +210,128 @@ func (k *Kafka) Close() (err error) {
return err
}

func getSchemaSubject(topic string) string {
// For now this assumes the subject follows the default TopicNameStrategy, i.e. it is named after the topic (e.g. `my-topic-value`)
return fmt.Sprintf("%s-value", topic)
}

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
if config.ValueSchemaType == Avro {
srClient, err := k.getSchemaRegistyClient()
if err != nil {
return nil, err
}

// Confluent wire format: magic byte (0x00), 4-byte big-endian schema ID, then the Avro-encoded payload
if len(message.Value) < 5 {
return nil, errors.New("value is too short to contain a schema ID header")
}
schemaID := binary.BigEndian.Uint32(message.Value[1:5])
schema, err := srClient.GetSchema(int(schemaID))
if err != nil {
return nil, err
}
// The payload Dapr expects is standard JSON, which the srclient version currently in use doesn't support yet.
// Build the standard-JSON codec directly instead.
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
if err != nil {
return nil, err
}
native, _, err := codec.NativeFromBinary(message.Value[5:])
if err != nil {
return nil, err
}
value, err := codec.TextualFromNative(nil, native)
if err != nil {
return nil, err
}

return value, nil
}
return message.Value, nil
}
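
The choice of NewCodecForStandardJSONFull is what lets Dapr's plain JSON flow through: goavro's default codec renders Avro unions as wrapped objects (e.g. {"string": "bob"}), while the standard-JSON codec accepts and emits unwrapped values. A self-contained sketch of the round trip the component performs, with an illustrative schema in place of one fetched from the registry:

package main

import (
	"fmt"

	"github.com/linkedin/goavro/v2"
)

func main() {
	const schema = `{"type":"record","name":"User","fields":[
		{"name":"name","type":"string"},
		{"name":"nickname","type":["null","string"],"default":null}]}`

	codec, err := goavro.NewCodecForStandardJSONFull(schema)
	if err != nil {
		panic(err)
	}

	// Standard JSON in: no {"string": ...} union wrapper needed
	native, _, err := codec.NativeFromTextual([]byte(`{"name":"alice","nickname":"bob"}`))
	if err != nil {
		panic(err)
	}
	// Avro binary, as appended after the 5-byte header by SerializeValue
	bin, err := codec.BinaryFromNative(nil, native)
	if err != nil {
		panic(err)
	}
	// And back to standard JSON, as returned by DeserializeValue
	back, _, _ := codec.NativeFromBinary(bin)
	out, _ := codec.TextualFromNative(nil, back)
	fmt.Println(string(out)) // {"name":"alice","nickname":"bob"}
}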

func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, *goavro.Codec, error) {
srClient, err := k.getSchemaRegistyClient()
if err != nil {
return nil, nil, err
}

subject := getSchemaSubject(topic)
if k.schemaCachingEnabled {
k.latestSchemaCacheLock.RLock()
cacheEntry, ok := k.latestSchemaCache[subject]
k.latestSchemaCacheLock.RUnlock()
// Cache entry present and not yet expired
if ok && cacheEntry.expirationTime.After(time.Now()) {
return cacheEntry.schema, cacheEntry.codec, nil
}
schema, errSchema := srClient.GetLatestSchema(subject)
if errSchema != nil {
return nil, nil, errSchema
}
// The standard-JSON codec is not integrated in srclient yet.
// Since Dapr passes standard JSON, build the codec directly.
codec, errCodec := goavro.NewCodecForStandardJSONFull(schema.Schema())
if errCodec != nil {
return nil, nil, errCodec
}
}
k.latestSchemaCacheLock.Lock()
k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, codec: codec, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
k.latestSchemaCacheLock.Unlock()
return schema, codec, nil
}
schema, err := srClient.GetLatestSchema(subject)
if err != nil {
return nil, nil, err
}
codec, err := goavro.NewCodecForStandardJSONFull(schema.Schema())
if err != nil {
return nil, nil, err
}

return schema, codec, nil
}

func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {
if k.srClient == nil {
return nil, errors.New("schema registry details not set")
}

return k.srClient, nil
}

func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
valueSchemaType, err := GetValueSchemaType(metadata)
if err != nil {
return nil, err
}

if valueSchemaType == Avro {
schema, codec, err := k.getLatestSchema(topic)
if err != nil {
return nil, err
}

native, _, err := codec.NativeFromTextual(data)
if err != nil {
return nil, err
}

valueBytes, err := codec.BinaryFromNative(nil, native)
if err != nil {
return nil, err
}
schemaIDBytes := make([]byte, 4)
binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

// Confluent wire format: magic byte (0x00), schema ID, Avro binary payload
recordValue := make([]byte, 0, 5+len(valueBytes))
recordValue = append(recordValue, byte(0))
recordValue = append(recordValue, schemaIDBytes...)
recordValue = append(recordValue, valueBytes...)
return recordValue, nil
}

return data, nil
}
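
The five bytes prepended in SerializeValue follow Confluent's wire format: a zero magic byte, then the schema ID in big-endian order. A tiny sketch of what the header looks like for a hypothetical schema ID of 42:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	const schemaID = 42 // hypothetical; SerializeValue uses schema.ID() from the registry
	header := make([]byte, 5)
	header[0] = 0 // magic byte
	binary.BigEndian.PutUint32(header[1:5], schemaID)
	fmt.Printf("% x\n", header) // prints: 00 00 00 00 2a
}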

// EventHandler is the handler used to handle the subscribed event.
type EventHandler func(ctx context.Context, msg *NewEvent) error

@@ -167,6 +344,7 @@ type SubscriptionHandlerConfig struct {
SubscribeConfig pubsub.BulkSubscribeConfig
BulkHandler BulkEventHandler
Handler EventHandler
ValueSchemaType SchemaType
}

// NewEvent is an event arriving from a message bus instance.