Skip to content

Msameim181/Fastrict

Repository files navigation

🚀 Fastrict - Enterprise FastAPI Rate Limiter

The most powerful, flexible, and production-ready rate limiting system for FastAPI applications.

Fastrict provides enterprise-grade rate limiting with Redis and in-memory backends, supporting everything from simple API throttling to complex multi-tenant rate limiting strategies.

Python 3.8+ FastAPI Redis License: MIT PyPI version Downloads Performance Latency

✨ Features

🏗️ Dual Architecture Support

  • 🌐 Global Rate Limiting: Shared limits across all endpoints
  • 🎯 Per-Route Rate Limiting: Independent limits for each endpoint
  • 🔄 Hybrid Mode: Mix global and per-route limits in the same application

🚀 Extreme Performance

  • ⚡ Sub-millisecond latency: Ultra-fast rate limit checks
  • 📊 1K-30K concurrent connections: Enterprise-scale performance
  • 🧮 Sliding window algorithm: Precise rate limiting with Redis sorted sets
  • 🗑️ Automatic cleanup: Expired keys removed automatically

Advanced Key Extraction

  • 🌍 IP-based limiting: Traditional client IP throttling
  • 🔑 Header-based: API keys, user tokens, custom headers
  • 📋 Query parameters: Rate limit by user ID, tenant, etc.
  • 📝 Form fields: POST form data extraction
  • 🎭 Custom functions: Complex business logic extraction
  • 🔗 Combined keys: Multi-factor rate limiting (IP + API key + tenant)

�️ Intelligent Bypass System

  • 👑 Role-based bypass: Skip limits for admin users
  • 🎫 Premium tier bypass: Different limits for paid users
  • 🔧 Maintenance mode: Conditional bypass during deployments
  • 🤖 Custom logic: Any business rule for bypass decisions

📊 Production Monitoring

  • 📈 Standard HTTP headers: X-RateLimit-* headers
  • 📱 Real-time usage: Current count, remaining, usage percentage
  • ⏱️ Retry-After: Smart retry timing
  • 📋 Comprehensive logging: Structured logs for monitoring
  • 🎯 Usage statistics: Track rate limit effectiveness

🏭 Enterprise Ready

  • ☁️ Redis Cluster support: Horizontal scaling
  • 💾 Memory fallback: In-memory storage for development
  • 🔄 Graceful degradation: Continues working if Redis fails
  • 🔒 Thread-safe: Concurrent request handling
  • 🧪 100% test coverage: Thoroughly tested codebase
  • 📋 Clean Architecture: SOLID principles, easy to extend

📦 Installation

# Install from PyPI
pip install fastrict

# Install with development dependencies
pip install fastrict[dev]

# Install with documentation dependencies
pip install fastrict[docs]

🔧 System Requirements

Component Version Purpose
Python 3.8+ Core runtime
FastAPI 0.68+ Web framework
Redis 4.0+ Primary storage backend
Pydantic 1.8+ Data validation
Starlette 0.14+ ASGI framework

🚀 Quick Start

🎯 1. Basic Setup (30 seconds)

from fastapi import FastAPI
from fastrict import RateLimitMiddleware, RedisRateLimitRepository
from fastrict import RateLimitUseCase, KeyExtractionUseCase

# Create FastAPI app
app = FastAPI(title="My Rate Limited API")

# Setup rate limiting (Redis)
repository = RedisRateLimitRepository.from_url("redis://localhost:6379")
key_extraction = KeyExtractionUseCase()
rate_limiter = RateLimitUseCase(repository, key_extraction)

# Create default key extraction strategy (NEW in v0.1.0)
# Try API key, then Authorization header, then fall back to IP
from fastrict import create_api_key_fallback
default_key_extraction = create_api_key_fallback()

# Add global rate limiting middleware
app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    excluded_paths=["/health", "/docs", "/metrics"],
    default_key_extraction=default_key_extraction  # NEW: Default for all routes
)

@app.get("/api/data")
async def get_data():
    return {"message": "This endpoint is globally rate limited"}

🎨 2. Route-Specific Rate Limiting

from fastrict import throttle, RateLimitStrategyName, RateLimitMode

# Strict rate limiting for authentication
@app.post("/auth/login")
@throttle(strategy=RateLimitStrategyName.SHORT)  # 3 requests per minute
async def login():
    return {"token": "jwt-token-here"}

# Custom rate limiting for file uploads
@app.post("/api/upload")
@throttle(limit=5, ttl=300)  # 5 uploads per 5 minutes
async def upload_file():
    return {"file_id": "12345", "status": "uploaded"}

