Skip to content

Commit 59cb6fc

Browse files
f18malexanderankin
andauthoredJun 18, 2024··
fix(mqtt): Add mqtt.MosquittoContainer (#568) (#599)
This PR is adding a new MosquittoContainer class that helps creating integration tests for MQTT clients. The MosquittoContainer class contains a bunch of methods to help with testing: * checking number of messages received * watching topics * check last payload published on a particular topic * etc This PR lacks tests. I can add them if there is interest in this PR... --------- Co-authored-by: Dave Ankin <daveankin@gmail.com>
1 parent ec76df2 commit 59cb6fc

File tree

7 files changed

+215
-1
lines changed

7 files changed

+215
-1
lines changed
 

‎index.rst

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ testcontainers-python facilitates the use of Docker containers for functional an
3131
modules/milvus/README
3232
modules/minio/README
3333
modules/mongodb/README
34+
modules/mqtt/README
3435
modules/mssql/README
3536
modules/mysql/README
3637
modules/nats/README

‎modules/mqtt/README.rst

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.. autoclass:: testcontainers.mqtt.MosquittoContainer
2+
.. title:: testcontainers.mqtt.MosquittoContainer
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
#
2+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
3+
# not use this file except in compliance with the License. You may obtain
4+
# a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
10+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
11+
# License for the specific language governing permissions and limitations
12+
# under the License.
13+
14+
from pathlib import Path
15+
from typing import TYPE_CHECKING, Optional
16+
17+
from typing_extensions import Self
18+
19+
from testcontainers.core.container import DockerContainer
20+
from testcontainers.core.waiting_utils import wait_container_is_ready, wait_for_logs
21+
22+
if TYPE_CHECKING:
23+
from paho.mqtt.client import Client
24+
from paho.mqtt.enums import MQTTErrorCode
25+
26+
27+
class MosquittoContainer(DockerContainer):
28+
"""
29+
Specialization of DockerContainer for MQTT broker Mosquitto.
30+
Example:
31+
32+
.. doctest::
33+
34+
>>> from testcontainers.mqtt import MosquittoContainer
35+
36+
>>> with MosquittoContainer() as mosquitto_broker:
37+
... mqtt_client = mosquitto_broker.get_client()
38+
"""
39+
40+
TESTCONTAINERS_CLIENT_ID = "TESTCONTAINERS-CLIENT"
41+
MQTT_PORT = 1883
42+
CONFIG_FILE = "testcontainers-mosquitto-default-configuration.conf"
43+
44+
def __init__(
45+
self,
46+
image: str = "eclipse-mosquitto:latest",
47+
# password: Optional[str] = None,
48+
**kwargs,
49+
) -> None:
50+
super().__init__(image, **kwargs)
51+
# self.password = password
52+
# reusable client context:
53+
self.client: Optional["Client"] = None
54+
55+
@wait_container_is_ready()
56+
def get_client(self) -> "Client":
57+
"""
58+
Creates and connects a client, caching the result in `self.client`
59+
returning that if it exists.
60+
61+
Connection attempts are retried using `@wait_container_is_ready`.
62+
63+
Returns:
64+
a client from the paho library
65+
"""
66+
if self.client:
67+
return self.client
68+
client, err = self.new_client()
69+
# 0 is a conventional "success" value in C, which is falsy in python
70+
if err:
71+
# retry, maybe it is not available yet
72+
raise ConnectionError(f"Failed to establish a connection: {err}")
73+
if not client.is_connected():
74+
raise TimeoutError("The Paho MQTT secondary thread has not connected yet!")
75+
self.client = client
76+
return client
77+
78+
def new_client(self, **kwargs) -> tuple["Client", "MQTTErrorCode"]:
79+
"""
80+
Get a paho.mqtt client connected to this container.
81+
Check the returned object is_connected() method before use
82+
83+
Usage of this method is required for versions <2;
84+
versions >=2 will wait for log messages to determine container readiness.
85+
There is no way to pass arguments to new_client in versions <2,
86+
please use an up-to-date version.
87+
88+
Args:
89+
**kwargs: Keyword arguments passed to `paho.mqtt.client`.
90+
91+
Returns:
92+
client: MQTT client to connect to the container.
93+
error: an error code or MQTT_ERR_SUCCESS.
94+
"""
95+
try:
96+
from paho.mqtt.client import CallbackAPIVersion, Client
97+
from paho.mqtt.enums import MQTTErrorCode
98+
except ImportError as i:
99+
raise ImportError("'pip install paho-mqtt' required for MosquittoContainer.new_client") from i
100+
101+
err = MQTTErrorCode.MQTT_ERR_SUCCESS
102+
if self.client is None:
103+
self.client = Client(
104+
client_id=MosquittoContainer.TESTCONTAINERS_CLIENT_ID,
105+
callback_api_version=CallbackAPIVersion.VERSION2,
106+
userdata=self,
107+
**kwargs,
108+
)
109+
self.client._connect_timeout = 1.0
110+
111+
# connect() is a blocking call:
112+
err = self.client.connect(self.get_container_host_ip(), int(self.get_exposed_port(self.MQTT_PORT)))
113+
self.client.loop_start() # launch a thread to call loop() and dequeue the message
114+
115+
return self.client, err
116+
117+
def start(self, configfile: Optional[str] = None) -> Self:
118+
# setup container:
119+
self.with_exposed_ports(self.MQTT_PORT)
120+
if configfile is None:
121+
# default config file
122+
configfile = Path(__file__).parent / MosquittoContainer.CONFIG_FILE
123+
self.with_volume_mapping(configfile, "/mosquitto/config/mosquitto.conf")
124+
# if self.password:
125+
# # TODO: add authentication
126+
# pass
127+
128+
# do container start
129+
super().start()
130+
131+
self._wait()
132+
return self
133+
134+
def _wait(self):
135+
if self.image.split(":")[-1].startswith("1"):
136+
import logging
137+
138+
logging.warning(
139+
"You are using version 1 of eclipse-mosquitto which is not supported for use by this module without paho-mqtt also installed"
140+
)
141+
self.get_client()
142+
else:
143+
wait_for_logs(self, r"mosquitto version \d+.\d+.\d+ running", timeout=30)
144+
145+
def stop(self, force=True, delete_volume=True) -> None:
146+
if self.client is not None:
147+
self.client.disconnect()
148+
self.client = None # force recreation of the client object at next start()
149+
super().stop(force, delete_volume)
150+
151+
def publish_message(self, topic: str, payload: str, timeout: int = 2) -> None:
152+
ret = self.get_client().publish(topic, payload)
153+
ret.wait_for_publish(timeout=timeout)
154+
if not ret.is_published():
155+
raise RuntimeError(f"Could not publish a message on topic {topic} to Mosquitto broker: {ret}")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# see https://mosquitto.org/man/mosquitto-conf-5.html
2+
3+
protocol mqtt
4+
user root
5+
log_dest stdout
6+
allow_anonymous true
7+
8+
log_type error
9+
log_type warning
10+
log_type notice
11+
log_type information
12+
13+
log_timestamp_format %Y-%m-%d %H:%M:%S
14+
persistence true
15+
persistence_location /data/
16+
17+
listener 1883
18+
protocol mqtt
19+
20+
sys_interval 1

‎modules/mqtt/tests/test_mosquitto.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import pytest
2+
3+
from testcontainers.mqtt import MosquittoContainer
4+
5+
VERSIONS = ["1.6.15", "2.0.18"]
6+
7+
8+
@pytest.mark.parametrize("version", VERSIONS)
9+
def test_mosquitto(version):
10+
with MosquittoContainer(image=f"eclipse-mosquitto:{version}") as container:
11+
external_port = int(container.get_exposed_port(container.MQTT_PORT))
12+
print(f"listening on port: {external_port}")
13+
14+
15+
@pytest.mark.parametrize("version", VERSIONS)
16+
def test_mosquitto_client(version):
17+
with MosquittoContainer(image=f"eclipse-mosquitto:{version}") as container:
18+
container.get_client()

‎poetry.lock

+16-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎pyproject.toml

+3
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ packages = [
4444
{ include = "testcontainers", from = "modules/minio" },
4545
{ include = "testcontainers", from = "modules/milvus" },
4646
{ include = "testcontainers", from = "modules/mongodb" },
47+
{ include = "testcontainers", from = "modules/mqtt" },
4748
{ include = "testcontainers", from = "modules/mssql" },
4849
{ include = "testcontainers", from = "modules/mysql" },
4950
{ include = "testcontainers", from = "modules/nats" },
@@ -117,6 +118,7 @@ memcached = []
117118
minio = ["minio"]
118119
milvus = []
119120
mongodb = ["pymongo"]
121+
mqtt = []
120122
mssql = ["sqlalchemy", "pymssql"]
121123
mysql = ["sqlalchemy", "pymysql"]
122124
nats = ["nats-py"]
@@ -153,6 +155,7 @@ pytest-asyncio = "0.23.5"
153155
kafka-python-ng = "^2.2.0"
154156
hvac = "*"
155157
pymilvus = "2.4.3"
158+
paho-mqtt = "2.1.0"
156159

157160
[[tool.poetry.source]]
158161
name = "PyPI"

0 commit comments

Comments
 (0)
Please sign in to comment.