From 9b37876e6e941c63b6f8bd374f8fa5db64b66f51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 12:41:09 +0100 Subject: [PATCH 01/29] chore: move pulsar example to modules --- .../workflows/{pulsar-example.yml => module-pulsar.yml} | 2 +- docs/examples/pulsar.md | 9 --------- docs/modules/pulsar.md | 9 +++++++++ mkdocs.yml | 2 +- {examples => modules}/pulsar/Makefile | 0 {examples => modules}/pulsar/go.mod | 0 {examples => modules}/pulsar/go.sum | 0 {examples => modules}/pulsar/pulsar.go | 0 {examples => modules}/pulsar/pulsar_test.go | 0 {examples => modules}/pulsar/tools/tools.go | 0 10 files changed, 11 insertions(+), 11 deletions(-) rename .github/workflows/{pulsar-example.yml => module-pulsar.yml} (95%) delete mode 100644 docs/examples/pulsar.md create mode 100644 docs/modules/pulsar.md rename {examples => modules}/pulsar/Makefile (100%) rename {examples => modules}/pulsar/go.mod (100%) rename {examples => modules}/pulsar/go.sum (100%) rename {examples => modules}/pulsar/pulsar.go (100%) rename {examples => modules}/pulsar/pulsar_test.go (100%) rename {examples => modules}/pulsar/tools/tools.go (100%) diff --git a/.github/workflows/pulsar-example.yml b/.github/workflows/module-pulsar.yml similarity index 95% rename from .github/workflows/pulsar-example.yml rename to .github/workflows/module-pulsar.yml index df7ee7c9ab..4aef3d249c 100644 --- a/.github/workflows/pulsar-example.yml +++ b/.github/workflows/module-pulsar.yml @@ -1,4 +1,4 @@ -name: Pulsar example pipeline +name: Pulsar module pipeline on: [push, pull_request] diff --git a/docs/examples/pulsar.md b/docs/examples/pulsar.md deleted file mode 100644 index b10430f60f..0000000000 --- a/docs/examples/pulsar.md +++ /dev/null @@ -1,9 +0,0 @@ -# Pulsar - - -[Creating an Apache Pulsar container](../../examples/pulsar/pulsar.go) - - - -[Test for an Apache Pulsar container](../../examples/pulsar/pulsar_test.go) - diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md new file mode 100644 index 0000000000..a0b56c8d1b --- /dev/null +++ b/docs/modules/pulsar.md @@ -0,0 +1,9 @@ +# Pulsar + + +[Creating an Apache Pulsar container](../../modules/pulsar/pulsar.go) + + + +[Test for an Apache Pulsar container](../../modules/pulsar/pulsar_test.go) + diff --git a/mkdocs.yml b/mkdocs.yml index 39c544d3e2..64d8134212 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,6 +49,7 @@ nav: - Modules: - modules/index.md - modules/localstack.md + - modules/pulsar.md - Examples: - examples/index.md - examples/bigtable.md @@ -61,7 +62,6 @@ nav: - examples/nginx.md - examples/postgres.md - examples/pubsub.md - - examples/pulsar.md - examples/redis.md - examples/spanner.md - examples/toxiproxy.md diff --git a/examples/pulsar/Makefile b/modules/pulsar/Makefile similarity index 100% rename from examples/pulsar/Makefile rename to modules/pulsar/Makefile diff --git a/examples/pulsar/go.mod b/modules/pulsar/go.mod similarity index 100% rename from examples/pulsar/go.mod rename to modules/pulsar/go.mod diff --git a/examples/pulsar/go.sum b/modules/pulsar/go.sum similarity index 100% rename from examples/pulsar/go.sum rename to modules/pulsar/go.sum diff --git a/examples/pulsar/pulsar.go b/modules/pulsar/pulsar.go similarity index 100% rename from examples/pulsar/pulsar.go rename to modules/pulsar/pulsar.go diff --git a/examples/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go similarity index 100% rename from examples/pulsar/pulsar_test.go rename to modules/pulsar/pulsar_test.go diff --git a/examples/pulsar/tools/tools.go b/modules/pulsar/tools/tools.go similarity index 100% rename from examples/pulsar/tools/tools.go rename to modules/pulsar/tools/tools.go From 7dcbfcda3c34df938f73d0911087ec43fa9d0a21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 14:15:13 +0100 Subject: [PATCH 02/29] chore: export Pulsar container type --- modules/pulsar/pulsar.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 00af752733..6ec0f8c967 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -9,12 +9,12 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) -type pulsarContainer struct { +type PulsarContainer struct { testcontainers.Container URI string } -func startContainer(ctx context.Context) (*pulsarContainer, error) { +func startContainer(ctx context.Context) (*PulsarContainer, error) { matchAdminResponse := func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) resp := string(respBytes) @@ -51,7 +51,7 @@ func startContainer(ctx context.Context) (*pulsarContainer, error) { return nil, err } - return &pulsarContainer{ + return &PulsarContainer{ Container: c, URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), }, nil From 766375d0c661482d87e9ccadfa5dec552c5b9193 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 14:15:35 +0100 Subject: [PATCH 03/29] chore: export Start container func --- modules/pulsar/pulsar.go | 2 +- modules/pulsar/pulsar_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 6ec0f8c967..51d4489dd3 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -14,7 +14,7 @@ type PulsarContainer struct { URI string } -func startContainer(ctx context.Context) (*PulsarContainer, error) { +func StartContainer(ctx context.Context) (*PulsarContainer, error) { matchAdminResponse := func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) resp := string(respBytes) diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 024a4f14e5..f0a9a3c64a 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -13,7 +13,7 @@ func TestPulsar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := startContainer(ctx) + c, err := StartContainer(ctx) if err != nil { t.Fatal(err) } From 3a5426a34ad722063d7777c8e654d8737d740746 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 14:18:16 +0100 Subject: [PATCH 04/29] chore: support changing the pulsar image --- modules/pulsar/pulsar.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 51d4489dd3..75a8a47064 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -9,6 +9,10 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) +const defaultPulsarImage = "docker.io/apachepulsar/pulsar:2.10.2" +const defaultPulsarPort = "6650/tcp" +const defaultPulsarAdminPort = "8080/tcp" + type PulsarContainer struct { testcontainers.Container URI string @@ -21,10 +25,10 @@ func StartContainer(ctx context.Context) (*PulsarContainer, error) { return resp == `["standalone"]` } pulsarRequest := testcontainers.ContainerRequest{ - Image: "docker.io/apachepulsar/pulsar:2.10.2", - ExposedPorts: []string{"6650/tcp", "8080/tcp"}, + Image: defaultPulsarImage, + ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, WaitingFor: wait.ForAll( - wait.ForHTTP("/admin/v2/clusters").WithPort("8080/tcp").WithResponseMatcher(matchAdminResponse), + wait.ForHTTP("/admin/v2/clusters").WithPort(defaultPulsarAdminPort).WithResponseMatcher(matchAdminResponse), wait.ForLog("Successfully updated the policies on namespace public/default"), ), Cmd: []string{ @@ -46,7 +50,7 @@ func StartContainer(ctx context.Context) (*PulsarContainer, error) { lc := logConsumer{} c.FollowOutput(&lc) - pulsarPort, err := c.MappedPort(ctx, "6650/tcp") + pulsarPort, err := c.MappedPort(ctx, defaultPulsarPort) if err != nil { return nil, err } From 648e642b81618970286f817c13ee68fdf9aee064 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:08:35 +0100 Subject: [PATCH 05/29] chore: support defining custom image for pulsar --- modules/pulsar/pulsar.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 75a8a47064..970e8d597c 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -18,7 +18,21 @@ type PulsarContainer struct { URI string } -func StartContainer(ctx context.Context) (*PulsarContainer, error) { +// PulsarContainerOptions is a function that can be used to configure the Pulsar container +type PulsarContainerOptions func(req *testcontainers.ContainerRequest) + +// WithPulsarImage allows to override the default Pulsar image +func WithPulsarImage(image string) PulsarContainerOptions { + return func(req *testcontainers.ContainerRequest) { + if image == "" { + image = defaultPulsarImage + } + + req.Image = image + } +} + +func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*PulsarContainer, error) { matchAdminResponse := func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) resp := string(respBytes) @@ -37,6 +51,11 @@ func StartContainer(ctx context.Context) (*PulsarContainer, error) { "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss", }, } + + for _, opt := range opts { + opt(&pulsarRequest) + } + c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: pulsarRequest, Started: true, From bf684941a873b6b1bf459901d57fce965c6088c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:19:56 +0100 Subject: [PATCH 06/29] chore: support overriding the default waiting strategy --- modules/pulsar/pulsar.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 970e8d597c..3ca8e1fb5c 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -32,6 +32,13 @@ func WithPulsarImage(image string) PulsarContainerOptions { } } +// WithWaitingFor allows to override the default waiting strategy +func WithWaitingFor(waitingFor wait.Strategy) PulsarContainerOptions { + return func(req *testcontainers.ContainerRequest) { + req.WaitingFor = waitingFor + } +} + func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*PulsarContainer, error) { matchAdminResponse := func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) From 066f91136744ebdc1a64436e1bc33c827dc05220 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:28:09 +0100 Subject: [PATCH 07/29] chore: support overriding the default env --- modules/pulsar/pulsar.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 3ca8e1fb5c..811f024183 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -21,6 +21,15 @@ type PulsarContainer struct { // PulsarContainerOptions is a function that can be used to configure the Pulsar container type PulsarContainerOptions func(req *testcontainers.ContainerRequest) +// WithEnv will merge the given environment variables with the default ones +func WithEnv(env map[string]string) PulsarContainerOptions { + return func(req *testcontainers.ContainerRequest) { + for k, v := range env { + req.Env[k] = v + } + } +} + // WithPulsarImage allows to override the default Pulsar image func WithPulsarImage(image string) PulsarContainerOptions { return func(req *testcontainers.ContainerRequest) { From 5095d0376b98b921b539e071d3871176e27cd134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:29:07 +0100 Subject: [PATCH 08/29] chore: support overriding the default command --- modules/pulsar/pulsar.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 811f024183..17d4e9b873 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -21,6 +21,13 @@ type PulsarContainer struct { // PulsarContainerOptions is a function that can be used to configure the Pulsar container type PulsarContainerOptions func(req *testcontainers.ContainerRequest) +// WithCmd allows to override the default command for the container +func WithCmd(cmd []string) PulsarContainerOptions { + return func(req *testcontainers.ContainerRequest) { + req.Cmd = cmd + } +} + // WithEnv will merge the given environment variables with the default ones func WithEnv(env map[string]string) PulsarContainerOptions { return func(req *testcontainers.ContainerRequest) { From 65d97c97b9eb61b46a7994265e8249d00759d899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:33:16 +0100 Subject: [PATCH 09/29] docs: document StartContainer function --- modules/pulsar/pulsar.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 17d4e9b873..b83654a165 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -55,6 +55,14 @@ func WithWaitingFor(waitingFor wait.Strategy) PulsarContainerOptions { } } +// StartContainer creates an instance of the Pulsar container type, being possible to pass a custom request and options +// The created container will use the following defaults: +// - image: docker.io/apachepulsar/pulsar:2.10.2 +// - exposed ports: 6650/tcp, 8080/tcp +// - waiting strategy: wait for all the following strategies: +// - the Pulsar admin API ("/admin/v2/clusters") to be ready on port 8080/tcp and return the response `["standalone"]` +// - the log message "Successfully updated the policies on namespace public/default" +// - command: "/bin/bash -c /pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss" func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*PulsarContainer, error) { matchAdminResponse := func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) From 3e44fbdba75bb5730641019a30fc083260ef3b29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:44:50 +0100 Subject: [PATCH 10/29] chore: leverate modifiers --- modules/pulsar/pulsar.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index b83654a165..ff2cdc3512 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -5,6 +5,8 @@ import ( "fmt" "io" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" ) @@ -28,12 +30,24 @@ func WithCmd(cmd []string) PulsarContainerOptions { } } -// WithEnv will merge the given environment variables with the default ones -func WithEnv(env map[string]string) PulsarContainerOptions { +// WithConfigModifier allows to override the default container config +func WithConfigModifier(modifier func(config *container.Config)) PulsarContainerOptions { return func(req *testcontainers.ContainerRequest) { - for k, v := range env { - req.Env[k] = v - } + req.ConfigModifier = modifier + } +} + +// WithEndpointSettingsModifier allows to override the default endpoint settings +func WithEndpointSettingsModifier(modifier func(settings map[string]*network.EndpointSettings)) PulsarContainerOptions { + return func(req *testcontainers.ContainerRequest) { + req.EnpointSettingsModifier = modifier + } +} + +// WithHostConfigModifier allows to override the default host config +func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) PulsarContainerOptions { + return func(req *testcontainers.ContainerRequest) { + req.HostConfigModifier = modifier } } From 391ababad7e7519aafc44ae69567773f73f649d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 17:51:16 +0100 Subject: [PATCH 11/29] chore: create PulsarContainerRequest abstraction --- modules/pulsar/pulsar.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index ff2cdc3512..c83fea3cdf 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -20,40 +20,44 @@ type PulsarContainer struct { URI string } +type PulsarContainerRequest struct { + testcontainers.ContainerRequest +} + // PulsarContainerOptions is a function that can be used to configure the Pulsar container -type PulsarContainerOptions func(req *testcontainers.ContainerRequest) +type PulsarContainerOptions func(req *PulsarContainerRequest) // WithCmd allows to override the default command for the container func WithCmd(cmd []string) PulsarContainerOptions { - return func(req *testcontainers.ContainerRequest) { + return func(req *PulsarContainerRequest) { req.Cmd = cmd } } // WithConfigModifier allows to override the default container config func WithConfigModifier(modifier func(config *container.Config)) PulsarContainerOptions { - return func(req *testcontainers.ContainerRequest) { + return func(req *PulsarContainerRequest) { req.ConfigModifier = modifier } } // WithEndpointSettingsModifier allows to override the default endpoint settings func WithEndpointSettingsModifier(modifier func(settings map[string]*network.EndpointSettings)) PulsarContainerOptions { - return func(req *testcontainers.ContainerRequest) { + return func(req *PulsarContainerRequest) { req.EnpointSettingsModifier = modifier } } // WithHostConfigModifier allows to override the default host config func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) PulsarContainerOptions { - return func(req *testcontainers.ContainerRequest) { + return func(req *PulsarContainerRequest) { req.HostConfigModifier = modifier } } // WithPulsarImage allows to override the default Pulsar image func WithPulsarImage(image string) PulsarContainerOptions { - return func(req *testcontainers.ContainerRequest) { + return func(req *PulsarContainerRequest) { if image == "" { image = defaultPulsarImage } @@ -64,7 +68,7 @@ func WithPulsarImage(image string) PulsarContainerOptions { // WithWaitingFor allows to override the default waiting strategy func WithWaitingFor(waitingFor wait.Strategy) PulsarContainerOptions { - return func(req *testcontainers.ContainerRequest) { + return func(req *PulsarContainerRequest) { req.WaitingFor = waitingFor } } @@ -83,7 +87,7 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa resp := string(respBytes) return resp == `["standalone"]` } - pulsarRequest := testcontainers.ContainerRequest{ + req := testcontainers.ContainerRequest{ Image: defaultPulsarImage, ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, WaitingFor: wait.ForAll( @@ -97,12 +101,16 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa }, } + pulsarRequest := PulsarContainerRequest{ + ContainerRequest: req, + } + for _, opt := range opts { opt(&pulsarRequest) } c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: pulsarRequest, + ContainerRequest: pulsarRequest.ContainerRequest, Started: true, }) if err != nil { From d382b1996948aea2e06d04c5d99c3b3c6cb69982 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 18:17:57 +0100 Subject: [PATCH 12/29] chore: make a more advanced setup for the test --- modules/pulsar/go.mod | 7 +++++-- modules/pulsar/go.sum | 1 + modules/pulsar/pulsar_test.go | 29 ++++++++++++++++++++++++++++- 3 files changed, 34 insertions(+), 3 deletions(-) diff --git a/modules/pulsar/go.mod b/modules/pulsar/go.mod index 2e52f83573..2f7f0aa5bd 100644 --- a/modules/pulsar/go.mod +++ b/modules/pulsar/go.mod @@ -4,6 +4,8 @@ go 1.18 require ( github.com/apache/pulsar-client-go v0.9.0 + github.com/docker/docker v23.0.0+incompatible + github.com/stretchr/testify v1.8.1 github.com/testcontainers/testcontainers-go v0.18.0 gotest.tools/gotestsum v1.9.0 ) @@ -23,9 +25,9 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/containerd/containerd v1.6.18 // indirect github.com/danieljoos/wincred v1.1.2 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/dnephin/pflag v1.0.7 // indirect github.com/docker/distribution v2.8.1+incompatible // indirect - github.com/docker/docker v23.0.0+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dvsekhvalnov/jose2go v1.5.0 // indirect @@ -56,13 +58,13 @@ require ( github.com/opencontainers/runc v1.1.3 // indirect github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.7.0 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.7.0 // indirect @@ -75,5 +77,6 @@ require ( google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect google.golang.org/grpc v1.47.0 // indirect google.golang.org/protobuf v1.28.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.4.0 // indirect ) diff --git a/modules/pulsar/go.sum b/modules/pulsar/go.sum index e3fff4af5f..ac75025990 100644 --- a/modules/pulsar/go.sum +++ b/modules/pulsar/go.sum @@ -439,6 +439,7 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index f0a9a3c64a..e150178944 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -7,13 +7,40 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" ) func TestPulsar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - c, err := StartContainer(ctx) + nwName := "pulsar-test" + _, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ + NetworkRequest: testcontainers.NetworkRequest{ + Name: nwName, + }, + }) + require.NoError(t, err) + + c, err := StartContainer( + ctx, + WithConfigModifier(func(config *container.Config) { + config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") + }), + WithHostConfigModifier(func(hostConfig *container.HostConfig) { + hostConfig.Resources = container.Resources{ + Memory: 1024 * 1024 * 1024, + } + }), + WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) { + settings[nwName] = &network.EndpointSettings{ + Aliases: []string{"pulsar"}, + } + }), + ) if err != nil { t.Fatal(err) } From 1441cc40874b18a8f94a886a147eb350a10e8018 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Tue, 21 Feb 2023 19:11:55 +0100 Subject: [PATCH 13/29] chore: support for setting function workers --- modules/pulsar/pulsar.go | 56 ++++++------- modules/pulsar/pulsar_test.go | 151 ++++++++++++++++++++-------------- 2 files changed, 116 insertions(+), 91 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index c83fea3cdf..f57c2ba300 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "strings" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" @@ -14,6 +15,17 @@ import ( const defaultPulsarImage = "docker.io/apachepulsar/pulsar:2.10.2" const defaultPulsarPort = "6650/tcp" const defaultPulsarAdminPort = "8080/tcp" +const defaultPulsarCmd = "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone" +const detaultPulsarCmdWithoutFunctionsWorker = "--no-functions-worker -nss" + +var defaultWaitStrategies = []wait.Strategy{ + wait.ForHTTP("/admin/v2/clusters").WithPort(defaultPulsarAdminPort).WithResponseMatcher(func(r io.Reader) bool { + respBytes, _ := io.ReadAll(r) + resp := string(respBytes) + return resp == `["standalone"]` + }), + wait.ForLog("Successfully updated the policies on namespace public/default"), +} type PulsarContainer struct { testcontainers.Container @@ -27,13 +39,6 @@ type PulsarContainerRequest struct { // PulsarContainerOptions is a function that can be used to configure the Pulsar container type PulsarContainerOptions func(req *PulsarContainerRequest) -// WithCmd allows to override the default command for the container -func WithCmd(cmd []string) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { - req.Cmd = cmd - } -} - // WithConfigModifier allows to override the default container config func WithConfigModifier(modifier func(config *container.Config)) PulsarContainerOptions { return func(req *PulsarContainerRequest) { @@ -48,6 +53,20 @@ func WithEndpointSettingsModifier(modifier func(settings map[string]*network.End } } +// WithFunctionsWorker enables the functions worker, which will override the default pulsar command +// and add a waiting strategy for the functions worker +func WithFunctionsWorker() PulsarContainerOptions { + return func(req *PulsarContainerRequest) { + req.Cmd = []string{"/bin/bash", "-c", defaultPulsarCmd} + + // add the waiting strategy for the functions worker + ws := wait.ForAll(defaultWaitStrategies...) + ws.Strategies = append(ws.Strategies, wait.ForLog("Function worker service started")) + + req.WaitingFor = ws + } +} + // WithHostConfigModifier allows to override the default host config func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) PulsarContainerOptions { return func(req *PulsarContainerRequest) { @@ -66,13 +85,6 @@ func WithPulsarImage(image string) PulsarContainerOptions { } } -// WithWaitingFor allows to override the default waiting strategy -func WithWaitingFor(waitingFor wait.Strategy) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { - req.WaitingFor = waitingFor - } -} - // StartContainer creates an instance of the Pulsar container type, being possible to pass a custom request and options // The created container will use the following defaults: // - image: docker.io/apachepulsar/pulsar:2.10.2 @@ -82,23 +94,11 @@ func WithWaitingFor(waitingFor wait.Strategy) PulsarContainerOptions { // - the log message "Successfully updated the policies on namespace public/default" // - command: "/bin/bash -c /pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss" func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*PulsarContainer, error) { - matchAdminResponse := func(r io.Reader) bool { - respBytes, _ := io.ReadAll(r) - resp := string(respBytes) - return resp == `["standalone"]` - } req := testcontainers.ContainerRequest{ Image: defaultPulsarImage, ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, - WaitingFor: wait.ForAll( - wait.ForHTTP("/admin/v2/clusters").WithPort(defaultPulsarAdminPort).WithResponseMatcher(matchAdminResponse), - wait.ForLog("Successfully updated the policies on namespace public/default"), - ), - Cmd: []string{ - "/bin/bash", - "-c", - "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss", - }, + WaitingFor: wait.ForAll(defaultWaitStrategies...), + Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, detaultPulsarCmdWithoutFunctionsWorker}, " ")}, } pulsarRequest := PulsarContainerRequest{ diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index e150178944..7fff60df58 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -25,75 +25,100 @@ func TestPulsar(t *testing.T) { }) require.NoError(t, err) - c, err := StartContainer( - ctx, - WithConfigModifier(func(config *container.Config) { - config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") - }), - WithHostConfigModifier(func(hostConfig *container.HostConfig) { - hostConfig.Resources = container.Resources{ - Memory: 1024 * 1024 * 1024, - } - }), - WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) { - settings[nwName] = &network.EndpointSettings{ - Aliases: []string{"pulsar"}, - } - }), - ) - if err != nil { - t.Fatal(err) + tests := []struct { + name string + opts []PulsarContainerOptions + }{ + { + name: "default", + }, + { + name: "with modifiers", + opts: []PulsarContainerOptions{ + WithConfigModifier(func(config *container.Config) { + config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") + }), + WithHostConfigModifier(func(hostConfig *container.HostConfig) { + hostConfig.Resources = container.Resources{ + Memory: 1024 * 1024 * 1024, + } + }), + WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) { + settings[nwName] = &network.EndpointSettings{ + Aliases: []string{"pulsar"}, + } + }), + }, + }, + { + name: "with functions worker", + opts: []PulsarContainerOptions{ + WithFunctionsWorker(), + }, + }, } - pc, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: c.URI, - OperationTimeout: 30 * time.Second, - ConnectionTimeout: 30 * time.Second, - }) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { pc.Close() }) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, err := StartContainer( + ctx, + tt.opts..., + ) + if err != nil { + t.Fatal(err) + } - consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ - Topic: "test-topic", - SubscriptionName: "pulsar-test", - Type: pulsar.Exclusive, - }) - if err != nil { - t.Fatal(err) - } - t.Cleanup(func() { consumer.Close() }) + pc, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: c.URI, + OperationTimeout: 30 * time.Second, + ConnectionTimeout: 30 * time.Second, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { pc.Close() }) - msgChan := make(chan []byte) - go func() { - msg, err := consumer.Receive(ctx) - if err != nil { - fmt.Println("failed to receive message", err) - return - } - msgChan <- msg.Payload() - consumer.Ack(msg) - }() + consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ + Topic: "test-topic", + SubscriptionName: "pulsar-test", + Type: pulsar.Exclusive, + }) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { consumer.Close() }) - producer, err := pc.CreateProducer(pulsar.ProducerOptions{ - Topic: "test-topic", - }) - if err != nil { - t.Fatal(err) - } + msgChan := make(chan []byte) + go func() { + msg, err := consumer.Receive(ctx) + if err != nil { + fmt.Println("failed to receive message", err) + return + } + msgChan <- msg.Payload() + consumer.Ack(msg) + }() - producer.Send(ctx, &pulsar.ProducerMessage{ - Payload: []byte("hello world"), - }) + producer, err := pc.CreateProducer(pulsar.ProducerOptions{ + Topic: "test-topic", + }) + if err != nil { + t.Fatal(err) + } - ticker := time.NewTicker(1 * time.Minute) - select { - case <-ticker.C: - t.Fatal("did not receive message in time") - case msg := <-msgChan: - if string(msg) != "hello world" { - t.Fatal("received unexpected message bytes") - } + producer.Send(ctx, &pulsar.ProducerMessage{ + Payload: []byte("hello world"), + }) + + ticker := time.NewTicker(1 * time.Minute) + select { + case <-ticker.C: + t.Fatal("did not receive message in time") + case msg := <-msgChan: + if string(msg) != "hello world" { + t.Fatal("received unexpected message bytes") + } + } + }) } } From 94799e6e2c65a048a3513ef315261795a3e893b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 11:14:29 +0100 Subject: [PATCH 14/29] chore: support for setting transactions --- modules/pulsar/pulsar.go | 35 +++++++++++++++++++++++++++++------ modules/pulsar/pulsar_test.go | 6 ++++++ 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index f57c2ba300..06c8552c39 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -17,15 +17,16 @@ const defaultPulsarPort = "6650/tcp" const defaultPulsarAdminPort = "8080/tcp" const defaultPulsarCmd = "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone" const detaultPulsarCmdWithoutFunctionsWorker = "--no-functions-worker -nss" +const transactionTopicEndpoint = "/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions" -var defaultWaitStrategies = []wait.Strategy{ +var defaultWaitStrategies = wait.ForAll( wait.ForHTTP("/admin/v2/clusters").WithPort(defaultPulsarAdminPort).WithResponseMatcher(func(r io.Reader) bool { respBytes, _ := io.ReadAll(r) resp := string(respBytes) return resp == `["standalone"]` }), wait.ForLog("Successfully updated the policies on namespace public/default"), -} +) type PulsarContainer struct { testcontainers.Container @@ -60,10 +61,12 @@ func WithFunctionsWorker() PulsarContainerOptions { req.Cmd = []string{"/bin/bash", "-c", defaultPulsarCmd} // add the waiting strategy for the functions worker - ws := wait.ForAll(defaultWaitStrategies...) - ws.Strategies = append(ws.Strategies, wait.ForLog("Function worker service started")) + defaultWaitStrategies.Strategies = append( + defaultWaitStrategies.Strategies, + wait.ForLog("Function worker service started"), + ) - req.WaitingFor = ws + req.WaitingFor = defaultWaitStrategies } } @@ -85,6 +88,26 @@ func WithPulsarImage(image string) PulsarContainerOptions { } } +func WithTransactions() PulsarContainerOptions { + return func(req *PulsarContainerRequest) { + if req.Env == nil { + req.Env = make(map[string]string) + } + + req.ContainerRequest.Env["PULSAR_PREFIX_transactionCoordinatorEnabled"] = "true" + + // add the waiting strategy for the transaction topic + defaultWaitStrategies.Strategies = append( + defaultWaitStrategies.Strategies, + wait.ForHTTP(transactionTopicEndpoint).WithPort(defaultPulsarAdminPort).WithStatusCodeMatcher(func(statusCode int) bool { + return statusCode == 200 + }), + ) + + req.WaitingFor = defaultWaitStrategies + } +} + // StartContainer creates an instance of the Pulsar container type, being possible to pass a custom request and options // The created container will use the following defaults: // - image: docker.io/apachepulsar/pulsar:2.10.2 @@ -97,7 +120,7 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa req := testcontainers.ContainerRequest{ Image: defaultPulsarImage, ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, - WaitingFor: wait.ForAll(defaultWaitStrategies...), + WaitingFor: defaultWaitStrategies, Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, detaultPulsarCmdWithoutFunctionsWorker}, " ")}, } diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 7fff60df58..6227cbc8d7 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -56,6 +56,12 @@ func TestPulsar(t *testing.T) { WithFunctionsWorker(), }, }, + { + name: "with transactions", + opts: []PulsarContainerOptions{ + WithTransactions(), + }, + }, } for _, tt := range tests { From 3fa65ada905382819f80a4037a4777fdcee8b3ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 11:53:22 +0100 Subject: [PATCH 15/29] chore: support passing log consumers --- modules/pulsar/pulsar.go | 37 ++++++++++++++++++++++------------- modules/pulsar/pulsar_test.go | 16 +++++++++++++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 06c8552c39..ec8fb6e2eb 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -30,11 +30,13 @@ var defaultWaitStrategies = wait.ForAll( type PulsarContainer struct { testcontainers.Container - URI string + logConsumers []testcontainers.LogConsumer + URI string } type PulsarContainerRequest struct { testcontainers.ContainerRequest + logConsumers []testcontainers.LogConsumer } // PulsarContainerOptions is a function that can be used to configure the Pulsar container @@ -77,6 +79,13 @@ func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) Pul } } +// WithLogConsumer allows to add log consumers to the container +func WithLogConsumers(consumer ...testcontainers.LogConsumer) PulsarContainerOptions { + return func(req *PulsarContainerRequest) { + req.logConsumers = append(req.logConsumers, consumer...) + } +} + // WithPulsarImage allows to override the default Pulsar image func WithPulsarImage(image string) PulsarContainerOptions { return func(req *PulsarContainerRequest) { @@ -126,6 +135,7 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa pulsarRequest := PulsarContainerRequest{ ContainerRequest: req, + logConsumers: []testcontainers.LogConsumer{}, } for _, opt := range opts { @@ -140,24 +150,23 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa return nil, err } - c.StartLogProducer(ctx) - defer c.StopLogProducer() - lc := logConsumer{} - c.FollowOutput(&lc) - pulsarPort, err := c.MappedPort(ctx, defaultPulsarPort) if err != nil { return nil, err } - return &PulsarContainer{ - Container: c, - URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), - }, nil -} + pc := &PulsarContainer{ + Container: c, + logConsumers: pulsarRequest.logConsumers, + URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), + } -type logConsumer struct{} + if len(pc.logConsumers) > 0 { + c.StartLogProducer(ctx) + } + for _, lc := range pc.logConsumers { + c.FollowOutput(lc) + } -func (lc *logConsumer) Accept(l testcontainers.Log) { - fmt.Print(string(l.Content)) + return pc, nil } diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 6227cbc8d7..69e7a0e2c3 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -13,6 +13,12 @@ import ( "github.com/testcontainers/testcontainers-go" ) +type testLogConsumer struct{} + +func (lc *testLogConsumer) Accept(l testcontainers.Log) { + fmt.Print(string(l.Content)) +} + func TestPulsar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -62,6 +68,12 @@ func TestPulsar(t *testing.T) { WithTransactions(), }, }, + { + name: "with log consumers", + opts: []PulsarContainerOptions{ + WithLogConsumers(&testLogConsumer{}), + }, + }, } for _, tt := range tests { @@ -74,6 +86,10 @@ func TestPulsar(t *testing.T) { t.Fatal(err) } + if len(c.logConsumers) > 0 { + defer c.StopLogProducer() + } + pc, err := pulsar.NewClient(pulsar.ClientOptions{ URL: c.URI, OperationTimeout: 30 * time.Second, From 7a6ca91acbaa5a45b53a53bd67164b271e95a1ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 11:54:03 +0100 Subject: [PATCH 16/29] chore: simplify initialisation of env --- modules/pulsar/pulsar.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index ec8fb6e2eb..1931d3050e 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -99,10 +99,6 @@ func WithPulsarImage(image string) PulsarContainerOptions { func WithTransactions() PulsarContainerOptions { return func(req *PulsarContainerRequest) { - if req.Env == nil { - req.Env = make(map[string]string) - } - req.ContainerRequest.Env["PULSAR_PREFIX_transactionCoordinatorEnabled"] = "true" // add the waiting strategy for the transaction topic @@ -128,6 +124,7 @@ func WithTransactions() PulsarContainerOptions { func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*PulsarContainer, error) { req := testcontainers.ContainerRequest{ Image: defaultPulsarImage, + Env: map[string]string{}, ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, WaitingFor: defaultWaitStrategies, Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, detaultPulsarCmdWithoutFunctionsWorker}, " ")}, From a72855fcb6cf4a294b9773cee6f1e66dc96b0307 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 12:04:16 +0100 Subject: [PATCH 17/29] fix: use right module path --- modules/pulsar/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/pulsar/go.mod b/modules/pulsar/go.mod index 2f7f0aa5bd..de16c1e443 100644 --- a/modules/pulsar/go.mod +++ b/modules/pulsar/go.mod @@ -1,4 +1,4 @@ -module github.com/testcontainers/testcontainers-go/examples/pulsar +module github.com/testcontainers/testcontainers-go/modules/pulsar go 1.18 From 6d8b4da824f77c20bea587d8d0920725c2563adb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 12:20:13 +0100 Subject: [PATCH 18/29] chore: migrate tests to its own package --- modules/pulsar/pulsar.go | 11 ++++++----- modules/pulsar/pulsar_test.go | 29 +++++++++++++++-------------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 1931d3050e..5ba65d6ecf 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -30,7 +30,7 @@ var defaultWaitStrategies = wait.ForAll( type PulsarContainer struct { testcontainers.Container - logConsumers []testcontainers.LogConsumer + LogConsumers []testcontainers.LogConsumer // Needs to be exported to control the stop from the caller URI string } @@ -79,7 +79,8 @@ func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) Pul } } -// WithLogConsumer allows to add log consumers to the container +// WithLogConsumer allows to add log consumers to the container. They will be automatically started and stopped by the StartContainer function +// but it's a responsibility of the caller to stop them calling StopLogProducer func WithLogConsumers(consumer ...testcontainers.LogConsumer) PulsarContainerOptions { return func(req *PulsarContainerRequest) { req.logConsumers = append(req.logConsumers, consumer...) @@ -154,14 +155,14 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa pc := &PulsarContainer{ Container: c, - logConsumers: pulsarRequest.logConsumers, + LogConsumers: pulsarRequest.logConsumers, URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), } - if len(pc.logConsumers) > 0 { + if len(pc.LogConsumers) > 0 { c.StartLogProducer(ctx) } - for _, lc := range pc.logConsumers { + for _, lc := range pc.LogConsumers { c.FollowOutput(lc) } diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 69e7a0e2c3..42c6eba529 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -1,4 +1,4 @@ -package pulsar +package pulsar_test import ( "context" @@ -11,6 +11,7 @@ import ( "github.com/docker/docker/api/types/network" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" + testcontainerspulsar "github.com/testcontainers/testcontainers-go/modules/pulsar" ) type testLogConsumer struct{} @@ -33,23 +34,23 @@ func TestPulsar(t *testing.T) { tests := []struct { name string - opts []PulsarContainerOptions + opts []testcontainerspulsar.PulsarContainerOptions }{ { name: "default", }, { name: "with modifiers", - opts: []PulsarContainerOptions{ - WithConfigModifier(func(config *container.Config) { + opts: []testcontainerspulsar.PulsarContainerOptions{ + testcontainerspulsar.WithConfigModifier(func(config *container.Config) { config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") }), - WithHostConfigModifier(func(hostConfig *container.HostConfig) { + testcontainerspulsar.WithHostConfigModifier(func(hostConfig *container.HostConfig) { hostConfig.Resources = container.Resources{ Memory: 1024 * 1024 * 1024, } }), - WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) { + testcontainerspulsar.WithEndpointSettingsModifier(func(settings map[string]*network.EndpointSettings) { settings[nwName] = &network.EndpointSettings{ Aliases: []string{"pulsar"}, } @@ -58,27 +59,27 @@ func TestPulsar(t *testing.T) { }, { name: "with functions worker", - opts: []PulsarContainerOptions{ - WithFunctionsWorker(), + opts: []testcontainerspulsar.PulsarContainerOptions{ + testcontainerspulsar.WithFunctionsWorker(), }, }, { name: "with transactions", - opts: []PulsarContainerOptions{ - WithTransactions(), + opts: []testcontainerspulsar.PulsarContainerOptions{ + testcontainerspulsar.WithTransactions(), }, }, { name: "with log consumers", - opts: []PulsarContainerOptions{ - WithLogConsumers(&testLogConsumer{}), + opts: []testcontainerspulsar.PulsarContainerOptions{ + testcontainerspulsar.WithLogConsumers(&testLogConsumer{}), }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - c, err := StartContainer( + c, err := testcontainerspulsar.StartContainer( ctx, tt.opts..., ) @@ -86,7 +87,7 @@ func TestPulsar(t *testing.T) { t.Fatal(err) } - if len(c.logConsumers) > 0 { + if len(c.LogConsumers) > 0 { defer c.StopLogProducer() } From 670773d6dba9ad8103a81e6718b53b96364fc61f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 12:22:10 +0100 Subject: [PATCH 19/29] chore: remove Pulsar from exported fields --- modules/pulsar/pulsar.go | 42 +++++++++++++++++------------------ modules/pulsar/pulsar_test.go | 10 ++++----- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 5ba65d6ecf..26815ce751 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -28,38 +28,38 @@ var defaultWaitStrategies = wait.ForAll( wait.ForLog("Successfully updated the policies on namespace public/default"), ) -type PulsarContainer struct { +type Container struct { testcontainers.Container LogConsumers []testcontainers.LogConsumer // Needs to be exported to control the stop from the caller URI string } -type PulsarContainerRequest struct { +type ContainerRequest struct { testcontainers.ContainerRequest logConsumers []testcontainers.LogConsumer } -// PulsarContainerOptions is a function that can be used to configure the Pulsar container -type PulsarContainerOptions func(req *PulsarContainerRequest) +// ContainerOptions is a function that can be used to configure the Pulsar container +type ContainerOptions func(req *ContainerRequest) // WithConfigModifier allows to override the default container config -func WithConfigModifier(modifier func(config *container.Config)) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithConfigModifier(modifier func(config *container.Config)) ContainerOptions { + return func(req *ContainerRequest) { req.ConfigModifier = modifier } } // WithEndpointSettingsModifier allows to override the default endpoint settings -func WithEndpointSettingsModifier(modifier func(settings map[string]*network.EndpointSettings)) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithEndpointSettingsModifier(modifier func(settings map[string]*network.EndpointSettings)) ContainerOptions { + return func(req *ContainerRequest) { req.EnpointSettingsModifier = modifier } } // WithFunctionsWorker enables the functions worker, which will override the default pulsar command // and add a waiting strategy for the functions worker -func WithFunctionsWorker() PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithFunctionsWorker() ContainerOptions { + return func(req *ContainerRequest) { req.Cmd = []string{"/bin/bash", "-c", defaultPulsarCmd} // add the waiting strategy for the functions worker @@ -73,23 +73,23 @@ func WithFunctionsWorker() PulsarContainerOptions { } // WithHostConfigModifier allows to override the default host config -func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithHostConfigModifier(modifier func(hostConfig *container.HostConfig)) ContainerOptions { + return func(req *ContainerRequest) { req.HostConfigModifier = modifier } } // WithLogConsumer allows to add log consumers to the container. They will be automatically started and stopped by the StartContainer function // but it's a responsibility of the caller to stop them calling StopLogProducer -func WithLogConsumers(consumer ...testcontainers.LogConsumer) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithLogConsumers(consumer ...testcontainers.LogConsumer) ContainerOptions { + return func(req *ContainerRequest) { req.logConsumers = append(req.logConsumers, consumer...) } } // WithPulsarImage allows to override the default Pulsar image -func WithPulsarImage(image string) PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithPulsarImage(image string) ContainerOptions { + return func(req *ContainerRequest) { if image == "" { image = defaultPulsarImage } @@ -98,8 +98,8 @@ func WithPulsarImage(image string) PulsarContainerOptions { } } -func WithTransactions() PulsarContainerOptions { - return func(req *PulsarContainerRequest) { +func WithTransactions() ContainerOptions { + return func(req *ContainerRequest) { req.ContainerRequest.Env["PULSAR_PREFIX_transactionCoordinatorEnabled"] = "true" // add the waiting strategy for the transaction topic @@ -122,7 +122,7 @@ func WithTransactions() PulsarContainerOptions { // - the Pulsar admin API ("/admin/v2/clusters") to be ready on port 8080/tcp and return the response `["standalone"]` // - the log message "Successfully updated the policies on namespace public/default" // - command: "/bin/bash -c /pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss" -func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*PulsarContainer, error) { +func StartContainer(ctx context.Context, opts ...ContainerOptions) (*Container, error) { req := testcontainers.ContainerRequest{ Image: defaultPulsarImage, Env: map[string]string{}, @@ -131,7 +131,7 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, detaultPulsarCmdWithoutFunctionsWorker}, " ")}, } - pulsarRequest := PulsarContainerRequest{ + pulsarRequest := ContainerRequest{ ContainerRequest: req, logConsumers: []testcontainers.LogConsumer{}, } @@ -153,7 +153,7 @@ func StartContainer(ctx context.Context, opts ...PulsarContainerOptions) (*Pulsa return nil, err } - pc := &PulsarContainer{ + pc := &Container{ Container: c, LogConsumers: pulsarRequest.logConsumers, URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 42c6eba529..4255aa9db6 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -34,14 +34,14 @@ func TestPulsar(t *testing.T) { tests := []struct { name string - opts []testcontainerspulsar.PulsarContainerOptions + opts []testcontainerspulsar.ContainerOptions }{ { name: "default", }, { name: "with modifiers", - opts: []testcontainerspulsar.PulsarContainerOptions{ + opts: []testcontainerspulsar.ContainerOptions{ testcontainerspulsar.WithConfigModifier(func(config *container.Config) { config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") }), @@ -59,19 +59,19 @@ func TestPulsar(t *testing.T) { }, { name: "with functions worker", - opts: []testcontainerspulsar.PulsarContainerOptions{ + opts: []testcontainerspulsar.ContainerOptions{ testcontainerspulsar.WithFunctionsWorker(), }, }, { name: "with transactions", - opts: []testcontainerspulsar.PulsarContainerOptions{ + opts: []testcontainerspulsar.ContainerOptions{ testcontainerspulsar.WithTransactions(), }, }, { name: "with log consumers", - opts: []testcontainerspulsar.PulsarContainerOptions{ + opts: []testcontainerspulsar.ContainerOptions{ testcontainerspulsar.WithLogConsumers(&testLogConsumer{}), }, }, From 7b9e8c6f9340e649e7fa5449695146e9e85244c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 12:34:17 +0100 Subject: [PATCH 20/29] chore: support retrieving broker and http admin URLs from the container --- modules/pulsar/pulsar.go | 40 +++++++++++++++++++++++++++++------ modules/pulsar/pulsar_test.go | 7 +++++- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 26815ce751..cbc8172104 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -8,6 +8,7 @@ import ( "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" + "github.com/docker/go-connections/nat" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" ) @@ -31,7 +32,38 @@ var defaultWaitStrategies = wait.ForAll( type Container struct { testcontainers.Container LogConsumers []testcontainers.LogConsumer // Needs to be exported to control the stop from the caller - URI string +} + +func (c *Container) BrokerURL(ctx context.Context) (string, error) { + return c.resolveURL(ctx, defaultPulsarPort) +} + +func (c *Container) HTTPServiceURL(ctx context.Context) (string, error) { + return c.resolveURL(ctx, defaultPulsarAdminPort) +} + +func (c *Container) resolveURL(ctx context.Context, port nat.Port) (string, error) { + provider, err := testcontainers.NewDockerProvider() + if err != nil { + return "", err + } + + host, err := provider.DaemonHost(ctx) + if err != nil { + return "", err + } + + pulsarPort, err := c.MappedPort(ctx, port) + if err != nil { + return "", err + } + + proto := "pulsar" + if port == defaultPulsarAdminPort { + proto = "http" + } + + return fmt.Sprintf("%s://%s:%v", proto, host, pulsarPort.Int()), nil } type ContainerRequest struct { @@ -148,15 +180,9 @@ func StartContainer(ctx context.Context, opts ...ContainerOptions) (*Container, return nil, err } - pulsarPort, err := c.MappedPort(ctx, defaultPulsarPort) - if err != nil { - return nil, err - } - pc := &Container{ Container: c, LogConsumers: pulsarRequest.logConsumers, - URI: fmt.Sprintf("pulsar://127.0.0.1:%v", pulsarPort.Int()), } if len(pc.LogConsumers) > 0 { diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 4255aa9db6..ebe59faf31 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -91,8 +91,13 @@ func TestPulsar(t *testing.T) { defer c.StopLogProducer() } + brokerURL, err := c.BrokerURL(ctx) + if err != nil { + t.Fatal(err) + } + pc, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: c.URI, + URL: brokerURL, OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) From 15bec80d143b9eb9dc5933fff01facbcf17f87d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 13:11:16 +0100 Subject: [PATCH 21/29] chore: support passing pulsar envs to the container request --- modules/pulsar/pulsar.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index cbc8172104..4260ac9305 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -119,6 +119,13 @@ func WithLogConsumers(consumer ...testcontainers.LogConsumer) ContainerOptions { } } +// WithPulsarEnv allows to use the native APIs and set each variable with PULSAR_PREFIX_ as prefix. +func WithPulsarEnv(configVar string, configValue string) ContainerOptions { + return func(req *ContainerRequest) { + req.ContainerRequest.Env["PULSAR_PREFIX_"+configVar] = configValue + } +} + // WithPulsarImage allows to override the default Pulsar image func WithPulsarImage(image string) ContainerOptions { return func(req *ContainerRequest) { @@ -132,7 +139,7 @@ func WithPulsarImage(image string) ContainerOptions { func WithTransactions() ContainerOptions { return func(req *ContainerRequest) { - req.ContainerRequest.Env["PULSAR_PREFIX_transactionCoordinatorEnabled"] = "true" + WithPulsarEnv("transactionCoordinatorEnabled", "true")(req) // add the waiting strategy for the transaction topic defaultWaitStrategies.Strategies = append( From 751f51fd800ff1b71e8b688e58d276e9315ca38d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 13:11:32 +0100 Subject: [PATCH 22/29] docs: copy Java docs --- docs/modules/pulsar.md | 57 +++++++++++++++++++++++++++++++++-- modules/pulsar/pulsar_test.go | 21 +++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md index a0b56c8d1b..819254b464 100644 --- a/docs/modules/pulsar.md +++ b/docs/modules/pulsar.md @@ -1,9 +1,60 @@ -# Pulsar +# Apache Pulsar + +Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without external services. + +It's based on the official Apache Pulsar docker image, so it is recommended to read the [official guide](https://pulsar.apache.org/docs/next/getting-started-docker/). + +## Adding this module to your project dependencies + +Please run the following command to add the LocalStack module to your Go dependencies: + +``` +go get github.com/testcontainers/testcontainers-go/modules/pulsar +``` + +## Usage example + +Create a `Pulsar` container to use it in your tests: -[Creating an Apache Pulsar container](../../modules/pulsar/pulsar.go) +[Creating a Pulsar container](../../modules/pulsar/pulsar_test.go) inside_block:startPulsarContainer +where the `tt.opts` are the options to configure the container. See the [Container Options](#container-options) section for more details. + +Then you can retrieve the broker and the admin url: + + +[Get broker and admin urls](../../modules/pulsar/pulsar_test.go) inside_block:getPulsarURLs + + +## Options + +When starting the Pulsar container, you can pass options in a variadic way to configure it. + +### Pulsar Configuration +If you need to set Pulsar configuration variables you can use the `WithPulsarEnv` to set Pulsar environment variables: the `PULSAR_PREFIX_` prefix will be automatically added for you. + +For example, if you want to enable `brokerDeduplicationEnabled`: + + +[Set configuration variables](../../modules/pulsar/pulsar_test.go) inside_block:addPulsarEnv + + +It will result in the `PULSAR_PREFIX_brokerDeduplicationEnabled=true` environment variable being set in the container request. + +### Pulsar IO + +If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker with the `WithFunctionsWorker` option: + + +[Create a Pulsar container with functions worker](../../modules/pulsar/pulsar_test.go) inside_block:withFunctionsWorker + + +### Pulsar Transactions + +If you need to test Pulsar Transactions you can enable the transactions feature: + -[Test for an Apache Pulsar container](../../modules/pulsar/pulsar_test.go) +[Create a Pulsar container with transactions](../../modules/pulsar/pulsar_test.go) inside_block:withTransactions diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index ebe59faf31..54efaba7a1 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -3,12 +3,14 @@ package pulsar_test import ( "context" "fmt" + "strings" "testing" "time" "github.com/apache/pulsar-client-go/pulsar" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" testcontainerspulsar "github.com/testcontainers/testcontainers-go/modules/pulsar" @@ -42,6 +44,9 @@ func TestPulsar(t *testing.T) { { name: "with modifiers", opts: []testcontainerspulsar.ContainerOptions{ + // addPulsarEnv { + testcontainerspulsar.WithPulsarEnv("brokerDeduplicationEnabled", "true"), + // } testcontainerspulsar.WithConfigModifier(func(config *container.Config) { config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") }), @@ -60,13 +65,17 @@ func TestPulsar(t *testing.T) { { name: "with functions worker", opts: []testcontainerspulsar.ContainerOptions{ + // withFunctionsWorker { testcontainerspulsar.WithFunctionsWorker(), + // } }, }, { name: "with transactions", opts: []testcontainerspulsar.ContainerOptions{ + // withTransactions { testcontainerspulsar.WithTransactions(), + // } }, }, { @@ -79,10 +88,12 @@ func TestPulsar(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + // startPulsarContainer { c, err := testcontainerspulsar.StartContainer( ctx, tt.opts..., ) + // } if err != nil { t.Fatal(err) } @@ -91,11 +102,21 @@ func TestPulsar(t *testing.T) { defer c.StopLogProducer() } + // getPulsarURLs { brokerURL, err := c.BrokerURL(ctx) if err != nil { t.Fatal(err) } + serviceURL, err := c.HTTPServiceURL(ctx) + if err != nil { + t.Fatal(err) + } + // } + + assert.True(t, strings.HasPrefix(brokerURL, "pulsar://")) + assert.True(t, strings.HasPrefix(serviceURL, "http://")) + pc, err := pulsar.NewClient(pulsar.ClientOptions{ URL: brokerURL, OperationTimeout: 30 * time.Second, From 6a55b6650e668200820d1548631a12225ae187ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 13:26:43 +0100 Subject: [PATCH 23/29] docs: document modifiers --- docs/modules/pulsar.md | 36 +++++++++++++++++++++++++++++++++++ modules/pulsar/pulsar_test.go | 12 ++++++++++++ 2 files changed, 48 insertions(+) diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md index 819254b464..0652115470 100644 --- a/docs/modules/pulsar.md +++ b/docs/modules/pulsar.md @@ -32,6 +32,13 @@ Then you can retrieve the broker and the admin url: When starting the Pulsar container, you can pass options in a variadic way to configure it. +### Pulsar Image +If you need to set a different Pulsar image you can use the `WithPulsarImage`. + + +[Set Pulsar image](../../modules/pulsar/pulsar_test.go) inside_block:setPulsarImage + + ### Pulsar Configuration If you need to set Pulsar configuration variables you can use the `WithPulsarEnv` to set Pulsar environment variables: the `PULSAR_PREFIX_` prefix will be automatically added for you. @@ -58,3 +65,32 @@ If you need to test Pulsar Transactions you can enable the transactions feature: [Create a Pulsar container with transactions](../../modules/pulsar/pulsar_test.go) inside_block:withTransactions + +### Log consumers +If you need to collect the logs from the Pulsar container, you can add your own LogConsumer with the `WithLogConsumers` function, which accepts a variadic argument of LogConsumers. + + +[Adding LogConsumers](../../modules/pulsar/pulsar_test.go) inside_block:withLogConsumers + + +An example of a LogConsumer could be the following: + + +[Example LogConsumer](../../modules/pulsar/pulsar_test.go) inside_block:logConsumerForTesting + + +!!!warning + You will need to explicitly stop the producer in your tests. + +If you want to know more about LogConsumers, please check the [Following Container Logs](../features/follow_logs.md) documentation. + +## Advanced configuration + +In the case you need a more advanced settings regarding the config, host config and endpoint settings Docker types, you can leverage the modifier functions that are available in +the ContainerRequest. The Pulsar container exposes a way to interact with those modifiers in a simple manner, using the aforementioned options in the `StartContainer` function: + + +[Advanced Docker settings](../../modules/pulsar/pulsar_test.go) inside_block:advancedDockerSettings + + +Please check out the [Advanced Settings](../features/creating_container.md#advanced-settings) for creating containers documentation. diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 54efaba7a1..56d3606285 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -16,12 +16,17 @@ import ( testcontainerspulsar "github.com/testcontainers/testcontainers-go/modules/pulsar" ) +// logConsumerForTesting { +// logConsumer is a testcontainers.LogConsumer that prints the log to stdout type testLogConsumer struct{} +// Accept prints the log to stdout func (lc *testLogConsumer) Accept(l testcontainers.Log) { fmt.Print(string(l.Content)) } +// } + func TestPulsar(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -44,9 +49,13 @@ func TestPulsar(t *testing.T) { { name: "with modifiers", opts: []testcontainerspulsar.ContainerOptions{ + // setPulsarImage { + testcontainerspulsar.WithPulsarImage("docker.io/apachepulsar/pulsar:2.10.2"), + // } // addPulsarEnv { testcontainerspulsar.WithPulsarEnv("brokerDeduplicationEnabled", "true"), // } + // advancedDockerSettings { testcontainerspulsar.WithConfigModifier(func(config *container.Config) { config.Env = append(config.Env, "PULSAR_MEM= -Xms512m -Xmx512m -XX:MaxDirectMemorySize=512m") }), @@ -60,6 +69,7 @@ func TestPulsar(t *testing.T) { Aliases: []string{"pulsar"}, } }), + // } }, }, { @@ -81,7 +91,9 @@ func TestPulsar(t *testing.T) { { name: "with log consumers", opts: []testcontainerspulsar.ContainerOptions{ + // withLogConsumers { testcontainerspulsar.WithLogConsumers(&testLogConsumer{}), + // } }, }, } From d464301fc769a3c9b1b7d83308946e060c3c17e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 15:07:33 +0100 Subject: [PATCH 24/29] docs: fix heading --- docs/modules/pulsar.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md index 0652115470..2af462a929 100644 --- a/docs/modules/pulsar.md +++ b/docs/modules/pulsar.md @@ -28,7 +28,7 @@ Then you can retrieve the broker and the admin url: [Get broker and admin urls](../../modules/pulsar/pulsar_test.go) inside_block:getPulsarURLs -## Options +## Container Options When starting the Pulsar container, you can pass options in a variadic way to configure it. From 66ef87bd1b42e0099a13c02881ce6d68818bc3ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 16:15:08 +0100 Subject: [PATCH 25/29] chore: improve tests to check that subscriptions are there --- modules/pulsar/pulsar_test.go | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 56d3606285..9b77a256d6 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -2,7 +2,10 @@ package pulsar_test import ( "context" + "encoding/json" "fmt" + "io" + "net/http" "strings" "testing" "time" @@ -139,9 +142,11 @@ func TestPulsar(t *testing.T) { } t.Cleanup(func() { pc.Close() }) + subscriptionName := "pulsar-test" + consumer, err := pc.Subscribe(pulsar.ConsumerOptions{ Topic: "test-topic", - SubscriptionName: "pulsar-test", + SubscriptionName: subscriptionName, Type: pulsar.Exclusive, }) if err != nil { @@ -180,6 +185,31 @@ func TestPulsar(t *testing.T) { t.Fatal("received unexpected message bytes") } } + + // get topic statistics using the Admin endpoint + httpClient := http.Client{ + Timeout: 30 * time.Second, + } + + resp, err := httpClient.Get(serviceURL + "/admin/v2/persistent/public/default/test-topic/stats") + require.Nil(t, err) + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + require.Nil(t, err) + + var stats map[string]interface{} + err = json.Unmarshal(body, &stats) + require.Nil(t, err) + + subscriptions := stats["subscriptions"] + require.NotNil(t, subscriptions) + + subscriptionsMap := subscriptions.(map[string]interface{}) + + // check that the subscription exists + _, ok := subscriptionsMap[subscriptionName] + assert.True(t, ok) }) } } From 09d0893e51afa8dcd2c50e2d8ba266a88f2f73d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 16:23:26 +0100 Subject: [PATCH 26/29] chore: simplify asserts --- modules/pulsar/pulsar_test.go | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/modules/pulsar/pulsar_test.go b/modules/pulsar/pulsar_test.go index 9b77a256d6..833e457fb1 100644 --- a/modules/pulsar/pulsar_test.go +++ b/modules/pulsar/pulsar_test.go @@ -109,9 +109,7 @@ func TestPulsar(t *testing.T) { tt.opts..., ) // } - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) if len(c.LogConsumers) > 0 { defer c.StopLogProducer() @@ -119,14 +117,10 @@ func TestPulsar(t *testing.T) { // getPulsarURLs { brokerURL, err := c.BrokerURL(ctx) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) serviceURL, err := c.HTTPServiceURL(ctx) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) // } assert.True(t, strings.HasPrefix(brokerURL, "pulsar://")) @@ -137,9 +131,7 @@ func TestPulsar(t *testing.T) { OperationTimeout: 30 * time.Second, ConnectionTimeout: 30 * time.Second, }) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) t.Cleanup(func() { pc.Close() }) subscriptionName := "pulsar-test" @@ -149,9 +141,7 @@ func TestPulsar(t *testing.T) { SubscriptionName: subscriptionName, Type: pulsar.Exclusive, }) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) t.Cleanup(func() { consumer.Close() }) msgChan := make(chan []byte) @@ -168,9 +158,7 @@ func TestPulsar(t *testing.T) { producer, err := pc.CreateProducer(pulsar.ProducerOptions{ Topic: "test-topic", }) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) producer.Send(ctx, &pulsar.ProducerMessage{ Payload: []byte("hello world"), From 279034e1b7534549af4f1427abe12ea5eb251017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 16:26:05 +0100 Subject: [PATCH 27/29] fix: update paths in GH workflow --- .github/workflows/module-pulsar.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/module-pulsar.yml b/.github/workflows/module-pulsar.yml index 4aef3d249c..ef4b544eb6 100644 --- a/.github/workflows/module-pulsar.yml +++ b/.github/workflows/module-pulsar.yml @@ -24,15 +24,15 @@ jobs: uses: actions/checkout@v3 - name: modVerify - working-directory: ./examples/pulsar + working-directory: ./modules/pulsar run: go mod verify - name: modTidy - working-directory: ./examples/pulsar + working-directory: ./modules/pulsar run: make tools-tidy - name: gotestsum - working-directory: ./examples/pulsar + working-directory: ./modules/pulsar run: make test-unit - name: Run checker From f673674696a4df25bced5f93d4e78149c6885546 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Wed, 22 Feb 2023 16:43:58 +0100 Subject: [PATCH 28/29] fix: update dependabot for pulsar module --- .github/dependabot.yml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 92f9ee5cf1..0f3215cccc 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -24,6 +24,12 @@ updates: interval: weekly open-pull-requests-limit: 3 rebase-strategy: disabled + - package-ecosystem: gomod + directory: /modules/pulsar + schedule: + interval: weekly + open-pull-requests-limit: 3 + rebase-strategy: disabled - package-ecosystem: gomod directory: /examples/bigtable schedule: @@ -84,12 +90,6 @@ updates: interval: weekly open-pull-requests-limit: 3 rebase-strategy: disabled - - package-ecosystem: gomod - directory: /examples/pulsar - schedule: - interval: weekly - open-pull-requests-limit: 3 - rebase-strategy: disabled - package-ecosystem: gomod directory: /examples/redis schedule: From c1fb4088866a288c16163366fd8de0c7fbb989d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Manuel=20de=20la=20Pe=C3=B1a?= Date: Mon, 6 Mar 2023 11:09:48 +0100 Subject: [PATCH 29/29] fix: typos --- docs/modules/pulsar.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md index 2af462a929..a28d7693d7 100644 --- a/docs/modules/pulsar.md +++ b/docs/modules/pulsar.md @@ -6,7 +6,7 @@ It's based on the official Apache Pulsar docker image, so it is recommended to r ## Adding this module to your project dependencies -Please run the following command to add the LocalStack module to your Go dependencies: +Please run the following command to add the Apache Pulsar module to your Go dependencies: ``` go get github.com/testcontainers/testcontainers-go/modules/pulsar @@ -86,7 +86,7 @@ If you want to know more about LogConsumers, please check the [Following Contain ## Advanced configuration -In the case you need a more advanced settings regarding the config, host config and endpoint settings Docker types, you can leverage the modifier functions that are available in +In the case you need a more advanced configuration regarding the config, host config and endpoint settings Docker types, you can leverage the modifier functions that are available in the ContainerRequest. The Pulsar container exposes a way to interact with those modifiers in a simple manner, using the aforementioned options in the `StartContainer` function: