From 762784113491162e87fcb4d2e4861be60abd676a Mon Sep 17 00:00:00 2001 From: Clive Jevons Date: Fri, 28 Oct 2022 20:21:31 +0200 Subject: [PATCH 1/4] add example of using Pulsar with testcontainers-go --- docs/examples/pulsar.md | 96 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 docs/examples/pulsar.md diff --git a/docs/examples/pulsar.md b/docs/examples/pulsar.md new file mode 100644 index 0000000000..ccf12cded3 --- /dev/null +++ b/docs/examples/pulsar.md @@ -0,0 +1,96 @@ +# Pulsar + +```go +package main + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar" + . "github.com/onsi/gomega" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestPulsar(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + g := NewGomegaWithT(t) + + matchAdminResponse := func(r io.Reader) bool { + respBytes, _ := ioutil.ReadAll(r) + resp := string(respBytes) + return resp == `["standalone"]` + } + pulsarRequest := testcontainers.ContainerRequest{ + Image: "docker.io/apachepulsar/pulsar:2.10.2", + ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + WaitingFor: wait.ForHTTP("/admin/v2/clusters").WithPort("8080/tcp").WithResponseMatcher(matchAdminResponse), + Cmd: []string{ + "/bin/bash", + "-c", + "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss", + }, + } + pulsarContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: pulsarRequest, + Started: true, + }) + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { pulsarContainer.Terminate(ctx) }) + + pulsarContainer.StartLogProducer(ctx) + defer pulsarContainer.StopLogProducer() + lc := logConsumer{} + pulsarContainer.FollowOutput(&lc) + + pulsarPort, err := pulsarContainer.MappedPort(ctx, "6650/tcp") + g.Expect(err).ToNot(HaveOccurred()) + + pc, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), + OperationTimeout: 30 * time.Second, + ConnectionTimeout: 30 * time.Second, + }) + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { pc.Close() }) + + consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ + Topic: "test-topic", + SubscriptionName: "pulsar-test", + Type: pulsar.Exclusive, + }) + g.Expect(err).ToNot(HaveOccurred()) + t.Cleanup(func() { consumer.Close() }) + + msgChan := make(chan []byte) + go func() { + msg, err := consumer.Receive(ctx) + g.Expect(err).ToNot(HaveOccurred()) + msgChan <- msg.Payload() + consumer.Ack(msg) + }() + + producer, err := pc.CreateProducer(pulsar.ProducerOptions{ + Topic: "test-topic", + }) + g.Expect(err).ToNot(HaveOccurred()) + + producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte("hello world"), + }) + + g.Eventually(msgChan).Should(Receive(Equal([]byte("hello world")))) +} + +type logConsumer struct{} + +func (lc *logConsumer) Accept(l testcontainers.Log) { + fmt.Print(string(l.Content)) +} +``` \ No newline at end of file From 9bfdcb19b32eec51e57e7b965193be36b44c226c Mon Sep 17 00:00:00 2001 From: Clive Jevons Date: Mon, 31 Oct 2022 10:07:14 +0100 Subject: [PATCH 2/4] remove use of gomega from pulsar example, add pulsar example to mkdocs config --- docs/examples/pulsar.md | 40 +++++++++++++++++++++++++++++----------- mkdocs.yml | 1 + 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/docs/examples/pulsar.md b/docs/examples/pulsar.md index ccf12cded3..954555409d 100644 --- a/docs/examples/pulsar.md +++ b/docs/examples/pulsar.md @@ -7,12 +7,10 @@ import ( "context" "fmt" "io" - "io/ioutil" "testing" "time" "github.com/apache/pulsar-client-go/pulsar" - . "github.com/onsi/gomega" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" ) @@ -20,10 +18,9 @@ import ( func TestPulsar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - g := NewGomegaWithT(t) matchAdminResponse := func(r io.Reader) bool { - respBytes, _ := ioutil.ReadAll(r) + respBytes, _ := io.ReadAll(r) resp := string(respBytes) return resp == `["standalone"]` } @@ -41,7 +38,9 @@ func TestPulsar(t *testing.T) { ContainerRequest: pulsarRequest, Started: true, }) - g.Expect(err).ToNot(HaveOccurred()) + if err != nil { + t.Fatal(err) + } t.Cleanup(func() { pulsarContainer.Terminate(ctx) }) pulsarContainer.StartLogProducer(ctx) @@ -50,14 +49,18 @@ func TestPulsar(t *testing.T) { pulsarContainer.FollowOutput(&lc) pulsarPort, err := pulsarContainer.MappedPort(ctx, "6650/tcp") - g.Expect(err).ToNot(HaveOccurred()) + if err != nil { + t.Fatal(err) + } pc, err := pulsar.NewClient(pulsar.ClientOptions{ URL: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) - g.Expect(err).ToNot(HaveOccurred()) + if err != nil { + t.Fatal(err) + } t.Cleanup(func() { pc.Close() }) consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ @@ -65,13 +68,18 @@ func TestPulsar(t *testing.T) { SubscriptionName: "pulsar-test", Type: pulsar.Exclusive, }) - g.Expect(err).ToNot(HaveOccurred()) + if err != nil { + t.Fatal(err) + } t.Cleanup(func() { consumer.Close() }) msgChan := make(chan []byte) go func() { msg, err := consumer.Receive(ctx) - g.Expect(err).ToNot(HaveOccurred()) + if err != nil { + fmt.Println("failed to receive message", err) + return + } msgChan <- msg.Payload() consumer.Ack(msg) }() @@ -79,13 +87,23 @@ func TestPulsar(t *testing.T) { producer, err := pc.CreateProducer(pulsar.ProducerOptions{ Topic: "test-topic", }) - g.Expect(err).ToNot(HaveOccurred()) + if err != nil { + t.Fatal(err) + } producer.Send(ctx, &pulsar.ProducerMessage{ Payload: []byte("hello world"), }) - g.Eventually(msgChan).Should(Receive(Equal([]byte("hello world")))) + ticker := time.NewTicker(1 * time.Minute) + select { + case <-ticker.C: + t.Fatal("did not receive message in time") + case msg := <-msgChan: + if string(msg) != "hello world" { + t.Fatal("received unexpected message bytes") + } + } } type logConsumer struct{} diff --git a/mkdocs.yml b/mkdocs.yml index 77b732ed7f..1c945bf123 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -50,6 +50,7 @@ nav: - examples/cockroachdb.md - examples/nginx.md - examples/redis.md + - examples/pulsar.md - System Requirements: - system_requirements/index.md - system_requirements/using_colima.md From 1b06b2f9dddb27b17acb94acb195b5801c9f70cb Mon Sep 17 00:00:00 2001 From: Clive Jevons Date: Mon, 31 Oct 2022 12:46:55 +0100 Subject: [PATCH 3/4] extract pulsar setup to own method --- docs/examples/pulsar.md | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/docs/examples/pulsar.md b/docs/examples/pulsar.md index 954555409d..ba9da990bc 100644 --- a/docs/examples/pulsar.md +++ b/docs/examples/pulsar.md @@ -15,10 +15,12 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) -func TestPulsar(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +type pulsarContainer struct { + testcontainers.Container + URI string +} +func setupPulsar(ctx context.Context) (*pulsarContainer, error) { matchAdminResponse := func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) resp := string(respBytes) @@ -34,27 +36,42 @@ func TestPulsar(t *testing.T) { "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss", }, } - pulsarContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: pulsarRequest, Started: true, }) if err != nil { - t.Fatal(err) + return nil, err } - t.Cleanup(func() { pulsarContainer.Terminate(ctx) }) - pulsarContainer.StartLogProducer(ctx) - defer pulsarContainer.StopLogProducer() + c.StartLogProducer(ctx) + defer c.StopLogProducer() lc := logConsumer{} - pulsarContainer.FollowOutput(&lc) + c.FollowOutput(&lc) + + pulsarPort, err := c.MappedPort(ctx, "6650/tcp") + if err != nil { + return nil, err + } + + return &pulsarContainer{ + Container: c, + URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), + }, nil +} + +func TestPulsar(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - pulsarPort, err := pulsarContainer.MappedPort(ctx, "6650/tcp") + c, err := setupPulsar(ctx) if err != nil { t.Fatal(err) } + t.Cleanup(func() { c.Container.Terminate(ctx) }) pc, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), + URL: c.URI, OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) From 4976de7970bb08e2db2b9307b3facb2768a42023 Mon Sep 17 00:00:00 2001 From: Clive Jevons Date: Mon, 31 Oct 2022 14:45:32 +0100 Subject: [PATCH 4/4] tweak wait strategy to prevent flakiness in pulsar tests --- docs/examples/pulsar.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/examples/pulsar.md b/docs/examples/pulsar.md index ba9da990bc..6aec7a3bde 100644 --- a/docs/examples/pulsar.md +++ b/docs/examples/pulsar.md @@ -29,7 +29,10 @@ func setupPulsar(ctx context.Context) (*pulsarContainer, error) { pulsarRequest := testcontainers.ContainerRequest{ Image: "docker.io/apachepulsar/pulsar:2.10.2", ExposedPorts: []string{"6650/tcp", "8080/tcp"}, - WaitingFor: wait.ForHTTP("/admin/v2/clusters").WithPort("8080/tcp").WithResponseMatcher(matchAdminResponse), + WaitingFor: wait.ForAll( + wait.ForHTTP("/admin/v2/clusters").WithPort("8080/tcp").WithResponseMatcher(matchAdminResponse), + wait.ForLog("Successfully updated the policies on namespace public/default"), + ), Cmd: []string{ "/bin/bash", "-c",