Skip to content

Commit

Permalink
FIX: Redis to handle event metadata. (#3320)
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
Signed-off-by: Bernd Verst <github@bernd.dev>
Co-authored-by: Bernd Verst <github@bernd.dev>
  • Loading branch information
artursouza and berndverst committed Jan 25, 2024
1 parent 120a649 commit 0c687df
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 16 deletions.
31 changes: 26 additions & 5 deletions pubsub/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package redis

import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -101,7 +102,17 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
return errors.New("component is closed")
}

_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, map[string]interface{}{"data": req.Data})
redisPayload := map[string]interface{}{"data": req.Data}

if req.Metadata != nil {
serializedMetadata, err := json.Marshal(req.Metadata)
if err != nil {
return err
}
redisPayload["metadata"] = serializedMetadata
}

_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, redisPayload)
if err != nil {
return fmt.Errorf("redis streams: error from publish: %s", err)
}
Expand Down Expand Up @@ -157,7 +168,7 @@ func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeReques
// pick them up for processing.
func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []rediscomponent.RedisXMessage) {
for _, msg := range msgs {
rmsg := createRedisMessageWrapper(ctx, stream, handler, msg)
rmsg := r.createRedisMessageWrapper(ctx, stream, handler, msg)

select {
// Might block if the queue is full so we need the ctx.Done below.
Expand All @@ -172,7 +183,7 @@ func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handl

// createRedisMessageWrapper encapsulates the Redis message, message identifier, and handler
// in `redisMessage` for processing.
func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg rediscomponent.RedisXMessage) redisMessageWrapper {
func (r *redisStreams) createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg rediscomponent.RedisXMessage) redisMessageWrapper {
var data []byte
if dataValue, exists := msg.Values["data"]; exists && dataValue != nil {
switch v := dataValue.(type) {
Expand All @@ -183,11 +194,21 @@ func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsu
}
}

var metadata map[string]string
if metadataValue, exists := msg.Values["metadata"]; exists && metadataValue != nil {
metadataStr := metadataValue.(string)
err := json.Unmarshal([]byte(metadataStr), &metadata)
if err != nil {
r.logger.Warnf("Redis PubSub: Could not extract metadata for Redis message %s: %v", msg.ID, err)
}
}

return redisMessageWrapper{
ctx: ctx,
message: pubsub.NewMessage{
Topic: stream,
Data: data,
Topic: stream,
Data: data,
Metadata: metadata,
},
messageID: msg.ID,
handler: handler,
Expand Down
63 changes: 52 additions & 11 deletions pubsub/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,44 @@ func TestParseRedisMetadata(t *testing.T) {

func TestProcessStreams(t *testing.T) {
fakeConsumerID := "fakeConsumer"
topicCount := 0
messageCount := 0
expectedData := "testData"
expectedMetadata := "testMetadata"

var wg sync.WaitGroup
wg.Add(3)

fakeHandler := func(ctx context.Context, msg *pubsub.NewMessage) error {
defer wg.Done()

messageCount++

// assert
assert.Equal(t, expectedData, string(msg.Data))
assert.Equal(t, expectedMetadata, msg.Metadata["mymetadata"])

// return fake error to skip executing redis client command
return errors.New("fake error")
}

// act
testRedisStream := &redisStreams{
logger: logger.NewLogger("test"),
clientSettings: &commonredis.Settings{},
}
testRedisStream.queue = make(chan redisMessageWrapper, 10)
go testRedisStream.worker()
testRedisStream.enqueueMessages(context.Background(), fakeConsumerID, fakeHandler, generateRedisStreamTestData(3, expectedData, expectedMetadata))

// Wait for the handler to finish processing
wg.Wait()

// assert
assert.Equal(t, 3, messageCount)
}

func TestProcessStreamsWithoutEventMetadata(t *testing.T) {
fakeConsumerID := "fakeConsumer"
messageCount := 0
expectedData := "testData"

Expand All @@ -87,13 +124,12 @@ func TestProcessStreams(t *testing.T) {
defer wg.Done()

messageCount++
if topicCount == 0 {
topicCount = 1
}

// assert
assert.Equal(t, expectedData, string(msg.Data))

assert.Nil(t, msg.Metadata)

// return fake error to skip executing redis client command
return errors.New("fake error")
}
Expand All @@ -105,23 +141,28 @@ func TestProcessStreams(t *testing.T) {
}
testRedisStream.queue = make(chan redisMessageWrapper, 10)
go testRedisStream.worker()
testRedisStream.enqueueMessages(context.Background(), fakeConsumerID, fakeHandler, generateRedisStreamTestData(2, 3, expectedData))
testRedisStream.enqueueMessages(context.Background(), fakeConsumerID, fakeHandler, generateRedisStreamTestData(3, expectedData, ""))

// Wait for the handler to finish processing
wg.Wait()

// assert
assert.Equal(t, 1, topicCount)
assert.Equal(t, 3, messageCount)
}

func generateRedisStreamTestData(topicCount, messageCount int, data string) []commonredis.RedisXMessage {
func generateRedisStreamTestData(messageCount int, data string, metadata string) []commonredis.RedisXMessage {
generateXMessage := func(id int) commonredis.RedisXMessage {
values := map[string]interface{}{
"data": data,
}

if metadata != "" {
values["metadata"] = "{\"mymetadata\": \"" + metadata + "\"}"
}

return commonredis.RedisXMessage{
ID: strconv.Itoa(id),
Values: map[string]interface{}{
"data": data,
},
ID: strconv.Itoa(id),
Values: values,
}
}

Expand Down

0 comments on commit 0c687df

Please sign in to comment.