Skip to content

Commit

Permalink
Added avro test to pubsub/kafka test certification
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
  • Loading branch information
passuied committed Jan 8, 2024
1 parent 8ce3477 commit 42e166d
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 12 deletions.
4 changes: 2 additions & 2 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,5 @@ metadata:
type: duration
description: |
The TTL for schema caching when publishing a message with latest schema available.
example: '"15min"'
default: '"15min"'
example: '"5min"'
default: '"5min"'
4 changes: 2 additions & 2 deletions tests/certification/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/apache/thrift v0.13.0
github.com/aws/aws-sdk-go v1.45.19
github.com/camunda/zeebe/clients/go/v8 v8.2.12
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.1
github.com/cloudwego/kitex v0.5.0
github.com/cloudwego/kitex-examples v0.1.1
Expand All @@ -32,6 +33,7 @@ require (
github.com/lestrrat-go/jwx/v2 v2.0.16
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5
github.com/rabbitmq/amqp091-go v1.8.1
github.com/riferrei/srclient v0.6.0
github.com/stretchr/testify v1.8.4
github.com/tylertreat/comcast v1.0.1
go.mongodb.org/mongo-driver v1.12.1
Expand Down Expand Up @@ -98,7 +100,6 @@ require (
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 // indirect
github.com/bufbuild/protocompile v0.6.0 // indirect
github.com/bytedance/gopkg v0.0.0-20220817015305-b879a72dc90f // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459 // indirect
github.com/chenzhuoyu/iasm v0.9.0 // indirect
Expand Down Expand Up @@ -253,7 +254,6 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/redis/go-redis/v9 v9.2.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/riferrei/srclient v0.6.0 // indirect
github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.22.2 // indirect
Expand Down
10 changes: 10 additions & 0 deletions tests/certification/pubsub/kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ This project aims to test the Kafka Pub/Sub component under various conditions.
* Start third consumer with a matching consumer group
* Test: Publishes a specific amount of messages
* Component: Between the each of the consumers in the group, all messages should be consumed, but not necessarily in order.
* Start avro consumer with schema registry configured
* Test: Publishes a specific amount of messages with Avro serialization
* Consumer should receive all messages decoded
* Stop 1 broker node so that 2 of 3 are active
* The 2 applications should handle the server rebalance
* Stop another broker so that 1 of 3 are active (loss of quorum)
Expand Down Expand Up @@ -54,3 +57,10 @@ This project aims to test the Kafka Pub/Sub component under various conditions.
* **TODO** Verify raw events
* **TODO** Publish various raw events
* **TODO** App receives Kafka messages and verifies their binary encoding

## Running locally
* Navigate to the `tests/certification/pubsub/kafka` folder
* Run command:
```bash
go test -v --tags certtests
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: localhost:19092,localhost:29092,localhost:39092
- name: consumerGroup
value: kafkaCertificationAvro
- name: authType
value: "none"
- name: initialOffset
value: oldest
- name: backOffDuration
value: 50ms
- name: schemaRegistryURL
value: http://localhost:8081
18 changes: 17 additions & 1 deletion tests/certification/pubsub/kafka/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,20 @@ services:
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_NUM_PARTITIONS: 10
KAFKA_NUM_PARTITIONS: 10

schema-registry:
image: confluentinc/cp-schema-registry:7.5.3
hostname: schema-registry
container_name: schema-registry
environment:
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
- SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092
ports:
- 8081:8081
depends_on:
- kafka1
- kafka2
- kafka3

122 changes: 115 additions & 7 deletions tests/certification/pubsub/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@ package kafka_test

import (
"context"
"encoding/json"
"fmt"
"strconv"
"testing"
"time"

"github.com/IBM/sarama"
"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/backoff"
"github.com/google/uuid"
"github.com/riferrei/srclient"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"

Expand Down Expand Up @@ -55,20 +57,31 @@ const (
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
sidecarName3 = "dapr-3"
sidecarNameAvro = "dapr-avro"
appID1 = "app-1"
appID2 = "app-2"
appID3 = "app-3"
appIDAvro = "app-avro"
clusterName = "kafkacertification"
dockerComposeYAML = "docker-compose.yml"
numMessages = 1000
appPort = 8000
portOffset = 2
messageKey = "partitionKey"

pubsubName = "messagebus"
topicName = "neworder"
pubsubName = "messagebus"
topicName = "neworder"
avroTopicName = "my-topic"

schemaRegistryURL = "http://localhost:8081"
testSchema = `{"type": "record", "name": "cupcake", "fields": [{"name": "flavor", "type": "string"}, {"name": "created_date", "type": {"type": "long","logicalType": "timestamp-millis"}}]}`
)

type ComplexType struct {
CreatedDate int64 `json:"created_date"`
Flavor string `json:"flavor"`
}

var brokers = []string{"localhost:19092", "localhost:29092", "localhost:39092"}

func TestKafka(t *testing.T) {
Expand All @@ -78,6 +91,8 @@ func TestKafka(t *testing.T) {
// so exact ordering is not expected.
consumerGroup2 := watcher.NewUnordered()

consumerGroupAvro := watcher.NewOrdered()

// Application logic that tracks messages from a topic.
application := func(appName string, watcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
Expand All @@ -88,7 +103,7 @@ func TestKafka(t *testing.T) {
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: "messagebus",
Topic: "neworder",
Topic: topicName,
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
Expand All @@ -104,6 +119,31 @@ func TestKafka(t *testing.T) {
}
}

// Application logic that tracks messages from a topic.
applicationAvro := func(appName string, watcher *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {

// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: "messagebus",
Topic: avroTopicName,
Route: "/my-topic-handler",
Metadata: map[string]string{"valueSchemaType": "Avro", "rawPayload": "true"},
}, func(c context.Context, e *common.TopicEvent) (retry bool, err error) {
ctx.Logf("======== %s received event: %s", appName, e.Data)
// Track/Observe the data of the event.
// Data comes back as a []uint8 so needs to unmarshal it first so it matches
dataStr := fmt.Sprintf("%s", e.Data)
var dataObj ComplexType
json.Unmarshal([]byte(dataStr), &dataObj)
watcher.Observe(dataObj)
return false, nil
}),
)
}
}

// Set the partition key on all messages so they
// are written to the same partition.
// This allows for checking of ordered messages.
Expand Down Expand Up @@ -152,6 +192,48 @@ func TestKafka(t *testing.T) {
}
}

// Test logic that sends messages to a topic and
// verifies the application has received them.
sendRecvAvroTest := func(watchers ...*watcher.Watcher) flow.Runnable {
md := map[string]string{messageKey: "testAvro"}

return func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarNameAvro)

// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]interface{}, numMessages)

for i := range msgs {
msg := ComplexType{CreatedDate: time.Now().UnixMilli(), Flavor: fmt.Sprintf("chocolate %03d", i)}
msgs[i] = msg
}
for _, m := range watchers {
m.Expect(msgs...)
}

// Avoiding messages to be wrapped as CloudEvent
md["rawPayload"] = "true"
md["valueSchemaType"] = "Avro"

// Send events that the application above will observe.
ctx.Log("Sending messages!")
for _, msg := range msgs {
err := client.PublishEvent(
ctx, pubsubName, avroTopicName, msg,
dapr.PublishEventWithMetadata(md))
require.NoError(ctx, err, "error publishing message")
}

// Do the messages we observed match what we expect?
for _, m := range watchers {
m.Assert(ctx, time.Minute)
}

return nil
}
}

// sendMessagesInBackground and assertMessages are
// Runnables for testing publishing and consuming
// messages reliably when infrastructure and network
Expand Down Expand Up @@ -239,6 +321,13 @@ func TestKafka(t *testing.T) {

return err
})).
Step("wait", flow.Sleep(5*time.Second)).
Step("Publish Avro schema", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
srClient := srclient.CreateSchemaRegistryClient(schemaRegistryURL)
subjectName := "my-topic-value"
_, err := srClient.CreateSchema(subjectName, testSchema, srclient.Avro)
return err
})).
//
// Run the application logic above.
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
Expand All @@ -247,7 +336,7 @@ func TestKafka(t *testing.T) {
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer1"),
embedded.WithResourcesPath("./components/consumer1"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
Expand All @@ -261,7 +350,7 @@ func TestKafka(t *testing.T) {
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer2"),
embedded.WithResourcesPath("./components/consumer2"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
Expand All @@ -272,6 +361,25 @@ func TestKafka(t *testing.T) {
// Send messages using the same metadata/message key so we can expect
// in-order processing.
Step("send and wait(in-order)", sendRecvTest(metadata, consumerGroup1, consumerGroup2)).
// Run the avro consumer
Step(app.Run(appIDAvro, fmt.Sprintf(":%d", appPort+portOffset*3),
applicationAvro(appIDAvro, consumerGroupAvro))).
//
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarNameAvro,
append(componentRuntimeOptions(),
embedded.WithResourcesPath("./components/consumerAvro"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*3)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*3)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*3)),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset*3)),
)...,
)).
Step("reset", flow.Reset(consumerGroupAvro)).
//
// Send messages with random keys to test message consumption
// across more than one consumer group and consumers per group.
Step("send avro messages and wait(in-order)", sendRecvAvroTest(consumerGroupAvro)).
//
// Run the third application.
Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+portOffset*2),
Expand All @@ -280,7 +388,7 @@ func TestKafka(t *testing.T) {
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName3,
append(componentRuntimeOptions(),
embedded.WithComponentsPath("./components/consumer2"),
embedded.WithResourcesPath("./components/consumer2"),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*2)),
embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*2)),
embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*2)),
Expand Down

0 comments on commit 42e166d

Please sign in to comment.