A robust, production-ready middleware for integrating RabbitMQ (AMQP) with HTTP services (e.g., 1C:Enterprise, ERPs, legacy systems).
Українська версія (Ukrainian Version)
This middleware acts as a reliable bridge between HTTP clients and RabbitMQ, ensuring:
- Publishing: HTTP POST -> RabbitMQ Exchange (Reliable, At-Least-Once)
- Consuming: HTTP POST (Long-polling) <- RabbitMQ Queue
- Protocol Bridging: Converts HTTP REST requests into AMQP messages.
- Reliability: Uses RabbitMQ Publisher Confirms and manual Acknowledgments to guarantee "At-Least-Once" delivery.
- Topology Management: Automatically sets up Exchanges, Queues, and Dead Letter Exchanges (DLX).
- Security:
- HTTP Basic Authentication: Proxies credentials directly to RabbitMQ.
- Connection Pooling: Manages efficient user sessions with auto-cleanup.
- Rate Limiting & Input Validation (OWASP standards).
- Observability: Structured JSON logging with Correlation IDs and Prometheus metrics.
- Strict Schema: Pydantic V2 models for reliable data handling.
This service uses HTTP Basic Authentication.
CRITICAL: Do NOT expose this service directly to the internet. You MUST deploy it behind a Reverse Proxy (Nginx, Traefik, AWS ALB) configured with HTTPS (TLS). Sending credentials (username/password) over plain HTTP is insecure.
- Python 3.11+
- uv (recommended) or pip
- Docker & Docker Compose
-
Clone the repository:
git clone <repo-url> cd rmq_middleware
-
Install dependencies:
uv sync
-
Run tests (including integration tests with Testcontainers):
uv run pytest
- Create a
.envfile (see.env.example). - Start services:
docker-compose up -d
- Check health status:
curl http://localhost:8000/health
The middleware attempts to handle message bodies intelligently:
- JSON: If the message content-type is
application/jsonor the body is valid JSON, it is deserialized into a Dictionary/List. - String: If JSON parsing fails, it attempts to decode as a UTF-8 string.
- Hex: If UTF-8 decoding fails (binary data), the body is returned as a Hexadecimal string.
- Fetch Timeout: The
timeoutparameter in/v1/fetchdefines how long to wait for a message to arrive. It is NOT the processing time. - Unacked State: When a message is fetched with
auto_ack: false, it enters theUnackedstate in RabbitMQ and is locked to the current user session. - Session Cleanup: If a client crashes without sending an Ack/Reject, the middleware will automatically close the idle connection after 5 minutes. Only then will RabbitMQ return the message to the queue for other consumers.
Client Recommendations:
- Success: Always send
POST /v1/ack/{tag}. - Failure: Send
POST /v1/reject/{tag}(userequeue: trueto retry, orfalsefor DLX).
| Endpoint | Method | Description | Auth |
|---|---|---|---|
/v1/publish |
POST | Publish message (Strict Schema) | Basic |
/v1/fetch |
POST | Consume message (Long-polling) | Basic |
/v1/ack/{tag} |
POST | Acknowledge message | Basic |
/v1/reject/{tag} |
POST | Reject message | Basic |
/health |
GET | Liveness probe | None |
/ready |
GET | Readiness probe | None |
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8000/v1/publish"
auth = HTTPBasicAuth('my_rmq_user', 'my_secret_pass')
payload = {
"exchange": "enterprise.core",
"routing_key": "order.created",
"payload": {"id": 123},
"persistent": True,
"mandatory": True
}
resp = requests.post(url, json=payload, auth=auth)
print(resp.status_code) # 202This project is licensed under the MIT License - see the LICENSE file for details.