Skip to content

Commit 67f7a3a

Browse files
committed
mqtt basic support
1 parent d6c1419 commit 67f7a3a

File tree

27 files changed

+797
-6
lines changed

27 files changed

+797
-6
lines changed

.github/workflows/pr_tests.yaml

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,6 +463,69 @@ jobs:
463463
if-no-files-found: error
464464
include-hidden-files: true
465465

466+
test-mqtt-smoke:
467+
if: github.event.pull_request.draft == false
468+
runs-on: ubuntu-latest
469+
steps:
470+
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
471+
with:
472+
persist-credentials: false
473+
- uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
474+
with:
475+
version: "latest"
476+
- name: Set up Python
477+
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
478+
with:
479+
python-version: "3.13"
480+
- name: Install Dependencies
481+
run: |
482+
uv pip install --system --group optionals --group testing .
483+
- name: Test
484+
run: >
485+
pytest -n auto
486+
-vv -m "mqtt and not connected"
487+
488+
test-mqtt-real:
489+
# if: github.event.pull_request.draft == false
490+
runs-on: ubuntu-latest
491+
# needs:
492+
# - test-basic
493+
# - test-mqtt-smoke
494+
services:
495+
mosquitto:
496+
image: eclipse-mosquitto:latest
497+
ports:
498+
- 1883:1883
499+
volumes:
500+
- ./dev/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
501+
steps:
502+
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
503+
with:
504+
persist-credentials: false
505+
- uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7.1.4
506+
with:
507+
version: "latest"
508+
- name: Set up Python
509+
uses: actions/setup-python@e797f83bcb11b83ae66e0230d6156d7c80228e7c # v6.0.0
510+
with:
511+
python-version: "3.13"
512+
- name: Install Dependencies
513+
run: |
514+
uv pip install --system --group optionals --group testing .
515+
- name: Test
516+
run: >
517+
pytest --cov --cov-report=
518+
-vv -m "(slow and mqtt and connected) or (mqtt and connected)"
519+
- name: Rename coverage file
520+
run: mkdir coverage && mv .coverage coverage/.coverage.mqtt
521+
- name: Store coverage files
522+
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
523+
with:
524+
name: .coverage.mqtt
525+
path: coverage
526+
if-no-files-found: error
527+
include-hidden-files: true
528+
466529
coverage-combine:
467530
if: github.event.pull_request.draft == false
468531
needs:

dev/mosquitto/mosquitto.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
listener 1883
2+
allow_anonymous true

docker-compose.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ services:
4141
security_opt:
4242
- no-new-privileges:true
4343

44+
mosquitto:
45+
image: eclipse-mosquitto:latest
46+
ports:
47+
- 1883:1883
48+
security_opt:
49+
- no-new-privileges:true
50+
volumes:
51+
- ./dev/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
52+
4453
faststream:
4554
build: .
4655
volumes:

faststream/exceptions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,11 @@ def __str__(self) -> str:
195195
pip install "faststream[nats]"
196196
"""
197197

198+
INSTALL_FASTSTREAM_MQTT = """
199+
To use MQTT with FastStream, please install dependencies:\n
200+
pip install "faststream[mqtt]"
201+
"""
202+
198203
INSTALL_UVICORN = """
199204
To run FastStream ASGI App via CLI, please install uvicorn:\n
200205
pip install uvicorn

faststream/mqtt/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
try:
2+
from .broker import MQTTBroker
3+
except ImportError as e:
4+
if "'aiomqtt'" not in e.msg:
5+
raise
6+
7+
from faststream.exceptions import INSTALL_FASTSTREAM_MQTT
8+
9+
raise ImportError(INSTALL_FASTSTREAM_MQTT) from e
10+
11+
__all__ = ("MQTTBroker",)

faststream/mqtt/broker/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .broker import MQTTBroker
2+
3+
__all__ = ("MQTTBroker",)

