Skip to content

Commit 451d278

Browse files
gudjonragnarGudjon Ragnar Brynjarssonalexanderankin
authoredMar 31, 2024··
fix(kafka): Add redpanda testcontainer module (#441)
Co-authored-by: Gudjon Ragnar Brynjarsson <gudjon.brynjarsson@controlant.com> Co-authored-by: Dave Ankin <daveankin@gmail.com>
1 parent 302c73d commit 451d278

File tree

6 files changed

+154
-7
lines changed

6 files changed

+154
-7
lines changed
 

‎modules/kafka/README.rst

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
.. autoclass:: testcontainers.kafka.KafkaContainer
22
.. title:: testcontainers.kafka.KafkaContainer
3+
.. autoclass:: testcontainers.kafka.RedpandaContainer

‎modules/kafka/testcontainers/kafka/__init__.py

+6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
from testcontainers.core.container import DockerContainer
77
from testcontainers.core.utils import raise_for_deprecated_parameter
88
from testcontainers.core.waiting_utils import wait_for_logs
9+
from testcontainers.kafka._redpanda import RedpandaContainer
10+
11+
__all__ = [
12+
"KafkaContainer",
13+
"RedpandaContainer",
14+
]
915

1016

1117
class KafkaContainer(DockerContainer):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import tarfile
2+
import time
3+
from io import BytesIO
4+
from textwrap import dedent
5+
6+
from testcontainers.core.container import DockerContainer
7+
from testcontainers.core.waiting_utils import wait_for_logs
8+
9+
10+
class RedpandaContainer(DockerContainer):
11+
"""
12+
Redpanda container.
13+
14+
Example:
15+
16+
.. doctest::
17+
18+
>>> from testcontainers.kafka import RedpandaContainer
19+
20+
>>> with RedpandaContainer() as redpanda:
21+
... connection = redpanda.get_bootstrap_server()
22+
"""
23+
24+
TC_START_SCRIPT = "/tc-start.sh"
25+
26+
def __init__(
27+
self,
28+
image: str = "docker.redpanda.com/redpandadata/redpanda:v23.1.13",
29+
**kwargs,
30+
) -> None:
31+
kwargs["entrypoint"] = "sh"
32+
super().__init__(image, **kwargs)
33+
self.redpanda_port = 9092
34+
self.schema_registry_port = 8081
35+
self.with_exposed_ports(self.redpanda_port, self.schema_registry_port)
36+
37+
def get_bootstrap_server(self) -> str:
38+
host = self.get_container_host_ip()
39+
port = self.get_exposed_port(self.redpanda_port)
40+
return f"{host}:{port}"
41+
42+
def get_schema_registry_address(self) -> str:
43+
host = self.get_container_host_ip()
44+
port = self.get_exposed_port(self.schema_registry_port)
45+
return f"http://{host}:{port}"
46+
47+
def tc_start(self) -> None:
48+
host = self.get_container_host_ip()
49+
port = self.get_exposed_port(self.redpanda_port)
50+
51+
data = (
52+
dedent(
53+
f"""
54+
#!/bin/bash
55+
/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G \
56+
--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 \
57+
--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://{host}:{port}
58+
"""
59+
)
60+
.strip()
61+
.encode("utf-8")
62+
)
63+
64+
self.create_file(data, RedpandaContainer.TC_START_SCRIPT)
65+
66+
def start(self, timeout=10) -> "RedpandaContainer":
67+
script = RedpandaContainer.TC_START_SCRIPT
68+
command = f'-c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
69+
self.with_command(command)
70+
super().start()
71+
self.tc_start()
72+
wait_for_logs(self, r".*Started Kafka API server.*", timeout=timeout)
73+
return self
74+
75+
def create_file(self, content: bytes, path: str) -> None:
76+
with BytesIO() as archive, tarfile.TarFile(fileobj=archive, mode="w") as tar:
77+
tarinfo = tarfile.TarInfo(name=path)
78+
tarinfo.size = len(content)
79+
tarinfo.mtime = time.time()
80+
tar.addfile(tarinfo, BytesIO(content))
81+
archive.seek(0)
82+
self.get_wrapped_container().put_archive("/", archive)

‎modules/kafka/tests/test_redpanda.py

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import pytest
2+
from requests import post, get
3+
from json import dumps
4+
5+
from kafka import KafkaConsumer, KafkaProducer, TopicPartition, KafkaAdminClient
6+
from kafka.admin import NewTopic
7+
8+
from testcontainers.kafka import RedpandaContainer
9+
10+
11+
def test_redpanda_producer_consumer():
12+
with RedpandaContainer() as container:
13+
produce_and_consume_message(container)
14+
15+
16+
@pytest.mark.parametrize("version", ["v23.1.13", "v23.3.10"])
17+
def test_redpanda_confluent_version(version):
18+
with RedpandaContainer(image=f"docker.redpanda.com/redpandadata/redpanda:{version}") as container:
19+
produce_and_consume_message(container)
20+
21+
22+
def test_schema_registry():
23+
with RedpandaContainer() as container:
24+
address = container.get_schema_registry_address()
25+
subject_name = "test-subject-value"
26+
url = f"{address}/subjects"
27+
28+
payload = {"schema": dumps({"type": "string"})}
29+
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
30+
create_result = post(f"{url}/{subject_name}/versions", data=dumps(payload), headers=headers)
31+
assert create_result.status_code == 200
32+
33+
result = get(url)
34+
assert result.status_code == 200
35+
assert subject_name in result.json()
36+
37+
38+
def produce_and_consume_message(container):
39+
topic = "test-topic"
40+
bootstrap_server = container.get_bootstrap_server()
41+
42+
admin = KafkaAdminClient(bootstrap_servers=[bootstrap_server])
43+
admin.create_topics([NewTopic(topic, 1, 1)])
44+
45+
producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
46+
future = producer.send(topic, b"verification message")
47+
future.get(timeout=10)
48+
producer.close()
49+
50+
consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server])
51+
tp = TopicPartition(topic, 0)
52+
consumer.assign([tp])
53+
consumer.seek_to_beginning()
54+
assert consumer.end_offsets([tp])[tp] == 1, "Expected exactly one test message to be present on test topic !"

‎poetry.lock

+10-6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ psycopg2-binary = "*"
137137
pg8000 = "*"
138138
sqlalchemy = "*"
139139
psycopg = "*"
140-
kafka-python = "^2.0.2"
141140
cassandra-driver = "*"
142141
pytest-asyncio = "0.23.5"
142+
kafka-python-ng = "^2.2.0"
143143

144144
[[tool.poetry.source]]
145145
name = "PyPI"

0 commit comments

Comments
 (0)
Please sign in to comment.