Skip to content

Commit 0ce4fec

Browse files
authoredJul 4, 2024··
fix(kafka): add a flag to limit to first hostname for use with networks (#638)
fix #637
1 parent 41fbdd0 commit 0ce4fec

File tree

2 files changed

+41
-3
lines changed

2 files changed

+41
-3
lines changed
 

‎modules/kafka/testcontainers/kafka/__init__.py

+20-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import tarfile
22
import time
3+
from dataclasses import dataclass, field
34
from io import BytesIO
5+
from os import environ
46
from textwrap import dedent
57

68
from typing_extensions import Self
@@ -14,7 +16,21 @@
1416
__all__ = [
1517
"KafkaContainer",
1618
"RedpandaContainer",
19+
"kafka_config",
1720
]
21+
LIMIT_BROKER_ENV_VAR = "TC_KAFKA_LIMIT_BROKER_TO_FIRST_HOST"
22+
23+
24+
@dataclass
25+
class _KafkaConfig:
26+
limit_broker_to_first_host: bool = field(default_factory=lambda: environ.get(LIMIT_BROKER_ENV_VAR) == "true")
27+
"""
28+
This option is useful for a setup with a network,
29+
see testcontainers/testcontainers-python#637 for more details
30+
"""
31+
32+
33+
kafka_config = _KafkaConfig()
1834

1935

2036
class KafkaContainer(DockerContainer):
@@ -136,7 +152,10 @@ def get_bootstrap_server(self) -> str:
136152
def tc_start(self) -> None:
137153
host = self.get_container_host_ip()
138154
port = self.get_exposed_port(self.port)
139-
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092"
155+
if kafka_config.limit_broker_to_first_host:
156+
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i | cut -d' ' -f1):9092"
157+
else:
158+
listeners = f"PLAINTEXT://{host}:{port},BROKER://$(hostname -i):9092"
140159
data = (
141160
dedent(
142161
f"""

‎modules/kafka/tests/test_kafka.py

+21-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
1+
import pytest
2+
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer, TopicPartition
23

3-
from testcontainers.kafka import KafkaContainer
4+
from testcontainers.core.network import Network
5+
from testcontainers.kafka import KafkaContainer, kafka_config
46

57

68
def test_kafka_producer_consumer():
@@ -20,6 +22,23 @@ def test_kafka_producer_consumer_custom_port():
2022
produce_and_consume_kafka_message(container)
2123

2224

25+
def test_kafka_on_networks(monkeypatch: pytest.MonkeyPatch):
26+
"""
27+
this test case comes from testcontainers/testcontainers-python#637
28+
"""
29+
monkeypatch.setattr(kafka_config, "limit_broker_to_first_host", True)
30+
31+
with Network() as network:
32+
kafka_ctr = KafkaContainer()
33+
kafka_ctr.with_network(network)
34+
kafka_ctr.with_network_aliases("kafka")
35+
36+
with kafka_ctr:
37+
print("started") # Will not reach here and timeout
38+
admin_client = KafkaAdminClient(bootstrap_servers=[kafka_ctr.get_bootstrap_server()])
39+
print(admin_client.describe_cluster())
40+
41+
2342
def produce_and_consume_kafka_message(container):
2443
topic = "test-topic"
2544
bootstrap_server = container.get_bootstrap_server()

0 commit comments

Comments
 (0)
Please sign in to comment.