# Premium endpoint with generous limits
@app.get("/api/premium")
@throttle(limit=1000, ttl=3600)  # 1000 requests per hour
async def premium_data():
    return {"data": "premium content"}

🔑 3. Advanced Key Extraction

from fastrict import KeyExtractionType

# API key-based rate limiting
@app.get("/api/protected")
@throttle(
    limit=100, 
    ttl=3600,
    key_type=KeyExtractionType.HEADER,
    key_field="X-API-Key",
    key_default="anonymous"
)
async def protected_endpoint():
    return {"data": "API key limited content"}

# User-specific rate limiting
@app.get("/api/user-data")
@throttle(
    limit=50,
    ttl=600,
    key_type=KeyExtractionType.QUERY_PARAM,
    key_field="user_id",
    key_default="guest"
)
async def user_data():
    return {"data": "user-specific data"}

# Multi-factor rate limiting (IP + API key)
@app.get("/api/sensitive")
@throttle(
    limit=10,
    ttl=300,
    key_type=KeyExtractionType.COMBINED,
    key_combination=["ip", "header:X-API-Key"]
)
async def sensitive_data():
    return {"data": "highly sensitive information"}

🎛️ Rate Limiting Modes

Fastrict offers two powerful rate limiting modes that can be mixed and matched:

🌐 Global Mode

All endpoints share the same rate limit pool. Perfect for overall API protection.

from fastrict import RateLimitMode

app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    rate_limit_mode=RateLimitMode.GLOBAL,  # All routes share limits
    default_strategy_name=RateLimitStrategyName.MEDIUM
)

@app.get("/api/data")      # ──┐ 
async def get_data():      #   ├── All share same
    return {"data": "..."}  #   │   20 req/10min pool

@app.get("/api/users")     #   │
async def get_users():     #   │ 
    return {"users": []}   # ──┘

🎯 Per-Route Mode

Each endpoint has independent rate limit pools. Ideal for fine-grained control.

app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    rate_limit_mode=RateLimitMode.PER_ROUTE  # Independent limits per route
)

@app.get("/api/data")      # ── 20 req/10min (independent)
async def get_data():
    return {"data": "..."}

@app.get("/api/users")     # ── 20 req/10min (independent)  
async def get_users():
    return {"users": []}

🔄 Hybrid Mode

Mix global middleware with per-route decorators for ultimate flexibility:

# Global middleware (GLOBAL mode)
app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    rate_limit_mode=RateLimitMode.GLOBAL
)

@app.get("/api/public")     # Uses global pool
async def public_data():
    return {"data": "public"}

@app.get("/api/special")    # Gets its own independent pool
@throttle(limit=100, ttl=3600, rate_limit_mode=RateLimitMode.PER_ROUTE)
async def special_endpoint():
    return {"data": "special"}

� Fallback Key Extraction Strategies

NEW in v0.1.0: Advanced fallback mechanisms that try multiple extraction methods in sequence.

🏗️ Built-in Fallback Helpers

Fastrict provides convenient helper functions for common fallback patterns:

from fastrict import (
    create_auth_header_fallback,
    create_api_key_fallback, 
    create_user_id_fallback
)

# Try Authorization header, then IP
auth_fallback = create_auth_header_fallback(
    header_name="Authorization",  # Default
    default_value="anonymous"      # Optional
)

# Try API key, then Authorization, then IP
api_fallback = create_api_key_fallback(
    api_key_header="X-API-Key",    # Default 
    auth_header="Authorization",    # Default
    default_value=None             # Will use IP if headers missing
)

# Try user ID from query param, then header, then IP
user_fallback = create_user_id_fallback(
    user_id_param="user_id",       # Default
    user_id_header="X-User-ID",    # Default
    default_value="anonymous"      # Optional
)

⚙️ Middleware Default Strategy

Set a default key extraction strategy that applies to all routes:

from fastrict import RateLimitMiddleware, create_api_key_fallback

# Create fallback strategy for middleware
default_strategy = create_api_key_fallback(
    api_key_header="X-API-Key",
    auth_header="Authorization"
    # Falls back to IP if neither header is present
)

app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    default_key_extraction=default_strategy,  # Applied to all routes
    rate_limit_mode=RateLimitMode.GLOBAL
)

# This endpoint will use the middleware default strategy
@app.get("/api/data")
async def get_data():
    return {"data": "Uses API key → Auth header → IP fallback"}

# This endpoint overrides with its own strategy  
@app.get("/api/users")
@throttle(
    limit=50, ttl=3600,
    key_extraction_strategy=create_user_id_fallback()
)
async def get_users():
    return {"users": "Uses user ID → header → IP fallback"}

🎯 Route-Specific Fallback