faststream/mqtt/broker/broker.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
from collections.abc import Iterable
2+
from typing import TYPE_CHECKING, Any, Optional
3+
4+
import aiomqtt
5+
import anyio
6+
from fast_depends import dependency_provider
7+
from typing_extensions import override
8+
9+
from faststream._internal.basic_types import SendableMessage
10+
from faststream._internal.broker import BrokerUsecase
11+
from faststream._internal.constants import EMPTY
12+
from faststream._internal.context.repository import ContextRepo
13+
from faststream._internal.di import FastDependsConfig
14+
from faststream.mqtt.broker.registrator import MQTTRegistrator
15+
from faststream.mqtt.configs.broker import MQTTBrokerConfig
16+
from faststream.mqtt.response import MQTTPublishCommand
17+
from faststream.response import PublishType
18+
from faststream.security import BaseSecurity
19+
from faststream.specification.schema import BrokerSpec, Tag, TagDict
20+
21+
if TYPE_CHECKING:
22+
from types import TracebackType
23+
24+
from fast_depends import Provider
25+
from fast_depends.library.serializer import SerializerProto
26+
27+
28+
class MQTTBroker(
29+
MQTTRegistrator,
30+
BrokerUsecase[aiomqtt.Message, aiomqtt.Client],
31+
):
32+
def __init__(
33+
self,
34+
*,
35+
# mqtt broker params
36+
hostname: str = "localhost",
37+
port: int = 1883,
38+
username: str | None = None,
39+
password: str | None = None,
40+
keepalive: int = 60,
41+
bind_address: str = "",
42+
bind_port: int = 0,
43+
routers: Iterable[MQTTRegistrator] = (),
44+
# FastDepends args
45+
apply_types: bool = True,
46+
serializer: Optional["SerializerProto"] = EMPTY,
47+
provider: Optional["Provider"] = None,
48+
context: Optional["ContextRepo"] = None,
49+
# AsyncAPI args
50+
security: Optional["BaseSecurity"] = None,
51+
specification_url: str | None = None,
52+
protocol: str | None = None,
53+
protocol_version: str | None = "auto",
54+
description: str | None = None,
55+
tags: Iterable["Tag | TagDict"] = (),
56+
) -> None:
57+
if specification_url is None:
58+
specification_url = hostname
59+
super().__init__(
60+
config=MQTTBrokerConfig(
61+
hostname="localhost",
62+
port=port,
63+
username=username,
64+
password=password,
65+
keepalive=keepalive,
66+
bind_address=bind_address,
67+
bind_port=bind_port,
68+
extra_context={
69+
"broker": self,
70+
},
71+
fd_config=FastDependsConfig(
72+
use_fastdepends=apply_types,
73+
serializer=serializer,
74+
provider=provider or dependency_provider,
75+
context=context or ContextRepo(),
76+
),
77+
),
78+
specification=BrokerSpec(
79+
url=[specification_url],
80+
protocol=None,
81+
protocol_version=None,
82+
description="MQTT Broker",
83+
tags=[],
84+
security=None,
85+
),
86+
routers=routers,
87+
)
88+
89+
@override
90+
async def _connect(self) -> aiomqtt.Client:
91+
return await self.config.broker_config.connect()
92+
93+
async def start(self) -> None:
94+
await self.connect()
95+
await super().start()
96+
97+
async def stop(
98+
self,
99+
exc_type: type[BaseException] | None = None,
100+
exc_val: BaseException | None = None,
101+
exc_tb: Optional["TracebackType"] = None,
102+
) -> None:
103+
await super().stop(exc_type, exc_val, exc_tb)
104+
await self.config.broker_config.disconnect(exc_type, exc_val, exc_tb)
105+
self._connection = None
106+
107+
async def publish(self, message: "SendableMessage", topic: str) -> Any:
108+
cmd = MQTTPublishCommand(
109+
body=message,
110+
destination=topic,
111+
_publish_type=PublishType.PUBLISH,
112+
)
113+
return await super()._basic_publish(
114+
cmd, producer=self.config.broker_config.producer
115+
)
116+
117+
async def ping(self, timeout: float | None) -> bool:
118+
sleep_time = (timeout or 10) / 10
119+
ping_client = self.config.broker_config.create_client()
120+
121+
with anyio.move_on_after(timeout) as cancel_scope:
122+
while True:
123+
if cancel_scope.cancel_called:
124+
return False
125+
try:
126+
async with ping_client:
127+
pass
128+
except aiomqtt.MqttError:
129+
await anyio.sleep(sleep_time)
130+
else:
131+
return True
132+
return False
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from collections.abc import Iterable, Sequence
2+
from typing import TYPE_CHECKING, Any, cast
3+
4+
from aiomqtt import Message
5+
from typing_extensions import override
6+
7+
from faststream._internal.broker.registrator import Registrator
8+
from faststream.mqtt.configs.broker import MQTTBrokerConfig
9+
from faststream.mqtt.subscriber.factory import create_subscriber
10+
from faststream.mqtt.subscriber.usecase import MQTTSubscriber
11+
12+
if TYPE_CHECKING:
13+
from fast_depends.dependencies import Dependant
14+
15+
from faststream._internal.types import CustomCallable, SubscriberMiddleware
16+
17+
18+
class MQTTRegistrator(Registrator[Message, MQTTBrokerConfig]):
19+
@override
20+
def subscriber( # type: ignore[override]
21+
self,
22+
topic: str,
23+
*,
24+
parser: "CustomCallable | None" = None,
25+
decoder: "CustomCallable | None" = None,
26+
dependencies: Iterable["Dependant"] = (),
27+
middlewares: Sequence["SubscriberMiddleware[Any]"] = (),
28+
title: str | None = None,
29+
description: str | None = None,
30+
include_in_schema: bool = True,
31+
) -> MQTTSubscriber:
32+
subscriber = create_subscriber(
33+
topic=topic,
34+
config=cast("MQTTBrokerConfig", self.config),
35+
title_=title,
36+
description_=description,
37+
include_in_schema=include_in_schema,
38+
)
39+
super().subscriber(subscriber)
40+
subscriber.add_call(
41+
parser_=parser,
42+
decoder_=decoder,
43+
dependencies_=dependencies,
44+
middlewares_=middlewares,
45+
)
46+
return subscriber

