A service that polls NestJS APIs for AI tasks and processes them using OpenAI or local Ollama models. Features OAuth2 authentication and multi-tier rate limiting.
- Text embedding generation using the OpenAI API or local Ollama models
- Processing modes: `openai`, `ollama`, or `hybrid` (Ollama first, OpenAI fallback)
- OAuth2 authentication via Ory Cloud with automatic token refresh
- Multi-tier rate limiting with persistent storage (minute/hour/day/week/month)
- Resilience: Circuit breakers, retry logic, graceful shutdown
- `openai`: Uses the OpenAI API for all embeddings. Requires `OPENAI_API_KEY`. Supports any OpenAI model.
- `ollama`: Uses local Ollama models exclusively. `OPENAI_API_KEY` not required. Only processes models in `SUPPORTED_MODELS`. Downloads models on startup.
- `hybrid`: Tries Ollama first, falls back to OpenAI. Requires both Ollama and `OPENAI_API_KEY`.
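A minimal sketch of the hybrid fallback, assuming async provider objects exposing the `supports_model()` and `create_embedding()` methods mentioned under New AI Providers below; the service's actual signatures and error handling may differ:

```python
async def hybrid_embedding(ollama_provider, openai_provider, model: str, text: str):
    """Hybrid mode: try Ollama first, fall back to OpenAI on failure."""
    if ollama_provider.supports_model(model):
        try:
            return await ollama_provider.create_embedding(model, text)
        except Exception:
            pass  # Ollama unavailable or errored; fall through to OpenAI
    return await openai_provider.create_embedding(model, text)
```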
```bash
cp .env.example .env
```

Edit `.env` with the required settings:

```env
# API Integration (required)
API_BASE_URL=http://localhost:3000
# OAuth2 Authentication (required)
ORY_PROJECT_SLUG=your-project-slug
OAUTH2_CLIENT_ID=your-client-id
OAUTH2_CLIENT_SECRET=your-client-secret
# Processing Mode
PROCESSING_MODE=openai # or ollama, hybrid
# OpenAI API Key (required for openai/hybrid, optional for ollama)
OPENAI_API_KEY=sk-your-key
# Ollama Models (for ollama/hybrid modes)
SUPPORTED_MODELS=["nomic-embed-text","dengcao/Qwen3-Embedding-0.6B:Q8_0"]
```

Start the service and follow its logs:

```bash
docker-compose up -d
docker-compose logs -f ai-task-processor
```

Ollama models download automatically on first startup when using `ollama` or `hybrid` mode.
All configuration is via environment variables (see `.env.example` for the complete list).
Processing:
- `PROCESSING_MODE`: `openai`, `ollama`, or `hybrid` (default: `openai`)
- `OPENAI_API_KEY`: OpenAI API key (required for `openai`/`hybrid`, optional for `ollama`)
- `SUPPORTED_MODELS`: JSON array of Ollama models for `ollama`/`hybrid` modes (default: `["nomic-embed-text","dengcao/Qwen3-Embedding-0.6B:Q8_0"]`)
Rate Limiting:
- `RATE_LIMIT_ENABLED`: Enable rate limiting (default: `true`)
- `RATE_LIMIT_PER_MINUTE`, `RATE_LIMIT_PER_HOUR`, `RATE_LIMIT_PER_DAY`, `RATE_LIMIT_PER_WEEK`, `RATE_LIMIT_PER_MONTH`: Set to `0` to disable individual limits
- `RATE_LIMIT_STRATEGY`: `rolling` or `fixed` (default: `rolling`)
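To illustrate the `rolling` strategy, here is a minimal sketch of a multi-tier rolling-window check backed by SQLite; the table name, schema, and function are hypothetical rather than the service's actual storage layout:

```python
import sqlite3
import time

# Rolling-window sizes in seconds per tier (month approximated as 30 days).
WINDOWS = {"minute": 60, "hour": 3600, "day": 86400, "week": 604800, "month": 2592000}

def allow_request(db: sqlite3.Connection, limits: dict) -> bool:
    """Return True and record the event if every enabled tier is under its limit."""
    now = time.time()
    db.execute("CREATE TABLE IF NOT EXISTS events (ts REAL)")
    for tier, limit in limits.items():
        if limit == 0:  # 0 disables this individual limit
            continue
        cutoff = now - WINDOWS[tier]
        count = db.execute("SELECT COUNT(*) FROM events WHERE ts > ?", (cutoff,)).fetchone()[0]
        if count >= limit:
            return False
    db.execute("INSERT INTO events (ts) VALUES (?)", (now,))
    # Prune events older than the largest window so the table stays bounded.
    db.execute("DELETE FROM events WHERE ts <= ?", (now - max(WINDOWS.values()),))
    db.commit()
    return True
```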
Advanced:
- `POLLING_INTERVAL_SECONDS`: Task polling frequency in seconds (default: `30`)
- `CONCURRENCY_LIMIT`: Max parallel tasks (default: `5`)
- `CIRCUIT_BREAKER_THRESHOLD`: Failures before the circuit opens (default: `5`)
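As a hedged illustration of how the documented defaults could map onto environment variables (the service's real settings loader may be structured differently):

```python
import json
import os

# Illustrative only: defaults below mirror the values documented above.
PROCESSING_MODE = os.getenv("PROCESSING_MODE", "openai")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  # may be unset in ollama mode
SUPPORTED_MODELS = json.loads(os.getenv(
    "SUPPORTED_MODELS",
    '["nomic-embed-text","dengcao/Qwen3-Embedding-0.6B:Q8_0"]',
))
POLLING_INTERVAL_SECONDS = int(os.getenv("POLLING_INTERVAL_SECONDS", "30"))
CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "5"))
CIRCUIT_BREAKER_THRESHOLD = int(os.getenv("CIRCUIT_BREAKER_THRESHOLD", "5"))
```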
Integrates with NestJS APIs via OAuth2-protected endpoints:
- `GET /api/ai-tasks/pending?limit=10` - Fetch pending tasks
- `PATCH /api/ai-tasks/:id` - Update task status/results
Task format:

```json
{
"_id": "task-id",
"type": "text_embedding",
"state": "pending",
"content": {"text": "Text to embed", "model": "nomic-embed-text"},
"callbackRoute": "verification_update_embedding",
"callbackParams": {"targetId": "doc-id", "field": "embedding"},
"createdAt": "2024-01-01T00:00:00.000Z"
}
```

Processing flow:

- Poll `/api/ai-tasks/pending` every 30 seconds with an OAuth2 Bearer token
- Validate that the model is supported by the current processing mode
- Generate embeddings via OpenAI or Ollama
- Update the task via `PATCH /api/ai-tasks/:id`
- Respect rate limits
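A condensed sketch of that flow using `httpx`; the response shape, the PATCH payload, and the `get_token`/`process_task` helpers are assumptions for illustration, not the service's actual code:

```python
import asyncio
import httpx

async def poll_loop(get_token, process_task, base_url="http://localhost:3000"):
    """Poll /api/ai-tasks/pending, process each task, PATCH the result back."""
    async with httpx.AsyncClient(base_url=base_url) as client:
        while True:
            headers = {"Authorization": f"Bearer {await get_token()}"}
            resp = await client.get("/api/ai-tasks/pending",
                                    params={"limit": 10}, headers=headers)
            resp.raise_for_status()
            for task in resp.json():  # assumed: a JSON list of task objects
                embedding = await process_task(task)  # OpenAI or Ollama, per mode
                await client.patch(f"/api/ai-tasks/{task['_id']}",
                                   json={"state": "completed", "result": embedding},
                                   headers=headers)
            await asyncio.sleep(30)  # POLLING_INTERVAL_SECONDS
```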
OpenAI (cloud):

- Supports any OpenAI embedding model (e.g., `text-embedding-3-small`, `text-embedding-ada-002`)
- Models are hosted by OpenAI; no downloads needed
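In isolation, generating an embedding with the official `openai` Python client (v1+) looks like this; a standalone example, not the service's provider code:

```python
from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

resp = client.embeddings.create(model="text-embedding-3-small",
                                input="Text to embed")
vector = resp.data[0].embedding  # list of floats
```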
Ollama (local):

- Only processes models listed in `SUPPORTED_MODELS`
- Auto-downloads models on startup (e.g., `nomic-embed-text`, `dengcao/Qwen3-Embedding-0.6B:Q8_0`)
- Models persist in Docker volumes
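For comparison, a standalone sketch with the `ollama` Python client; `pull()` is roughly what the startup auto-download amounts to, and the exact response access may vary by client version:

```python
import ollama

ollama.pull("nomic-embed-text")  # download the model if not already present

resp = ollama.embeddings(model="nomic-embed-text", prompt="Text to embed")
vector = resp["embedding"]  # list of floats
```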
Switch modes: Edit `PROCESSING_MODE` in `.env` and run `docker-compose restart ai-task-processor`.
Mock Processing: Set `OPENAI_API_KEY=your_openai_api_key_here` (the placeholder value) to enable mock embeddings for testing without API costs.
New Task Types:

- Add an enum value to `TaskType` in `ai_task_processor/models/task.py`
- Create input/output models
- Implement a processor inheriting from `BaseProcessor`
- Register it in `ProcessorFactory.__init__()`
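Put together, those steps look roughly like this; the import paths, the `process()` method name, and the `text_summary` example are assumptions, and only the class and module names above come from the repo:

```python
from ai_task_processor.models.task import TaskType      # module named in the steps above
from ai_task_processor.processors import BaseProcessor  # assumed import path

# 1. Add the new member to the TaskType enum in models/task.py, e.g.:
#    TEXT_SUMMARY = "text_summary"   (hypothetical task type)

# 2./3. Create input/output models, then a processor inheriting from BaseProcessor:
class TextSummaryProcessor(BaseProcessor):
    async def process(self, task):   # assumed method name
        ...  # produce and return the result for this task type

# 4. Register it in ProcessorFactory.__init__(), e.g.:
#    self._processors[TaskType.TEXT_SUMMARY] = TextSummaryProcessor()
```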
New AI Providers:

- Create a provider class inheriting from `EmbeddingProvider` (see `embedding_providers.py`)
- Implement `supports_model()` and `create_embedding()`
- Register it in `EmbeddingProviderFactory.create_provider()`
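A skeleton of such a provider; `EmbeddingProvider`, `supports_model()`, and `create_embedding()` come from the steps above, while the import path, the exact signatures, and `MyApiProvider` itself are assumptions:

```python
from ai_task_processor.embedding_providers import EmbeddingProvider  # assumed path

class MyApiProvider(EmbeddingProvider):
    SUPPORTED = {"my-embedding-model"}  # hypothetical model name

    def supports_model(self, model: str) -> bool:
        return model in self.SUPPORTED

    async def create_embedding(self, model: str, text: str) -> list:
        ...  # call the external API and return the embedding vector

# Then add a branch for it in EmbeddingProviderFactory.create_provider().
```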
- OAuth2 authentication with automatic token refresh
- Circuit breaker for API resilience
- Multi-tier rate limiting with SQLite persistence
- Graceful shutdown with signal management
- Structured logging with correlation IDs
- Docker containerization with health checks
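For reference, the circuit-breaker pattern behind `CIRCUIT_BREAKER_THRESHOLD`, as a minimal sketch (the service's implementation, e.g. half-open probing and cooldown timing, may differ):

```python
import time

class CircuitBreaker:
    """Open the circuit after N consecutive failures; allow a probe after a cooldown."""

    def __init__(self, threshold=5, reset_after=60.0):
        self.threshold = threshold      # maps to CIRCUIT_BREAKER_THRESHOLD
        self.reset_after = reset_after  # cooldown in seconds (assumed value)
        self.failures = 0
        self.opened_at = None           # timestamp when the circuit opened

    def allow(self):
        if self.opened_at is None:
            return True                 # circuit closed: requests flow normally
        if time.time() - self.opened_at >= self.reset_after:
            self.opened_at = None       # half-open: let one probe request through
            self.failures = 0
            return True
        return False                    # circuit open: fail fast

    def record_success(self):
        self.failures = 0

    def record_failure(self):
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.time()  # trip the circuit
```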