Override the middleware default for specific routes:

# Use helper function directly
@app.get("/api/auth-required")
@throttle(
    limit=100, ttl=3600,
    key_extraction_strategy=create_auth_header_fallback()
)
async def auth_endpoint():
    return {"data": "auth-protected"}

# Custom fallback strategy
from fastrict import KeyExtractionStrategy, KeyExtractionType

custom_fallback = KeyExtractionStrategy(
    type=KeyExtractionType.FALLBACK,
    fallback_strategies=[
        KeyExtractionStrategy(
            type=KeyExtractionType.HEADER,
            field_name="X-Session-ID"
        ),
        KeyExtractionStrategy(
            type=KeyExtractionType.HEADER, 
            field_name="X-API-Key"
        ),
        KeyExtractionStrategy(
            type=KeyExtractionType.IP
        )
    ]
)

@app.get("/api/session-data")
@throttle(
    limit=50, ttl=600,
    key_extraction_strategy=custom_fallback
)
async def session_endpoint():
    return {"data": "session-based rate limiting"}

🔄 How Fallback Works

  1. Try first strategy: Attempt to extract key using the first method
  2. Check success: If extraction succeeds and returns a valid key, use it
  3. Try next strategy: If extraction fails or returns empty, try next method
  4. Continue sequence: Repeat until a strategy succeeds
  5. IP fallback: If all strategies fail, fall back to IP address
# Example: API key → Auth header → IP fallback
api_fallback = create_api_key_fallback()

# For a request with these headers:
# X-API-Key: "" (empty)
# Authorization: "Bearer token123"
# Client IP: "192.168.1.100"

# Fallback process:
# 1. Try X-API-Key → empty, skip
# 2. Try Authorization → "Bearer token123" ✓
# Result: Rate limiting key = "Bearer token123"

🏢 Real-World Example

# Multi-tenant SaaS with intelligent key extraction
from fastrict import create_api_key_fallback, RateLimitMode

# Middleware default: API key for tenant isolation
default_strategy = create_api_key_fallback(
    api_key_header="X-API-Key",
    auth_header="Authorization"
)

app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    default_key_extraction=default_strategy,
    rate_limit_mode=RateLimitMode.GLOBAL,
    default_strategy_name=RateLimitStrategyName.MEDIUM
)

# Public endpoints use IP-based limiting
@app.get("/api/public")
@throttle(
    limit=100, ttl=3600,
    key_extraction_strategy=KeyExtractionStrategy(type=KeyExtractionType.IP)
)
async def public_data():
    return {"data": "public"}

# User endpoints prefer user ID over API key
@app.get("/api/user-profile")
@throttle(
    limit=200, ttl=3600,
    key_extraction_strategy=create_user_id_fallback()
)
async def user_profile():
    return {"profile": "user data"}

# Admin endpoints use session-based limiting
admin_fallback = KeyExtractionStrategy(
    type=KeyExtractionType.FALLBACK,
    fallback_strategies=[
        KeyExtractionStrategy(type=KeyExtractionType.HEADER, field_name="Admin-Session"),
        KeyExtractionStrategy(type=KeyExtractionType.HEADER, field_name="X-API-Key"),
        KeyExtractionStrategy(type=KeyExtractionType.IP)
    ]
)

@app.get("/api/admin")
@throttle(
    limit=1000, ttl=3600,
    key_extraction_strategy=admin_fallback
)
async def admin_endpoint():
    return {"data": "admin-only"}

�🔑 Key Extraction Strategies

📍 IP-Based (Default)

@throttle(limit=100, ttl=3600)  # Rate limit per client IP

🎫 Header-Based

# API key rate limiting
@throttle(
    limit=1000, ttl=3600,
    key_type=KeyExtractionType.HEADER,
    key_field="X-API-Key",
    key_default="anonymous"
)

# User token rate limiting  
@throttle(
    limit=500, ttl=3600,
    key_type=KeyExtractionType.HEADER,
    key_field="Authorization",
    key_default="unauthenticated"
)

📋 Query Parameter-Based

# User-specific limits
@throttle(
    limit=200, ttl=3600,
    key_type=KeyExtractionType.QUERY_PARAM,
    key_field="user_id",
    key_default="anonymous"
)

# Tenant-based limits (SaaS)
@throttle(
    limit=10000, ttl=3600,
    key_type=KeyExtractionType.QUERY_PARAM, 
    key_field="tenant_id",
    key_default="free_tier"
)

🔗 Combined Key Strategies

# Multi-factor rate limiting
@throttle(
    limit=50, ttl=300,
    key_type=KeyExtractionType.COMBINED,
    key_combination=[
        "ip",                    # Client IP
        "header:X-API-Key",      # API key
        "query_param:tenant_id"  # Tenant
    ]
)
# Results in key: "192.168.1.1:abc123:tenant_456"

