From 8c31af15b10578677da1706fc95b78b4fcf53f30 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 28 May 2025 10:56:38 -0500 Subject: [PATCH 1/4] add reconnect handling --- src/amqp_fabric/amq_broker_connector.py | 35 ++++++++++++++++--- tests/conftest.py | 2 +- tests/test_amqp_broker_connector.py | 46 +++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 6 deletions(-) diff --git a/src/amqp_fabric/amq_broker_connector.py b/src/amqp_fabric/amq_broker_connector.py index 1920064..002188e 100644 --- a/src/amqp_fabric/amq_broker_connector.py +++ b/src/amqp_fabric/amq_broker_connector.py @@ -57,11 +57,17 @@ def deserialize(self, data: Any) -> bytes: return super().deserialize(gzip.decompress(data)) +BROKER_RECONNECT_RETRY_DELAY = os.environ.get( + "AMQFAB_BROKER_RECONNECT_RETRY_DELAY", 5.0 +) +BROKER_HEARTBEAT = os.environ.get("AMQFAB_BROKER_HEARTBEAT", 60) MSG_TYPE_KEEP_ALIVE = "keep_alive" -MAX_DISCOVERY_CACHE_ENTRIES = os.environ.get("MAX_DISCOVERY_CACHE_ENTRIES", 100) -DISCOVERY_CACHE_TTL = os.environ.get("DISCOVERY_CACHE_TTL", 5) -DATA_EXCHANGE_NAME = os.environ.get("DATA_EXCHANGE_NAME", "data") -DISCOVERY_EXCHANGE_NAME = os.environ.get("DISCOVERY_EXCHANGE_NAME", "msc.discovery") +MAX_DISCOVERY_CACHE_ENTRIES = os.environ.get("AMQFAB_MAX_DISCOVERY_CACHE_ENTRIES", 100) +DISCOVERY_CACHE_TTL = os.environ.get("AMQFAB_DISCOVERY_CACHE_TTL", 5) +DATA_EXCHANGE_NAME = os.environ.get("AMQFAB_DATA_EXCHANGE_NAME", "data") +DISCOVERY_EXCHANGE_NAME = os.environ.get( + "AMQFAB_DISCOVERY_EXCHANGE_NAME", "msc.discovery" +) REGEX_FQN_PATTERN = r"^(?:[A-Za-z0-9-_]{1,63}\.){1,255}[A-Za-z0-9-_]{1,63}$" @@ -146,14 +152,30 @@ def data_exchange(self): def fqn(self): return broker_fqn(self._service_domain, self._service_type, self._service_id) + async def _on_reconnect(self, connection=None): + # This will create the exchange if it doesn't already exist. + channel = await self._broker_conn.channel() + + self._data_exchange = await channel.declare_exchange( + name=self._data_exchange_name, type=ExchangeType.HEADERS, durable=True + ) + self._discovery_exchange = await channel.declare_exchange( + name=self._discovery_exchange_name, type=ExchangeType.HEADERS, durable=True + ) + async def open(self, **kwargs: Any): self._broker_conn = await connect_robust( url=self._amqp_uri, client_properties={"connection_name": "rpc_srv"}, + connection_attempts=None, # None means infinite retries + retry_delay=BROKER_RECONNECT_RETRY_DELAY, # wait 5 s between attempts + heartbeat=BROKER_HEARTBEAT, # send heartbeats every minute **kwargs, ) - # This will create the exchange if it doesn't already exist. + self._broker_conn.reconnect_callbacks.add(self._on_reconnect) + + await self._on_reconnect() channel = await self._broker_conn.channel() self._data_exchange = await channel.declare_exchange( @@ -162,6 +184,7 @@ async def open(self, **kwargs: Any): self._discovery_exchange = await channel.declare_exchange( name=self._discovery_exchange_name, type=ExchangeType.HEADERS, durable=True ) + await aio.sleep(0.1) # Initialize keep-alive messages @@ -183,6 +206,8 @@ async def open(self, **kwargs: Any): # Initialize keep-alive listener if self._keep_alive_listen: + channel = await self._broker_conn.channel() + self._discovery_cache = TTLCache( maxsize=MAX_DISCOVERY_CACHE_ENTRIES, ttl=self._discovery_cache_ttl ) diff --git a/tests/conftest.py b/tests/conftest.py index 8493fd5..9e3b156 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -17,4 +17,4 @@ RPC_EXCHANGE_NAME = os.environ.get( "RPC_EXCHANGE_NAME", f"{SERVICE_DOMAIN}.api.{SERVICE_TYPE}.{SERVICE_ID}" ) -DATA_EXCHANGE_NAME = os.environ.get("DATA_EXCHANGE_NAME", f"{SERVICE_DOMAIN}.daq.data") +DATA_EXCHANGE_NAME = os.environ.get("DATA_EXCHANGE_NAME", f"{SERVICE_DOMAIN}.data") diff --git a/tests/test_amqp_broker_connector.py b/tests/test_amqp_broker_connector.py index 619c7dc..8d716bb 100644 --- a/tests/test_amqp_broker_connector.py +++ b/tests/test_amqp_broker_connector.py @@ -315,3 +315,49 @@ async def on_new_data(message: IncomingMessage): await srv_conn.close() await client_conn.close() + + +@pytest.mark.asyncio +async def test_reconnects(): + api = TestApi() + + srv_conn = AmqBrokerConnector( + amqp_uri=AMQP_URL, + service_domain=SERVICE_DOMAIN, + service_type=SERVICE_TYPE, + service_id=SERVICE_ID, + keep_alive_seconds=2, + ) + await srv_conn.open() + + assert len(srv_conn._broker_conn.close_callbacks) == 1 + + assert srv_conn.fqn == f"{SERVICE_DOMAIN}.{SERVICE_TYPE}.{SERVICE_ID}" + assert srv_conn.service_id == SERVICE_ID + assert srv_conn.service_type == SERVICE_TYPE + assert srv_conn.domain == SERVICE_DOMAIN + assert srv_conn.data_exchange == f"{SERVICE_DOMAIN}.data" + + # Init server + await srv_conn.rpc_register(api) + + # Init client + client_conn = AmqBrokerConnector( + amqp_uri=AMQP_URL, + service_domain=SERVICE_DOMAIN, + service_type="client", + service_id="client", + ) + await client_conn.open() + + proxy = await client_conn.rpc_proxy( + service_domain=SERVICE_DOMAIN, + service_id=SERVICE_ID, + service_type=SERVICE_TYPE, + ) + + assert await proxy.multiply(x=100, y=2) + assert srv_conn._scheduler + + await srv_conn._broker_conn.close() + await client_conn.close() From 68184797be7ee372a085be4b68c5a2c81a5c9601 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 28 May 2025 11:37:03 -0500 Subject: [PATCH 2/4] WIP --- src/amqp_fabric/amq_broker_connector.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/amqp_fabric/amq_broker_connector.py b/src/amqp_fabric/amq_broker_connector.py index 002188e..a7caa3f 100644 --- a/src/amqp_fabric/amq_broker_connector.py +++ b/src/amqp_fabric/amq_broker_connector.py @@ -176,15 +176,6 @@ async def open(self, **kwargs: Any): self._broker_conn.reconnect_callbacks.add(self._on_reconnect) await self._on_reconnect() - channel = await self._broker_conn.channel() - - self._data_exchange = await channel.declare_exchange( - name=self._data_exchange_name, type=ExchangeType.HEADERS, durable=True - ) - self._discovery_exchange = await channel.declare_exchange( - name=self._discovery_exchange_name, type=ExchangeType.HEADERS, durable=True - ) - await aio.sleep(0.1) # Initialize keep-alive messages @@ -349,6 +340,7 @@ async def _on_send_keep_alive(self): message=Message(body="".encode(), headers=headers), routing_key="" ) ) + except Exception as e: log.error(e) From aeab57ef06edbc4c583aee00e63ebbd06f577db0 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 28 May 2025 13:20:44 -0500 Subject: [PATCH 3/4] wrapping connectivity into try except --- src/amqp_fabric/amq_broker_connector.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/amqp_fabric/amq_broker_connector.py b/src/amqp_fabric/amq_broker_connector.py index a7caa3f..fd0f040 100644 --- a/src/amqp_fabric/amq_broker_connector.py +++ b/src/amqp_fabric/amq_broker_connector.py @@ -153,15 +153,23 @@ def fqn(self): return broker_fqn(self._service_domain, self._service_type, self._service_id) async def _on_reconnect(self, connection=None): - # This will create the exchange if it doesn't already exist. - channel = await self._broker_conn.channel() + try: + # This will create the exchange if it doesn't already exist. + channel = await self._broker_conn.channel() - self._data_exchange = await channel.declare_exchange( - name=self._data_exchange_name, type=ExchangeType.HEADERS, durable=True - ) - self._discovery_exchange = await channel.declare_exchange( - name=self._discovery_exchange_name, type=ExchangeType.HEADERS, durable=True - ) + self._data_exchange = await channel.declare_exchange( + name=self._data_exchange_name, type=ExchangeType.HEADERS, durable=True + ) + self._discovery_exchange = await channel.declare_exchange( + name=self._discovery_exchange_name, + type=ExchangeType.HEADERS, + durable=True, + ) + log.info("Connected to broker.") + + except Exception as e: + log.error("Error reconnecting....") + log.error(e) async def open(self, **kwargs: Any): self._broker_conn = await connect_robust( From 6960ae6fc988c9b667709a1fbbbd35e0f2900c60 Mon Sep 17 00:00:00 2001 From: Meir Tseitlin Date: Wed, 25 Jun 2025 14:58:21 -0500 Subject: [PATCH 4/4] update reconnect mechanism --- setup.cfg | 9 +++--- src/amqp_fabric/amq_broker_connector.py | 43 +++++++++++++++---------- tests/conftest.py | 23 ++++++++++++- tests/test_amqp_broker_connector.py | 35 ++++++++++++++++---- 4 files changed, 80 insertions(+), 30 deletions(-) diff --git a/setup.cfg b/setup.cfg index ed748aa..4450a45 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,11 +34,7 @@ install_requires = apscheduler cachetools -# The usage of test_requires is discouraged, see `Dependency Management` docs -tests_require = - pytest - pytest-cov - pytest-asyncio + # Require a specific Python version, e.g. Python 2.7 or >= 3.4 python_requires = >=3.8 @@ -56,6 +52,7 @@ testing = pytest pytest-cov pytest-asyncio + aiomisc-pytest [options.entry_points] # Add here console scripts like: @@ -90,6 +87,8 @@ testpaths = tests # markers = # slow: mark tests as slow (deselect with '-m "not slow"') +asyncio_default_fixture_loop_scope = function + [aliases] dists = bdist_wheel diff --git a/src/amqp_fabric/amq_broker_connector.py b/src/amqp_fabric/amq_broker_connector.py index fd0f040..357a90a 100644 --- a/src/amqp_fabric/amq_broker_connector.py +++ b/src/amqp_fabric/amq_broker_connector.py @@ -132,6 +132,8 @@ def __init__( self._keepalive_subscriber_service_type = None self._keepalive_subscriber_service_id = None + self._api = None + @property def domain(self): return self._service_domain @@ -165,7 +167,11 @@ async def _on_reconnect(self, connection=None): type=ExchangeType.HEADERS, durable=True, ) - log.info("Connected to broker.") + + log.info(f"Service '{self.fqn}' connected to broker.") + + if self._api: + await self.rpc_register(self._api) except Exception as e: log.error("Error reconnecting....") @@ -227,11 +233,14 @@ async def close(self): self._scheduler.shutdown(wait=True) self._scheduler = None + self._api = None await self._broker_conn.close() # --- Service management routines --- async def rpc_register(self, api): + self._api = api + # Creating channel channel = await self._broker_conn.channel() await channel.set_qos(prefetch_count=1) @@ -249,9 +258,7 @@ async def rpc_register(self, api): await rpc.register(api_name, awaitify(callee), auto_delete=True) log.info( - 'RPC Server Registered on Exchange "{}"'.format( - self._rpc_server_exchange_name - ) + f'RPC Server Registered on Exchange "{self._rpc_server_exchange_name}"' ) async def rpc_proxy(self, service_domain, service_id, service_type): @@ -335,20 +342,21 @@ async def subscribe_data(self, subscriber_name, headers, callback): await queue.consume(callback) async def _on_send_keep_alive(self): - try: - headers = { - "msg_type": MSG_TYPE_KEEP_ALIVE, - "service_domain": self._service_domain, - "service_id": self._service_id, - "service_type": self._service_type, - } - - aio.create_task( - self._discovery_exchange.publish( - message=Message(body="".encode(), headers=headers), routing_key="" - ) + headers = { + "msg_type": MSG_TYPE_KEEP_ALIVE, + "service_domain": self._service_domain, + "service_id": self._service_id, + "service_type": self._service_type, + } + + task = aio.create_task( + self._discovery_exchange.publish( + message=Message(body="".encode(), headers=headers), routing_key="" ) + ) + try: + await task # Exception is raised here except Exception as e: log.error(e) @@ -384,7 +392,8 @@ async def _on_get_keep_alive(self, message: IncomingMessage): or headers["service_id"] == self._keepalive_service_service_id ) ): - aio.create_task(self._keepalive_subscriber_callback(headers)) + task = aio.create_task(self._keepalive_subscriber_callback(headers)) + await task # Exception is raised here except Exception as e: log.error(e) diff --git a/tests/conftest.py b/tests/conftest.py index 9e3b156..c250c61 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,8 +9,14 @@ """ import os +from typing import Type -AMQP_URL = os.environ.get("AMQP_URL", "amqp://guest:guest@localhost/") +import pytest +from aiomisc_pytest import TCPProxy + +AMQP_HOST = os.environ.get("AMQP_HOST", "localhost") +AMQP_PORT = os.environ.get("AMQP_PORT", "5672") +AMQP_URL = os.environ.get("AMQP_URL", f"amqp://guest:guest@{AMQP_HOST}:{AMQP_PORT}/") SERVICE_ID = os.environ.get("SERVICE_ID", "amqp-fabric") SERVICE_TYPE = os.environ.get("SERVICE_TYPE", "no-type") SERVICE_DOMAIN = os.environ.get("SERVICE_DOMAIN", "some-domain") @@ -18,3 +24,18 @@ "RPC_EXCHANGE_NAME", f"{SERVICE_DOMAIN}.api.{SERVICE_TYPE}.{SERVICE_ID}" ) DATA_EXCHANGE_NAME = os.environ.get("DATA_EXCHANGE_NAME", f"{SERVICE_DOMAIN}.data") + + +@pytest.fixture +async def proxy(tcp_proxy: Type[TCPProxy]): + p = tcp_proxy( + AMQP_HOST, + AMQP_PORT, + buffered=False, + ) + + await p.start() + try: + yield p + finally: + await p.close() diff --git a/tests/test_amqp_broker_connector.py b/tests/test_amqp_broker_connector.py index 8d716bb..7c4b996 100644 --- a/tests/test_amqp_broker_connector.py +++ b/tests/test_amqp_broker_connector.py @@ -2,10 +2,12 @@ import datetime as dt import json +import aiomisc import pytest from aio_pika import IncomingMessage, connect_robust -from aio_pika.exceptions import MessageProcessError +from aio_pika.exceptions import CONNECTION_EXCEPTIONS, MessageProcessError from aio_pika.patterns.rpc import JsonRPCError +from aiomisc_pytest import TCPProxy from conftest import ( AMQP_URL, RPC_EXCHANGE_NAME, @@ -317,12 +319,16 @@ async def on_new_data(message: IncomingMessage): await client_conn.close() +@aiomisc.timeout(30) @pytest.mark.asyncio -async def test_reconnects(): +async def test_server_reconnects(proxy: TCPProxy): api = TestApi() + amqp_url = f"amqp://guest:guest@{proxy.proxy_host}:{proxy.proxy_port}/" + reconnect_event = asyncio.Event() + srv_conn = AmqBrokerConnector( - amqp_uri=AMQP_URL, + amqp_uri=amqp_url, service_domain=SERVICE_DOMAIN, service_type=SERVICE_TYPE, service_id=SERVICE_ID, @@ -330,7 +336,9 @@ async def test_reconnects(): ) await srv_conn.open() - assert len(srv_conn._broker_conn.close_callbacks) == 1 + srv_conn._broker_conn.reconnect_callbacks.add( + lambda *_: reconnect_event.set(), + ) assert srv_conn.fqn == f"{SERVICE_DOMAIN}.{SERVICE_TYPE}.{SERVICE_ID}" assert srv_conn.service_id == SERVICE_ID @@ -350,14 +358,27 @@ async def test_reconnects(): ) await client_conn.open() - proxy = await client_conn.rpc_proxy( + rpc_proxy = await client_conn.rpc_proxy( service_domain=SERVICE_DOMAIN, service_id=SERVICE_ID, service_type=SERVICE_TYPE, ) - assert await proxy.multiply(x=100, y=2) + assert await rpc_proxy.multiply(x=100, y=2) assert srv_conn._scheduler - await srv_conn._broker_conn.close() + # Disconnect existing client + await proxy.disconnect_all() + + with pytest.raises(CONNECTION_EXCEPTIONS): + await rpc_proxy.multiply(x=100, y=2) + + # Wait for reconnect + await asyncio.wait_for(reconnect_event.wait(), timeout=10) + # + # # Test RPC again + # await rpc_proxy.multiply(x=100, y=2) + # await client_conn.close() + + await srv_conn._broker_conn.close()