faststream/mqtt/configs/__init__.py

Whitespace-only changes.

faststream/mqtt/configs/broker.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from dataclasses import dataclass, field
2+
from typing import TYPE_CHECKING
3+
4+
import aiomqtt
5+
from paho.mqtt.client import MQTT_CLEAN_START_FIRST_ONLY, CleanStartOption
6+
7+
from faststream._internal.configs import BrokerConfig
8+
from faststream.mqtt.publisher.producer import AiomqttFastProducer
9+
10+
if TYPE_CHECKING:
11+
from types import TracebackType
12+
13+
14+
@dataclass(kw_only=True)
15+
class MQTTBrokerConfig(BrokerConfig):
16+
hostname: str
17+
port: int = 1883
18+
username: str | None = None
19+
password: str | None = None
20+
keepalive: int = 60
21+
bind_address: str = ""
22+
bind_port: int = 0
23+
clean_start: CleanStartOption = MQTT_CLEAN_START_FIRST_ONLY
24+
__client: aiomqtt.Client | None = field(init=False, default=None)
25+
26+
async def connect(self) -> aiomqtt.Client:
27+
return await self.client.__aenter__()
28+
29+
async def disconnect(
30+
self,
31+
exc_type: type[BaseException] | None = None,
32+
exc_val: BaseException | None = None,
33+
exc_tb: "TracebackType | None" = None,
34+
) -> None:
35+
await self.client.__aexit__(exc_type, exc_val, exc_tb)
36+
37+
@property
38+
def client(self) -> aiomqtt.Client:
39+
self.__client = self.__client or self.__get_client()
40+
self.producer = AiomqttFastProducer(self.__client)
41+
return self.__client
42+
43+
def create_client(self) -> aiomqtt.Client:
44+
return self.__get_client()
45+
46+
def __get_client(self) -> aiomqtt.Client:
47+
return aiomqtt.Client(
48+
self.hostname,
49+
self.port,
50+
username=self.username,
51+
password=self.password,
52+
keepalive=self.keepalive,
53+
bind_address=self.bind_address,
54+
bind_port=self.bind_port,
55+
clean_start=self.clean_start,
56+
protocol=aiomqtt.ProtocolVersion.V5,
57+
)

0 commit comments

Comments
 (0)