🎭 Custom Key Extraction

def extract_session_key(request: Request) -> str:
    """Complex business logic for key extraction."""
    session_id = request.headers.get("Session-ID")
    user_tier = request.headers.get("User-Tier", "free")
    
    if user_tier == "premium":
        return f"premium:session:{session_id}"
    elif user_tier == "enterprise":
        return f"enterprise:session:{session_id}"
    else:
        return f"free:ip:{request.client.host}"

@throttle(
    limit=100, ttl=3600,
    key_type=KeyExtractionType.CUSTOM,
    key_extractor=extract_session_key
)
async def complex_endpoint():
    return {"data": "complex rate limiting"}

🛡️ Smart Bypass System

Create intelligent bypass rules for different user roles, maintenance modes, or business logic.

👑 Role-Based Bypass

def bypass_for_admins(request: Request) -> bool:
    """Bypass rate limiting for admin users."""
    user_role = request.headers.get("User-Role")
    return user_role in ["admin", "superuser"]

@app.get("/api/admin-only")
@throttle(
    limit=10, ttl=60,
    bypass_function=bypass_for_admins,
    custom_error_message="Admin endpoint requires admin privileges"
)
async def admin_endpoint():
    return {"data": "admin-only data"}

🎫 Premium User Bypass

def bypass_for_premium(request: Request) -> bool:
    """Bypass limits for premium subscribers."""
    subscription = request.headers.get("Subscription-Tier")
    return subscription in ["premium", "enterprise"]

@app.get("/api/premium-features")
@throttle(
    limit=5, ttl=60,  # Limits for free users
    bypass_function=bypass_for_premium
)
async def premium_features():
    return {"features": ["advanced", "priority"]}

🔧 Maintenance Mode Bypass

import os

def bypass_during_maintenance(request: Request) -> bool:
    """Bypass rate limiting during maintenance."""
    maintenance_mode = os.getenv("MAINTENANCE_MODE", "false").lower() == "true"
    maintenance_key = request.headers.get("Maintenance-Key")
    
    return maintenance_mode and maintenance_key == os.getenv("MAINTENANCE_SECRET")

@app.get("/api/critical")
@throttle(
    limit=100, ttl=3600,
    bypass_function=bypass_during_maintenance
)
async def critical_endpoint():
    return {"data": "critical system data"}

📊 Built-in Strategies

Fastrict comes with pre-configured strategies for common use cases:

from fastrict import RateLimitStrategy, RateLimitStrategyName

# Define custom strategies
custom_strategies = [
    RateLimitStrategy(
        name=RateLimitStrategyName.SHORT, 
        limit=3, 
        ttl=60
    ),      # Strict: 3 requests per minute
    
    RateLimitStrategy(
        name=RateLimitStrategyName.MEDIUM, 
        limit=20, 
        ttl=600
    ),     # Moderate: 20 requests per 10 minutes
    
    RateLimitStrategy(
        name=RateLimitStrategyName.LONG, 
        limit=100, 
        ttl=3600
    ),    # Generous: 100 requests per hour
]

app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    default_strategies=custom_strategies,
    default_strategy_name=RateLimitStrategyName.MEDIUM
)

# Use predefined strategies
@app.post("/auth/login")
@throttle(strategy=RateLimitStrategyName.SHORT)  # Use strict limits
async def login():
    return {"message": "Login attempt"}

@app.get("/api/search") 
@throttle(strategy=RateLimitStrategyName.LONG)   # Use generous limits
async def search():
    return {"results": []}

🏗️ Storage Backends

⚡ Redis Backend (Recommended)

Perfect for production, supports clustering and persistence.

from fastrict import RedisRateLimitRepository

# Simple connection
repository = RedisRateLimitRepository.from_url("redis://localhost:6379")

# Advanced configuration
repository = RedisRateLimitRepository.from_url(
    redis_url="redis://:password@localhost:6379/0",
    key_prefix="myapp_limits",
    logger=my_logger
)

# Custom Redis client
import redis
redis_client = redis.Redis(
    host="localhost",
    port=6379,
    password="secret",
    decode_responses=True,
    socket_timeout=5,
    retry_on_timeout=True
)
repository = RedisRateLimitRepository(
    redis_client=redis_client,
    key_prefix="production_limits"
)

💾 Memory Backend (Development)

Great for testing and development environments.

from fastrict import MemoryRateLimitRepository

# In-memory storage (no persistence)
repository = MemoryRateLimitRepository(
    key_prefix="dev_limits",
    cleanup_interval=300  # Cleanup every 5 minutes
)

