Skip to content

Commit 90721ee

Browse files
committed
feat: add sqla broker
1 parent f140a7e commit 90721ee

File tree

18 files changed

+956
-1
lines changed

18 files changed

+956
-1
lines changed

faststream/sqla/__init__.py

Whitespace-only changes.

faststream/sqla/broker/__init__.py

Whitespace-only changes.

faststream/sqla/broker/broker.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import logging
2+
from typing import Any, Iterable, Optional, Union
3+
4+
from sqlalchemy.ext.asyncio import AsyncEngine
5+
6+
from faststream._internal.basic_types import LoggerProto
7+
from faststream._internal.broker import BrokerUsecase
8+
from faststream._internal.constants import EMPTY
9+
from faststream.security import BaseSecurity
10+
from faststream.specification.schema.broker import BrokerSpec
11+
from faststream.specification.schema.extra.tag import Tag, TagDict
12+
from faststream.sqla.broker.registrator import SqlaRegistrator
13+
from faststream.sqla.configs.broker import SqlaBrokerConfig
14+
from faststream.sqla.broker.logging import make_sqla_logger_state
15+
16+
17+
class SqlaBroker(
18+
SqlaRegistrator,
19+
BrokerUsecase[
20+
Any,
21+
Any,
22+
],
23+
):
24+
url: list[str]
25+
26+
def __init__(
27+
self,
28+
*,
29+
engine: AsyncEngine,
30+
# broker base args
31+
routers: Iterable[SqlaRegistrator] = (),
32+
# AsyncAPI args
33+
security: Optional["BaseSecurity"] = None,
34+
specification_url: str | Iterable[str] | None = None,
35+
protocol: str | None = None,
36+
protocol_version: str | None = "auto",
37+
description: str | None = None,
38+
tags: Iterable[Union["Tag", "TagDict"]] = (),
39+
# logging args
40+
logger: Optional["LoggerProto"] = EMPTY,
41+
log_level: int = logging.INFO,
42+
) -> None:
43+
44+
super().__init__(
45+
routers=routers,
46+
config=SqlaBrokerConfig(
47+
logger=make_sqla_logger_state(
48+
logger=logger,
49+
log_level=log_level,
50+
),
51+
),
52+
specification=BrokerSpec(
53+
description=description,
54+
url=specification_url,
55+
protocol=protocol,
56+
protocol_version=protocol_version,
57+
security=security,
58+
tags=tags,
59+
),
60+
)
61+
62+
async def _connect(self) -> Any:
63+
return True
64+
65+
async def start(self) -> None:
66+
await self.connect()
67+
await super().start()

faststream/sqla/broker/logging.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import logging
2+
from functools import partial
3+
from typing import TYPE_CHECKING, Any
4+
5+
from faststream._internal.logger import DefaultLoggerStorage, make_logger_state
6+
from faststream._internal.logger.logging import get_broker_logger
7+
8+
if TYPE_CHECKING:
9+
from faststream._internal.basic_types import LoggerProto
10+
from faststream._internal.context import ContextRepo
11+
12+
13+
class SqlaParamsStorage(DefaultLoggerStorage):
14+
def __init__(self) -> None:
15+
super().__init__()
16+
17+
self.logger_log_level = logging.INFO
18+
19+
def set_level(self, level: int) -> None:
20+
self.logger_log_level = level
21+
22+
def register_subscriber(self, params: dict[str, Any]) -> None:
23+
return
24+
25+
def get_logger(self, *, context: "ContextRepo") -> "LoggerProto":
26+
message_id_ln = 10
27+
28+
# TODO: generate unique logger names to not share between brokers
29+
if not (lg := self._get_logger_ref()):
30+
lg = get_broker_logger(
31+
name="sqla",
32+
default_context={},
33+
message_id_ln=message_id_ln,
34+
fmt="".join((
35+
"%(asctime)s %(levelname)-8s - ",
36+
f"%(message_id)-{message_id_ln}s ",
37+
"- %(message)s",
38+
)),
39+
context=context,
40+
log_level=self.logger_log_level,
41+
)
42+
self._logger_ref.add(lg)
43+
44+
return lg
45+
46+
47+
make_sqla_logger_state = partial(
48+
make_logger_state,
49+
default_storage_cls=SqlaParamsStorage,
50+
)
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
from typing import Any, cast
2+
3+
from sqlalchemy.ext.asyncio import AsyncEngine
4+
from faststream._internal.broker.registrator import Registrator
5+
from faststream._internal.constants import EMPTY
6+
from faststream.middlewares.acknowledgement.config import AckPolicy
7+
from faststream.sqla.configs.broker import SqlaBrokerConfig
8+
from faststream.sqla.subscriber.factory import create_subscriber
9+
from faststream.sqla.retry import RetryStrategy
10+
11+
12+
class SqlaRegistrator(Registrator[Any, Any]):
13+
def subscriber(
14+
self,
15+
engine: AsyncEngine,
16+
queue: str,
17+
max_workers: int,
18+
retry_strategy: RetryStrategy,
19+
fetch_interval: float,
20+
fetch_batch_size: int,
21+
overfetch_factor: float,
22+
flush_interval: float,
23+
release_stuck_interval: float,
24+
graceful_shutdown_timeout: float,
25+
release_stuck_timeout: int,
26+
ack_policy: AckPolicy = AckPolicy.NACK_ON_ERROR,
27+
) -> Any:
28+
workers = max_workers or 1
29+
30+
subscriber = create_subscriber(
31+
engine=engine,
32+
queue=queue,
33+
max_workers=max_workers,
34+
retry_strategy=retry_strategy,
35+
fetch_interval=fetch_interval,
36+
fetch_batch_size=fetch_batch_size,
37+
overfetch_factor=overfetch_factor,
38+
flush_interval=flush_interval,
39+
release_stuck_interval=release_stuck_interval,
40+
graceful_shutdown_timeout=graceful_shutdown_timeout,
41+
release_stuck_timeout=release_stuck_timeout,
42+
config=cast("SqlaBrokerConfig", self.config),
43+
ack_policy=ack_policy,
44+
)
45+
46+
super().subscriber(subscriber)
47+
48+
# subscriber.add_call(
49+
# parser_=parser,
50+
# decoder_=decoder,
51+
# dependencies_=dependencies,
52+
# middlewares_=middlewares,
53+
# )
54+
55+
return subscriber

0 commit comments

Comments
 (0)