A Python framework that provides lightweight abstractions for building remote procedure call (RPC) systems and distributed task pipelines for decoupling and offloading IO-bound and CPU-bound tasks to worker services, using RabbitMQ as the message broker.
pip install rmq-remote
rmq-remote is inspired by libraries like Celery and Nameko, but aims to be much simpler and more lightweight,
minimizing configuration and dependencies, while still providing powerful abstractions for building distributed systems.
It does not attempt to solve every distributed-workflow problem, but rather focuses on providing a simple foundation
upon which developers can build their own solutions to their own business-specific problems.
Built on top of aio_pika and RabbitMQ, it leverages Python's asyncio for high concurrency and efficient I/O operations.
For CPU-bound tasks, it can be combined with concurrent.futures.ProcessPoolExecutor to offload work to separate processes.
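As a rough illustration of that combination, the sketch below runs a CPU-bound function in a process pool from inside a coroutine, using only the standard library. The names `count_primes` and `count_primes_async` are made up for this example and are not part of rmq-remote; in a worker, the body of a remote task could presumably await the pool in the same way.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def count_primes(limit: int) -> int:
    """CPU-bound work: count the primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def count_primes_async(limit: int, pool: ProcessPoolExecutor) -> int:
    """Run count_primes in a separate process without blocking the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(pool, count_primes, limit)


async def main() -> None:
    # Hypothetical sizing: two worker processes for two concurrent jobs.
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            count_primes_async(10_000, pool),
            count_primes_async(20_000, pool),
        )
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

The function handed to the pool has to be defined at module level so that it can be pickled and sent to the worker process.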
Workers do not need to register or decorate functions. They can dynamically handle any function call received via RabbitMQ, based on the function's fully qualified name, making it easy to add new functionality without necessarily redeploying workers.
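To make the idea of name-based dispatch concrete, here is a minimal sketch of how a fully qualified name such as `math.sqrt` can be turned into a callable with the standard library. This shows the general technique only; `resolve_function` is a hypothetical helper, and the lookup rmq-remote performs internally may differ.

```python
import importlib
from typing import Any, Callable


def resolve_function(qualified_name: str) -> Callable[..., Any]:
    """Import the module part of 'package.module.func' and return the function."""
    module_name, _, attr_name = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.function', got {qualified_name!r}")
    module = importlib.import_module(module_name)
    func = getattr(module, attr_name)
    if not callable(func):
        raise TypeError(f"{qualified_name} is not callable")
    return func


if __name__ == "__main__":
    sqrt = resolve_function("math.sqrt")
    print(sqrt(16))  # 4.0
```

Because any importable function can be reached this way, new code only needs to be importable on the worker to become callable.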
Each worker supports concurrent processing of multiple tasks, as well as rate limiting to control the number of tasks processed per minute, which is useful when interacting with external services that impose rate limits. For IO-bound tasks, a single worker can easily handle hundreds of tasks concurrently.
The package is designed to be a minimal wrapper around aio_pika that makes full use of RabbitMQ's capabilities,
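The two controls can be pictured with plain asyncio primitives. The sketch below is not rmq-remote's implementation: `PerMinuteLimiter` and `run_all` are hypothetical names, and treating a limit of 0 as "no limit" is an assumption made for this example. A semaphore caps how many jobs run at once, and the limiter spaces job starts evenly over each minute.

```python
import asyncio
import time


class PerMinuteLimiter:
    """Spaces starts evenly so roughly `limit_per_minute` jobs begin per minute."""

    def __init__(self, limit_per_minute: int) -> None:
        # Assumption for this sketch: 0 disables rate limiting.
        self._interval = 60.0 / limit_per_minute if limit_per_minute > 0 else 0.0
        self._next_start = time.monotonic()

    async def wait(self) -> None:
        if self._interval == 0.0:
            return
        # No await between the read and the write, so no lock is needed.
        now = time.monotonic()
        delay = max(0.0, self._next_start - now)
        self._next_start = max(now, self._next_start) + self._interval
        if delay:
            await asyncio.sleep(delay)


async def run_all(jobs, concurrency: int, limit_per_minute: int):
    """Run zero-argument coroutine functions under both limits."""
    semaphore = asyncio.Semaphore(concurrency)
    limiter = PerMinuteLimiter(limit_per_minute)

    async def run_one(job):
        await limiter.wait()      # rate limit: wait for the next start slot
        async with semaphore:     # concurrency limit: cap jobs in flight
            return await job()

    return await asyncio.gather(*(run_one(job) for job in jobs))


async def main() -> None:
    async def job():
        await asyncio.sleep(0.05)  # stand-in for an IO-bound call
        return "done"

    print(await run_all([job] * 5, concurrency=2, limit_per_minute=0))


if __name__ == "__main__":
    asyncio.run(main())
```

Spacing starts evenly is only one possible policy; a token bucket would allow short bursts while keeping the same average rate.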
while completely abstracting away the concept of messages, queues, and exchanges from the user, and providing a simple function call interface.
This allows developers to focus on writing business logic without worrying about the underlying messaging infrastructure.
The package provides two main abstractions: producer.py for sending RPC requests (or tasks) to remote workers,
and consumer.py for processing them on the worker side.
- Channel Pooling: Efficiently manages a pool of RabbitMQ channels for concurrent message publishing.
- remote_call: Sends an RPC request to a remote queue, optionally waiting for a result with timeout and priority support.
- remote_callback: Sends an RPC request and specifies a callback function to handle the result asynchronously.
- remote_task: Decorator to mark a function as a remote task, enabling it to be called via RabbitMQ.
- remote_task_callback: Decorator for remote tasks that require a callback upon completion.
- RemoteConsumer: Base class for consuming and processing messages from a RabbitMQ queue.
  - Handles message deserialization, function lookup, and invocation.
  - Supports concurrency and rate limiting (messages per minute).
  - Processes both request and result messages, including error handling and callback invocation.
These abstractions enable robust, asynchronous RPC workflows between distributed Python services using RabbitMQ.
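The request and result flow described above can be sketched without a broker. The example below assumes a hypothetical JSON message format (the keys `func`, `args`, `kwargs`, `ok`, `result`, `error_type` and `error` are invented here, and rmq-remote's actual wire format is not shown in this document). It illustrates one way a worker might turn an exception into an error reply, and how the caller might re-raise it.

```python
import asyncio
import builtins
import importlib
import inspect
import json


async def handle_request(body: bytes) -> bytes:
    """Worker side: decode a request, call the target, encode the outcome."""
    request = json.loads(body)
    module_name, _, func_name = request["func"].rpartition(".")
    try:
        func = getattr(importlib.import_module(module_name), func_name)
        result = func(*request.get("args", []), **request.get("kwargs", {}))
        if inspect.isawaitable(result):  # support both sync and async targets
            result = await result
        reply = {"ok": True, "result": result}
    except Exception as exc:
        # Report the failure instead of crashing the worker.
        reply = {"ok": False, "error_type": type(exc).__name__, "error": str(exc)}
    return json.dumps(reply).encode()


def unpack_reply(body: bytes):
    """Caller side: return the result or re-raise the remote failure."""
    reply = json.loads(body)
    if reply["ok"]:
        return reply["result"]
    # Map the name back to a built-in exception class, else fall back.
    exc_type = getattr(builtins, reply["error_type"], None)
    if not (isinstance(exc_type, type) and issubclass(exc_type, Exception)):
        exc_type = RuntimeError
    raise exc_type(reply["error"])


if __name__ == "__main__":
    ok = json.dumps({"func": "operator.truediv", "args": [6, 3]}).encode()
    print(unpack_reply(asyncio.run(handle_request(ok))))  # 2.0
    bad = json.dumps({"func": "operator.truediv", "args": [6, 0]}).encode()
    try:
        unpack_reply(asyncio.run(handle_request(bad)))
    except ZeroDivisionError as exc:
        print(f"Caught remote error: {exc}")
```

Only built-in exception types survive the round trip in this sketch; anything else is re-raised as a RuntimeError carrying the original message.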
import asyncio
import random

import aio_pika
import remote


@remote.remote_task(
    queue="rpc_lane",
    return_result=True
)
async def divide(x, y):
    print(f"DIV-thinking about {x} / {y}")
    await asyncio.sleep(random.random())
    print(f"DIV-returning {x} / {y}")
    return x / y


async def main():
    connection = await aio_pika.connect_robust()

    # Set up the consumer
    cons = remote.RemoteConsumer(connection, queue="rpc_lane", concurrency=1, limit_per_minute=0)
    await cons.consume()

    # Set up the producer
    await remote.init_channel_pool(connection)

    # Call the remote task - this interpreter is both producer and consumer
    print(await divide(6, 3))  # Should return 2.0
    try:
        # Should raise a remote exception that gets propagated back to the caller
        print(await divide(6, 0))
    except ZeroDivisionError as e:
        print(f"Caught expected exception: {e}")

    await cons.cancel()
    await connection.close()


if __name__ == "__main__":
    asyncio.run(main())