📊 Monitoring & Observability

📈 Standard HTTP Headers

Fastrict automatically adds industry-standard rate limiting headers:

HTTP/1.1 200 OK
X-RateLimit-Limit: 100           # Maximum requests in window
X-RateLimit-Remaining: 75        # Requests remaining in window  
X-RateLimit-Used: 25             # Requests used in window
X-RateLimit-Window: 3600         # Window duration in seconds

When rate limited (HTTP 429):

HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Used: 100
X-RateLimit-Window: 3600
Retry-After: 1847                # Seconds until window resets

📱 Real-time Status Endpoint

@app.get("/api/rate-limit-status")
@throttle(bypass=True)  # Don't count status checks against limits
async def rate_limit_status(request: Request):
    """Get current rate limit status without incrementing counter."""
    result = rate_limiter.get_current_usage(
        request=request,
        middleware_rate_limit_mode=RateLimitMode.GLOBAL,
        route_path=request.url.path
    )
    
    return {
        "allowed": result.allowed,
        "current_count": result.current_count,
        "limit": result.limit,
        "remaining": result.remaining_requests,
        "reset_in_seconds": result.ttl,
        "usage_percentage": result.usage_percentage,
        "strategy": result.strategy_name,
        "key": result.key  # Rate limiting key used
    }

📋 Structured Error Responses

{
  "message": "Rate limit exceeded. Maximum 100 requests per 3600 seconds. Please try again in 1847 seconds.",
  "retry_after": 1847,
  "limit": 100,
  "window": 3600,
  "current_count": 100,
  "usage_percentage": 100.0,
  "strategy": "medium"
}

🔧 Custom Error Messages

@app.post("/api/critical")
@throttle(
    limit=5, ttl=60,
    custom_error_message="Critical endpoint allows only 5 requests per minute. Please use batch operations for bulk requests."
)
async def critical_operation():
    return {"status": "processing"}

🧪 Testing Your Rate Limits

📝 Unit Testing

import pytest
from fastapi.testclient import TestClient
from unittest.mock import Mock

def test_rate_limiting():
    # Mock Redis for testing
    mock_redis = Mock()
    repository = RedisRateLimitRepository(mock_redis)
    
    with TestClient(app) as client:
        # First request should succeed
        response = client.get("/api/data")
        assert response.status_code == 200
        assert "X-RateLimit-Remaining" in response.headers
        
        # Simulate rate limit exceeded
        mock_redis.zcard.return_value = 100  # Over limit
        response = client.get("/api/data")
        assert response.status_code == 429
        assert "Retry-After" in response.headers

🔄 Integration Testing

import asyncio
import httpx

async def test_concurrent_requests():
    """Test rate limiting under concurrent load."""
    async with httpx.AsyncClient() as client:
        # Fire 10 concurrent requests
        tasks = [
            client.get("http://localhost:8000/api/data")
            for _ in range(10)
        ]
        responses = await asyncio.gather(*tasks)
        
        # Check that some are rate limited
        success_count = sum(1 for r in responses if r.status_code == 200)
        rate_limited_count = sum(1 for r in responses if r.status_code == 429)
        
        assert success_count <= 5  # Our test limit
        assert rate_limited_count >= 5

🚨 Load Testing

# Install hey for load testing
go install github.com/rakyll/hey@latest

# Test rate limiting under load
hey -n 100 -c 10 -H "X-API-Key: test123" http://localhost:8000/api/data

# Expected output shows rate limiting in action:
# Status code distribution:
#   [200] 20 responses  (successful requests)
#   [429] 80 responses  (rate limited)

🚀 Performance Characteristics

⚡ Benchmarks

Metric Value Notes
Latency < 1ms Rate limit check overhead
Throughput 30K+ req/s Redis backend, single instance
Memory ~10MB Per 100K active keys
CPU < 1% Minimal overhead

📊 Scalability

# Horizontal scaling with Redis Cluster
repository = RedisRateLimitRepository.from_url(
    "redis://node1:7000,node2:7000,node3:7000",
    key_prefix="cluster_limits"
)

# Multiple app instances can share rate limits
# Perfect for microservices and load-balanced deployments

🏗️ Architecture & Design

Fastrict follows Clean Architecture principles:

src/fastrict/
├── entities/          # 🏛️  Core business models & enums
│   ├── models.py      #     RateLimitStrategy, RateLimitResult
│   └── enums.py       #     KeyExtractionType, RateLimitMode
├── use_cases/         # 🧠  Business logic & orchestration  
│   ├── rate_limit.py  #     Core rate limiting logic
│   └── key_extraction.py    Key extraction strategies
├── adapters/          # 🔌  External integrations
│   ├── redis_repository.py   Redis storage backend
│   └── memory_repository.py  In-memory storage backend  
└── frameworks/        # 🌐  FastAPI integration
    ├── middleware.py  #     Global rate limiting middleware
    └── decorator.py   #     @throttle route decorator

🎯 Design Principles

  • 🔒 Immutable Entities: Thread-safe by design
  • 🧪 Dependency Injection: Easy testing and mocking
  • 🔌 Interface Segregation: Swap backends seamlessly
  • 📦 Single Responsibility: Each component has one job
  • 🚀 Performance First: Optimized for high throughput

📊 Performance Benchmarks

Last updated: 2025-10-02 (MacOS 26, M3 Pro, conda chat environment)

Fastrict has been extensively tested for performance under various load conditions. Here are the benchmark results:

⚡ Single Request Performance

Metric Value Description
Single Request Latency 0.37 ms Ultra-fast rate limit check overhead

🏃‍♂️ Sequential Performance

Metric Value Description
Total Requests 1,000 Sequential test requests
Duration 0.35 seconds Total test time
Requests/Second 2,857 RPS Sequential throughput
Average Response Time 0.35 ms Mean response time
P95 Response Time 0.41 ms 95th percentile

🚀 Concurrent Performance (High Load)

Metric Value Description
Total Requests 1,000 50 users × 20 requests each
Duration 0.27 seconds Concurrent execution time
Requests/Second 3,676 RPS Concurrent throughput
Success Rate 100.0% Zero failures under load
Average Response Time 13.41 ms Mean response time
P95 Response Time 28.64 ms 95th percentile
P99 Response Time 28.93 ms 99th percentile

🛡️ Rate Limiting Accuracy

Metric Value Description
Total Requests 100 Concurrent requests to limited endpoint
Successful Requests 50 Requests within limit
Rate Limited Requests 50 Correctly blocked requests
Accuracy 100% Perfect rate limiting enforcement
Average Response Time 10.37 ms Fast even when blocking

💪 Extreme Load Test

Metric Value Description
Total Requests 1,000 100 users × 10 requests each
Requests/Second 3,639 RPS Sustained under extreme load
Success Rate 100.0% No failures under pressure
Error Rate 0.0% System stability maintained
P99 Response Time 32.56 ms Excellent tail latency

🔄 Sustained Load Endurance

Metric Value Description
Total Requests 913 10-second endurance test
Achieved RPS 91.24 Target: 100 RPS
Success Rate 100.0% No degradation over time
Average Response Time 1.95 ms Consistent performance
Performance Degradation 21.7% Minimal performance loss

🏆 Performance Highlights

  • Sub-millisecond latency: 0.37ms average response time
  • 🚀 3,600+ RPS: Exceptional concurrent throughput
  • 🎯 100% success rate: Perfect stability under load
  • 🛡️ 100% rate limiting accuracy: Precise enforcement
  • 💾 Memory efficient: Handles thousands of unique keys
  • 🔄 Minimal degradation: Stable performance over time

🧪 Test Environment

  • Hardware: MacOS, M1 Pro
  • Python: 3.10.16 (conda environment)
  • Backend: In-memory storage (optimal performance)
  • Test Framework: pytest + httpx + asyncio
  • Load Patterns: Sequential, concurrent, sustained, extreme scenarios

🔬 Run Performance Tests Yourself

Want to verify these results? Run the performance tests on your own system:

# Install dependencies
conda activate chat  # or your preferred environment
pip install pytest httpx pytest-asyncio uvicorn
pip install -e .

# Run comprehensive performance test suite
python -m pytest tests/test_performance.py -v

# Run live performance demo
python test/demo_performance.py

# Generate performance report
python test/run_performance_tests.py

See PERFORMANCE_SUMMARY.md and tests/PERFORMANCE.md for detailed testing documentation.

🚀 Real-World Performance

These benchmarks demonstrate that Fastrict can easily handle:

  • High-traffic APIs: 3,000+ requests per second
  • Real-time applications: Sub-millisecond response times
  • Microservices: Zero performance impact
  • Enterprise workloads: 100% stability under pressure

Performance may vary based on hardware, Redis configuration, and network conditions.

🎯 Real-World Examples

🏢 Multi-Tenant SaaS Application

def extract_tenant_key(request: Request) -> str:
    """Extract tenant-aware rate limiting key."""
    api_key = request.headers.get("X-API-Key", "")
    tenant_id = request.headers.get("X-Tenant-ID", "unknown")
    
    # Different limits based on subscription tier
    if api_key.startswith("ent_"):
        return f"enterprise:tenant:{tenant_id}"
    elif api_key.startswith("pro_"):
        return f"professional:tenant:{tenant_id}"
    else:
        return f"free:tenant:{tenant_id}"

