Skip to content

Commit

Permalink
Implement Avro Producer + tests
Browse files Browse the repository at this point in the history
- includes configurable caching

Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
  • Loading branch information
passuied committed Jan 1, 2024
1 parent cc383b5 commit 7b19c5b
Show file tree
Hide file tree
Showing 7 changed files with 639 additions and 45 deletions.
113 changes: 101 additions & 12 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafka
import (
"context"
"encoding/binary"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -45,8 +46,12 @@ type Kafka struct {
subscribeTopics TopicHandlerConfig
subscribeLock sync.Mutex

backOffConfig retry.Config
srClient srclient.ISchemaRegistryClient
backOffConfig retry.Config
srClient srclient.ISchemaRegistryClient
cachingEnabled bool
latestSchemaCache map[string]SchemaCacheEntry
latestSchemaCacheTTL time.Duration
latestSchemaCacheLock sync.RWMutex

// The default value should be true for kafka pubsub component and false for kafka binding component
// This default value can be overridden by metadata consumeRetryEnabled
Expand All @@ -59,11 +64,16 @@ type SchemaType int

const (
None SchemaType = iota
AVRO
Avro
)

func GetValueSchemaType(req pubsub.SubscribeRequest) (SchemaType, error) {
schemaTypeStr, ok := req.Metadata[valueSchemaType]
// SchemaCacheEntry holds a cached schema registry schema together with
// the instant after which the cached value should be refreshed.
type SchemaCacheEntry struct {
// schema is the cached schema fetched from the schema registry.
schema *srclient.Schema
// expirationTime marks when this cache entry becomes stale.
expirationTime time.Time
}

func GetValueSchemaType(metadata map[string]string) (SchemaType, error) {
schemaTypeStr, ok := metadata[valueSchemaType]
if ok {
v, err := parseSchemaType(schemaTypeStr)
return v, err
Expand All @@ -72,10 +82,10 @@ func GetValueSchemaType(req pubsub.SubscribeRequest) (SchemaType, error) {
}

func parseSchemaType(sVal string) (SchemaType, error) {
switch sVal {
case "AVRO":
return AVRO, nil
case "None":
switch strings.ToLower(sVal) {
case "avro":
return Avro, nil
case "none":
return None, nil
default:
return None, fmt.Errorf("error parsing schema type. '%s' is not a supported value", sVal)
Expand Down Expand Up @@ -176,6 +186,11 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
if meta.SchemaRegistryAPIKey != "" {
k.srClient.SetCredentials(meta.SchemaRegistryAPIKey, meta.SchemaRegistryAPISecret)
}
k.srClient.CachingEnabled(meta.SchemaCachingEnabled)
if meta.SchemaCachingEnabled {
k.latestSchemaCache = make(map[string]SchemaCacheEntry)
k.latestSchemaCacheTTL = meta.LatestSchemaCacheTTL
}
}
k.logger.Debug("Kafka message bus initialization complete")

Expand All @@ -193,14 +208,23 @@ func (k *Kafka) Close() (err error) {
return err
}

// getSchemaSubject maps a topic name to its schema registry subject.
// For now it assumes the subject is named after the topic (e.g. `my-topic-value`).
func getSchemaSubject(topic string) string {
	return topic + "-value"
}

func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config SubscriptionHandlerConfig) ([]byte, error) {
if config.ValueSchemaType == AVRO {
if config.ValueSchemaType == Avro {
srClient, err := k.getSchemaRegistyClient()
if err != nil {
return nil, err
}

if len(message.Value) < 5 {
return nil, fmt.Errorf("value is too short")
}
schemaID := binary.BigEndian.Uint32(message.Value[1:5])
// For now assumes that subject is named after topic (e.g. `my-topic-value`)
schema, err := k.srClient.GetSchemaByVersion(fmt.Sprintf("%s-value", message.Topic), int(schemaID))
schema, err := srClient.GetSchema(int(schemaID))
if err != nil {
return nil, err
}
Expand All @@ -218,6 +242,71 @@ func (k *Kafka) DeserializeValue(message *sarama.ConsumerMessage, config Subscri
return message.Value, nil
}

// getLatestSchema returns the latest schema registered for the topic's
// subject, optionally serving it from the local TTL cache when caching
// is enabled.
//
// Fixes relative to the previous version:
//   - the expiry comparison was inverted: cached entries were returned only
//     AFTER they had expired, and fresh entries always triggered a registry
//     round-trip;
//   - the cache map was read without holding latestSchemaCacheLock, racing
//     with the locked write below.
//
// NOTE(review): k.cachingEnabled is never assigned in the visible Init
// path (only k.latestSchemaCache / k.latestSchemaCacheTTL are) — confirm
// it is set from meta.SchemaCachingEnabled.
func (k *Kafka) getLatestSchema(topic string) (*srclient.Schema, error) {
	srClient, err := k.getSchemaRegistyClient()
	if err != nil {
		return nil, err
	}

	subject := getSchemaSubject(topic)
	if !k.cachingEnabled {
		return srClient.GetLatestSchema(subject)
	}

	k.latestSchemaCacheLock.RLock()
	cacheEntry, ok := k.latestSchemaCache[subject]
	k.latestSchemaCacheLock.RUnlock()
	// Serve from cache only while the entry has not yet expired.
	if ok && time.Now().Before(cacheEntry.expirationTime) {
		return cacheEntry.schema, nil
	}

	schema, err := srClient.GetLatestSchema(subject)
	if err != nil {
		return nil, err
	}
	k.latestSchemaCacheLock.Lock()
	k.latestSchemaCache[subject] = SchemaCacheEntry{schema: schema, expirationTime: time.Now().Add(k.latestSchemaCacheTTL)}
	k.latestSchemaCacheLock.Unlock()
	return schema, nil
}

// getSchemaRegistyClient returns the configured schema registry client,
// or an error when the component was initialized without schema registry
// settings.
func (k *Kafka) getSchemaRegistyClient() (srclient.ISchemaRegistryClient, error) {
	if k.srClient != nil {
		return k.srClient, nil
	}
	return nil, errors.New("schema registry details not set")
}

// SerializeValue serializes the payload for the given topic according to
// the schema type requested in metadata. For Avro, data is expected to be
// the textual (JSON) representation of the value; it is encoded into the
// Confluent wire format: one zero magic byte, a 4-byte big-endian schema
// ID, then the binary Avro payload. For any other schema type, data is
// returned unchanged.
//
// Fix relative to the previous version: errors from NativeFromTextual and
// BinaryFromNative were silently discarded, so a malformed payload would
// produce a corrupt record instead of an error.
func (k *Kafka) SerializeValue(topic string, data []byte, metadata map[string]string) ([]byte, error) {
	valueSchemaType, err := GetValueSchemaType(metadata)
	if err != nil {
		return nil, err
	}

	if valueSchemaType != Avro {
		return data, nil
	}

	schema, err := k.getLatestSchema(topic)
	if err != nil {
		return nil, err
	}

	native, _, err := schema.Codec().NativeFromTextual(data)
	if err != nil {
		return nil, err
	}
	valueBytes, err := schema.Codec().BinaryFromNative(nil, native)
	if err != nil {
		return nil, err
	}

	schemaIDBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(schemaIDBytes, uint32(schema.ID()))

	recordValue := make([]byte, 0, 5+len(valueBytes))
	recordValue = append(recordValue, byte(0)) // magic byte
	recordValue = append(recordValue, schemaIDBytes...)
	recordValue = append(recordValue, valueBytes...)
	return recordValue, nil
}

// EventHandler is the handler used to handle the subscribed event.
type EventHandler func(ctx context.Context, msg *NewEvent) error

Expand Down

0 comments on commit 7b19c5b

Please sign in to comment.