plataux/rmq-remote

What is rmq-remote?

A Python framework that provides lightweight abstractions for building remote procedure call (RPC) systems and distributed task pipelines. It decouples IO-bound and CPU-bound tasks from the caller and offloads them to worker services, using RabbitMQ as the message broker.

Installation

pip install rmq-remote

Key Benefits and Features

rmq-remote is inspired by libraries like celery and nameko, but aims to be much simpler and more lightweight, minimizing configuration and dependencies, while still providing powerful abstractions for building distributed systems. It does not attempt to solve every distributed-workflow problem, but rather focuses on providing a simple foundation upon which developers can build their own solutions to their own business-specific problems.

AsyncIO-first Non-blocking Architecture

Built on top of aio_pika and RabbitMQ, it leverages Python's asyncio for high concurrency and efficient I/O operations. For CPU-bound tasks, it can be combined with concurrent.futures.ProcessPoolExecutor to offload work to separate processes.
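As a rough illustration of the ProcessPoolExecutor combination mentioned above, the sketch below offloads a CPU-bound function from a coroutine so the event loop stays responsive. It uses only the standard library; count_primes and count_primes_offloaded are hypothetical names chosen for this example and are not part of rmq-remote.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional


def count_primes(limit: int) -> int:
    """CPU-bound stand-in (hypothetical): count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def count_primes_offloaded(limit: int, executor: Optional[Executor]) -> int:
    """Run count_primes in `executor` without blocking the event loop.

    Passing None falls back to the loop's default thread pool.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, count_primes, limit)


async def main() -> None:
    # Two CPU-bound jobs run in separate processes while the loop stays free.
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            count_primes_offloaded(20_000, pool),
            count_primes_offloaded(30_000, pool),
        )
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

In a worker, a remote task's coroutine could presumably await a helper like count_primes_offloaded in the same way, assuming the function and its arguments are picklable.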

Dynamic RPC dispatching

Workers do not need to register or decorate functions. They can dynamically handle any function call received via RabbitMQ based on the function's fully qualified name, making it easy to add new functionality without necessarily redeploying workers.
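To make the idea of dispatching by fully qualified name concrete, here is a minimal sketch built on importlib. It is one plausible way to do such a lookup, not a description of how rmq-remote actually does it; resolve and dispatch are hypothetical helper names.

```python
import importlib
from typing import Any, Callable


def resolve(qualified_name: str) -> Callable[..., Any]:
    """Turn a dotted name such as 'math.sqrt' into the callable it names."""
    module_name, _, attr = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.function', got {qualified_name!r}")
    module = importlib.import_module(module_name)
    func = getattr(module, attr)
    if not callable(func):
        raise TypeError(f"{qualified_name!r} is not callable")
    return func


def dispatch(qualified_name: str, *args: Any, **kwargs: Any) -> Any:
    """Look up the function by name and call it with the given arguments."""
    return resolve(qualified_name)(*args, **kwargs)


if __name__ == "__main__":
    print(dispatch("math.sqrt", 16))
    print(dispatch("operator.add", 2, 3))
```

A production worker would likely restrict which modules may be resolved, since importing and calling arbitrary names taken from a message is a security risk.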

Concurrency and Rate Limiting

Each worker supports concurrent processing of multiple tasks, plus rate limiting to cap the number of tasks processed per minute, which is useful when interacting with external services that impose rate limits. For IO-bound tasks, a single worker can easily handle hundreds of tasks concurrently.
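The combination of a concurrency cap and a per-minute limit can be sketched with plain asyncio as follows. This is an illustrative sliding-window limiter under assumed semantics, not the library's implementation; PerMinuteLimiter and run_all are hypothetical names, and the period parameter exists only so the demo finishes quickly.

```python
import asyncio
import time
from collections import deque
from typing import Awaitable, Callable, Deque, List


class PerMinuteLimiter:
    """Allow at most `limit` task starts in any window of `period` seconds."""

    def __init__(self, limit: int, period: float = 60.0) -> None:
        self.limit = limit
        self.period = period
        self._starts: Deque[float] = deque()
        self._lock = asyncio.Lock()  # serializes waiters in FIFO order

    async def acquire(self) -> None:
        async with self._lock:
            while True:
                now = time.monotonic()
                # Forget starts that have left the window.
                while self._starts and now - self._starts[0] >= self.period:
                    self._starts.popleft()
                if len(self._starts) < self.limit:
                    self._starts.append(now)
                    return
                # Sleep until the oldest start leaves the window, then re-check.
                await asyncio.sleep(self.period - (now - self._starts[0]))


async def run_all(
    jobs: List[Callable[[], Awaitable[int]]],
    concurrency: int,
    limiter: PerMinuteLimiter,
) -> List[int]:
    """Run jobs with at most `concurrency` in flight, each gated by the limiter."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(job: Callable[[], Awaitable[int]]) -> int:
        async with semaphore:
            await limiter.acquire()
            return await job()

    return await asyncio.gather(*(run_one(job) for job in jobs))


async def main() -> None:
    async def job() -> int:
        await asyncio.sleep(0.01)  # stand-in for an IO-bound call
        return 1

    # Short window for demonstration: 2 starts per 0.2 seconds.
    limiter = PerMinuteLimiter(limit=2, period=0.2)
    print(await run_all([job] * 5, concurrency=3, limiter=limiter))


if __name__ == "__main__":
    asyncio.run(main())
```

The semaphore bounds how many tasks are in flight at once, while the limiter bounds how often new tasks may start; the two limits are independent, which matches the separate concurrency and limit_per_minute arguments seen in the usage example.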

Minimal Implementation and Configuration

The package is designed as a minimal wrapper around aio_pika that makes full use of RabbitMQ's capabilities while completely abstracting away messages, queues, and exchanges behind a simple function-call interface. This allows developers to focus on writing business logic without worrying about the underlying messaging infrastructure.

Remote Producer and Consumer

The package provides two main abstractions: producer.py for sending RPC requests (or tasks) to remote workers, and consumer.py for processing them on the worker side.

producer.py (Remote Producer)

  • Channel Pooling: Efficiently manages a pool of RabbitMQ channels for concurrent message publishing.
  • remote_call: Sends an RPC request to a remote queue, optionally waiting for a result with timeout and priority support.
  • remote_callback: Sends an RPC request and specifies a callback function to handle the result asynchronously.
  • remote_task: Decorator to mark a function as a remote task, enabling it to be called via RabbitMQ.
  • remote_task_callback: Decorator for remote tasks that require a callback upon completion.
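The request/result flow behind a call that waits for its result with a timeout can be sketched without a broker. The example below is an in-memory stand-in that assumes a common RPC pattern (a correlation ID mapped to a pending future); it does not reflect rmq-remote's internals, and InMemoryRpc, call, and serve are hypothetical names. An asyncio.Queue plays the role of the RabbitMQ queue.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable, Dict


class InMemoryRpc:
    """Toy RPC loop: requests go through a queue, results return via futures."""

    def __init__(self) -> None:
        self.requests: asyncio.Queue = asyncio.Queue()
        self.pending: Dict[str, asyncio.Future] = {}

    async def call(
        self, func: Callable[..., Awaitable[Any]], *args: Any, timeout: float = 1.0
    ) -> Any:
        """Enqueue a request and wait up to `timeout` seconds for its result."""
        correlation_id = uuid.uuid4().hex
        future = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = future
        await self.requests.put((correlation_id, func, args))
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self.pending.pop(correlation_id, None)

    async def serve(self) -> None:
        """Worker side: run each request and resolve the matching future."""
        while True:
            correlation_id, func, args = await self.requests.get()
            future = self.pending.get(correlation_id)
            if future is None or future.done():
                continue  # the caller already gave up
            try:
                result = await func(*args)
            except Exception as exc:
                # Propagate the remote error back to the caller.
                if not future.done():
                    future.set_exception(exc)
            else:
                if not future.done():
                    future.set_result(result)


async def divide(x: float, y: float) -> float:
    return x / y


async def main() -> None:
    rpc = InMemoryRpc()
    worker = asyncio.create_task(rpc.serve())
    print(await rpc.call(divide, 6, 3))
    try:
        await rpc.call(divide, 6, 0)
    except ZeroDivisionError as exc:
        print(f"Caught propagated exception: {exc}")
    worker.cancel()


if __name__ == "__main__":
    asyncio.run(main())
```

With a real broker, the function reference would be replaced by a serialized name and arguments, and the result would travel back on a reply queue, but the correlation of request to pending future works the same way.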

consumer.py (Remote Consumer)

  • RemoteConsumer: Base class for consuming and processing messages from a RabbitMQ queue.
    • Handles message deserialization, function lookup, and invocation.
    • Supports concurrency and rate limiting (messages per minute).
    • Processes both request and result messages, including error handling and callback invocation.

These abstractions enable robust, asynchronous RPC workflows between distributed Python services using RabbitMQ.

Usage

import asyncio

import aio_pika
import remote
import random


@remote.remote_task(
    queue="rpc_lane",
    return_result=True
)
async def divide(x, y):
    print(f"DIV-thinking about {x} / {y}")
    await asyncio.sleep(random.random())
    print(f"DIV-returning {x} / {y}")
    return x / y


async def main():
    connection = await aio_pika.connect_robust()

    # Set up the consumer
    cons = remote.RemoteConsumer(connection, queue="rpc_lane", concurrency=1, limit_per_minute=0)
    await cons.consume()

    # Set up the producer
    await remote.init_channel_pool(connection)

    # Call the remote task - this interpreter is both producer and consumer
    print(await divide(6, 3))  # Should return 2.0
    
    try:
        print(await divide(6, 0))  # Should raise a remote exception that gets propagated back to the caller
    except ZeroDivisionError as e:
        print(f"Caught expected exception: {e}")

    await cons.cancel()
    await connection.close()


if __name__ == "__main__":
    asyncio.run(main())

About

Remote Procedure Call (RPC) library for Python using RabbitMQ