# Different strategies per tier
enterprise_strategy = RateLimitStrategy(name=RateLimitStrategyName.CUSTOM, limit=10000, ttl=3600)
professional_strategy = RateLimitStrategy(name=RateLimitStrategyName.LONG, limit=1000, ttl=3600)
free_strategy = RateLimitStrategy(name=RateLimitStrategyName.MEDIUM, limit=100, ttl=3600)

@app.get("/api/analytics")
@throttle(
    limit=100,  # Free tier limit
    ttl=3600,
    key_type=KeyExtractionType.CUSTOM,
    key_extractor=extract_tenant_key
)
async def get_analytics():
    return {"analytics": "tenant-specific data"}

🛒 E-commerce API Protection

# Protect checkout process
@app.post("/api/checkout")
@throttle(
    limit=5, ttl=300,  # 5 checkouts per 5 minutes
    key_type=KeyExtractionType.HEADER,
    key_field="User-ID",
    custom_error_message="Too many checkout attempts. Please wait before trying again."
)
async def process_checkout():
    return {"order_id": "12345", "status": "processing"}

# Protect payment endpoints with combined key (user + IP)
@app.post("/api/payment")
@throttle(
    limit=3, ttl=600,  # 3 payment attempts per 10 minutes
    key_type=KeyExtractionType.COMBINED,
    key_combination=["header:User-ID", "ip"],
    custom_error_message="Payment rate limit exceeded. Contact support if you need assistance."
)
async def process_payment():
    return {"payment_id": "pay_123", "status": "success"}

🔐 Authentication & Security

# Login rate limiting with exponential backoff
@app.post("/auth/login")
@throttle(
    limit=5, ttl=900,  # 5 login attempts per 15 minutes
    key_type=KeyExtractionType.COMBINED,
    key_combination=["ip", "form_field:username"],
    custom_error_message="Too many login attempts. Account temporarily locked."
)
async def login():
    return {"token": "jwt_token", "expires_in": 3600}

# Password reset protection  
@app.post("/auth/password-reset")
@throttle(
    limit=3, ttl=3600,  # 3 password resets per hour
    key_type=KeyExtractionType.FORM_FIELD,
    key_field="email",
    custom_error_message="Password reset limit exceeded. Try again in an hour."
)
async def password_reset():
    return {"message": "Password reset email sent"}

# 2FA verification
@app.post("/auth/verify-2fa")
@throttle(
    limit=10, ttl=300,  # 10 attempts per 5 minutes
    key_type=KeyExtractionType.HEADER,
    key_field="Session-ID",
    custom_error_message="Too many 2FA verification attempts."
)
async def verify_2fa():
    return {"verified": True}

📱 Mobile API with Device Limits

def extract_device_key(request: Request) -> str:
    """Rate limit by device fingerprint."""
    device_id = request.headers.get("Device-ID")
    app_version = request.headers.get("App-Version", "unknown")
    platform = request.headers.get("Platform", "unknown")
    
    if device_id:
        return f"device:{device_id}:{platform}:{app_version}"
    else:
        return f"ip:{request.client.host}"

@app.get("/api/mobile/sync")
@throttle(
    limit=100, ttl=3600,  # 100 syncs per hour per device
    key_type=KeyExtractionType.CUSTOM,
    key_extractor=extract_device_key
)
async def mobile_sync():
    return {"sync_data": "device-specific data"}

🤖 Bot Protection & Scraping Prevention

def detect_bot(request: Request) -> bool:
    """Detect and allow verified bots."""
    user_agent = request.headers.get("User-Agent", "").lower()
    bot_token = request.headers.get("Bot-Token")
    
    # Allow verified search engine bots
    verified_bots = ["googlebot", "bingbot", "slurp"]
    if any(bot in user_agent for bot in verified_bots):
        return True
        
    # Allow bots with valid tokens
    return bot_token in os.getenv("VALID_BOT_TOKENS", "").split(",")

@app.get("/api/public-data")
@throttle(
    limit=10, ttl=60,  # Strict limits for non-bots
    bypass_function=detect_bot,
    key_type=KeyExtractionType.COMBINED,
    key_combination=["ip", "header:User-Agent"]
)
async def public_data():
    return {"data": "public information"}

🔧 Configuration Examples

🌍 Environment-Based Configuration

import os
from fastrict import RateLimitStrategy, RateLimitStrategyName

def get_rate_limit_config():
    """Get rate limit configuration based on environment."""
    env = os.getenv("ENVIRONMENT", "development")
    
    if env == "production":
        return {
            "strategies": [
                RateLimitStrategy(name=RateLimitStrategyName.SHORT, limit=5, ttl=60),
                RateLimitStrategy(name=RateLimitStrategyName.MEDIUM, limit=50, ttl=600),
                RateLimitStrategy(name=RateLimitStrategyName.LONG, limit=500, ttl=3600),
            ],
            "redis_url": os.getenv("REDIS_URL"),
            "key_prefix": "prod_limits"
        }
    elif env == "staging":
        return {
            "strategies": [
                RateLimitStrategy(name=RateLimitStrategyName.SHORT, limit=10, ttl=60),
                RateLimitStrategy(name=RateLimitStrategyName.MEDIUM, limit=100, ttl=600),
                RateLimitStrategy(name=RateLimitStrategyName.LONG, limit=1000, ttl=3600),
            ],
            "redis_url": os.getenv("REDIS_URL", "redis://localhost:6379/1"),
            "key_prefix": "staging_limits"
        }
    else:  # development
        return {
            "strategies": [
                RateLimitStrategy(name=RateLimitStrategyName.SHORT, limit=100, ttl=60),
                RateLimitStrategy(name=RateLimitStrategyName.MEDIUM, limit=1000, ttl=600),
                RateLimitStrategy(name=RateLimitStrategyName.LONG, limit=10000, ttl=3600),
            ],
            "redis_url": "redis://localhost:6379/0",
            "key_prefix": "dev_limits"
        }

# Apply configuration
config = get_rate_limit_config()
repository = RedisRateLimitRepository.from_url(
    redis_url=config["redis_url"],
    key_prefix=config["key_prefix"]
)

app.add_middleware(
    RateLimitMiddleware,
    rate_limit_use_case=rate_limiter,
    default_strategies=config["strategies"],
    default_strategy_name=RateLimitStrategyName.MEDIUM
)

📋 Feature Flags Integration

def feature_flag_bypass(request: Request) -> bool:
    """Bypass rate limiting based on feature flags."""
    # Integration with feature flag service
    user_id = request.headers.get("User-ID")
    
    if user_id:
        # Check if user has rate limiting bypass feature enabled
        return feature_flag_service.is_enabled(
            flag="rate_limiting_bypass", 
            user_id=user_id
        )
    return False

@app.get("/api/experimental")
@throttle(
    limit=10, ttl=300,
    bypass_function=feature_flag_bypass
)
async def experimental_feature():
    return {"feature": "experimental"}

🤝 Contributing

We welcome contributions! Fastrict is built with ❤️ by the community.

🚀 Quick Start for Contributors

# Fork and clone the repository
git clone https://github.com/yourusername/fastrict.git
cd fastrict

# Install development dependencies
pip install -e ".[dev]"

# Run tests
pytest

# Run linting
black src tests
flake8 src tests
mypy src

# Run the example
python src/examples/simple_example.py

📋 Contribution Guidelines

  • 🐛 Bug Reports: Use the issue tracker with detailed reproduction steps
  • ✨ Feature Requests: Propose new features with use cases
  • 📝 Documentation: Help improve our docs and examples
  • 🧪 Tests: Maintain 100% test coverage
  • 🎨 Code Style: Follow Ruff formatting and type hints

🏗️ Development Workflow

  1. Fork the repository
  2. Create a feature branch: git checkout -b feature/amazing-feature
  3. Make your changes with tests
  4. Run the test suite: pytest --cov=fastrict
  5. Commit with clear messages: git commit -m 'Add amazing feature'
  6. Push to your fork: git push origin feature/amazing-feature
  7. Create a Pull Request

📚 Resources & Documentation

📖 Documentation

🆘 Support Channels

🔗 Related Projects

  • FastAPI - Modern, fast web framework for building APIs
  • Redis - In-memory data structure store
  • Starlette - Lightweight ASGI framework
  • Pydantic - Data validation using Python type hints

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

📈 Changelog & Roadmap

🎯 Current Version: v0.1.1

See CHANGELOG.md for version history and release notes.

🚀 Upcoming Features

  • 🌐 GraphQL Support: Rate limiting for GraphQL endpoints
  • 🌐 Django Support: Rate limiting for Django applications
  • 📊 Prometheus Metrics: Built-in metrics collection
  • 🔄 Circuit Breaker: Integrate with circuit breaker patterns
  • 🎯 Rate Limit Warming: Gradual limit increases
  • 📱 WebSocket Support: Rate limiting for WebSocket connections

Fastrict - Powering the next generation of FastAPI applications

⬆️ Back to Top