πŸ”„ Scalable Config-Driven Multi-Source ETL Pipeline

CI Python Docker PostgreSQL Pytest

A production-grade, automated data integration pipeline that unifies fragmented sales data across multiple formats (CSV, JSON, Parquet) for enterprise analytics and reporting.

A robust ETL pipeline built with Python that demonstrates enterprise-level data engineering practices, including:

  • 🎯 Config-driven architecture using YAML with environment variable overrides
  • πŸ“Š Multi-format data ingestion (CSV, JSON, Parquet)
  • πŸ”— Data transformation & merging with Pandas
  • πŸ—„οΈ Dual database support (PostgreSQL & SQLite) with SQLAlchemy ORM
  • πŸ“ Structured logging with file-based outputs
  • 🐳 Docker & Docker Compose orchestration with health checks
  • πŸ§ͺ Comprehensive testing with pytest
  • πŸš€ CI/CD pipeline with GitHub Actions
  • πŸ”§ CLI parameter overrides for operational flexibility

This project simulates a real-world data engineering workflow and serves as a foundation for scaling to tools like Apache Airflow, DBT, and Kubernetes.


πŸ“‹ Table of Contents

  • Features
  • Architecture
  • Project Structure
  • Prerequisites
  • Installation
  • Configuration
  • Usage
  • Docker Deployment
  • Testing
  • CI/CD Pipeline
  • Data Flow
  • Troubleshooting
  • Future Enhancements
  • Contributing
  • Author
  • Acknowledgments
  • Support


πŸš€ Features

Core Capabilities

βœ… Multi-format data extraction - Read from CSV, JSON, and Parquet files
βœ… Intelligent data merging - Join multiple datasets on configurable keys
βœ… Flexible output formats - Export to CSV or Parquet
βœ… Database integration - Load processed data to PostgreSQL or SQLite
βœ… YAML configuration - Centralized, environment-agnostic settings
βœ… Environment variable support - .env file integration for sensitive data

Engineering Best Practices

βœ… Structured logging - File-based logs with timestamps and levels (see the sketch after this list)
βœ… Exception handling - Graceful error recovery with detailed logging
βœ… CLI argument parsing - Runtime parameter overrides
βœ… Unit & integration testing - Comprehensive pytest coverage
βœ… Docker containerization - Reproducible deployment environment
βœ… Health checks - Database readiness validation
βœ… CI/CD automation - GitHub Actions for testing and deployment
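
For the structured logging item above, here is a minimal sketch of a file-based logger with timestamps and levels. The defaults mirror config.yaml (logs/Pipeline.log, INFO); the actual setup in src/pipeline.py may differ:

import logging
import os

def setup_logger(log_dir: str = "logs", log_file: str = "Pipeline.log", level: str = "INFO") -> logging.Logger:
    """Create a file-based logger that writes timestamped, leveled records."""
    os.makedirs(log_dir, exist_ok=True)
    logger = logging.getLogger("pipeline")
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))
    handler = logging.FileHandler(os.path.join(log_dir, log_file))
    handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
    logger.addHandler(handler)
    return logger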


πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚   CSV File  │────▢│              β”‚     β”‚  PostgreSQL β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€     β”‚              β”‚     β”‚      or     β”‚
β”‚  JSON File  │────▢│  ETL Engine  │────▢│   SQLite    β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€     β”‚   (Pandas)   β”‚     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚Parquet File │────▢│              β”‚            β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β–Ό
                           β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                           β”‚              β”‚ Output File β”‚
                           β–Ό              β”‚ (CSV/Parquet)β”‚
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    β”‚  Log System  β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Key Components:

  1. Configuration Layer - YAML-based settings with .env overrides
  2. Extraction Layer - Multi-format file readers (see the sketch after this list)
  3. Transformation Layer - Pandas-based data merging and cleaning
  4. Loading Layer - SQLAlchemy database writers + file export
  5. Observability Layer - Structured logging for monitoring
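
As a minimal sketch of the extraction layer above, a reader might dispatch on file extension with pandas (the helper name and error handling are illustrative, not the project's actual API):

import os
import pandas as pd

def read_file(path: str) -> pd.DataFrame:
    """Read a CSV, JSON, or Parquet source into a DataFrame based on its extension."""
    ext = os.path.splitext(path)[1].lower()
    if ext == ".csv":
        return pd.read_csv(path)
    if ext == ".json":
        return pd.read_json(path)
    if ext == ".parquet":
        return pd.read_parquet(path)
    raise ValueError(f"Unsupported file format: {ext}")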

πŸ“‚ Project Structure

Scalable Config-Driven Multi-Source ETL Pipeline/
β”œβ”€β”€ .github/
β”‚   └── workflows/
β”‚       └── ci.yml              # GitHub Actions CI/CD workflow
β”œβ”€β”€ config/
β”‚   └── config.yaml             # Pipeline configuration (paths, DB settings)
β”œβ”€β”€ data/
β”‚   β”œβ”€β”€ sales.csv               # Source: Sales transactions
β”‚   β”œβ”€β”€ products.json           # Source: Product catalog
β”‚   β”œβ”€β”€ region.parquet          # Source: Regional data
β”‚   β”œβ”€β”€ merged_data.csv         # Output: Merged dataset (CSV)
β”‚   └── merged_data.parquet     # Output: Merged dataset (Parquet)
β”œβ”€β”€ logs/
β”‚   └── Pipeline.log            # Runtime logs with timestamps
β”œβ”€β”€ src/
β”‚   β”œβ”€β”€ __init__.py             # Package initialization
β”‚   └── pipeline.py             # Main ETL pipeline logic
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ __init__.py             # Test package initialization
β”‚   └── test_pipeline.py        # Pytest test suite
β”œβ”€β”€ .env                        # Environment variables (DB credentials)
β”œβ”€β”€ docker-compose.yml          # Multi-container orchestration
β”œβ”€β”€ Dockerfile                  # Pipeline container definition
β”œβ”€β”€ requirements.txt            # Python dependencies
└── README.md                   # Project documentation

πŸ”§ Prerequisites

  • Python 3.10+ - Core runtime
  • Docker & Docker Compose (optional) - For containerized deployment
  • PostgreSQL 15+ (optional) - Production database
  • Git - Version control

πŸ“₯ Installation

Option 1: Local Setup

  1. Clone the repository
git clone https://github.com/DHANA5982/Scalable-Config-Driven-Multi-Source-ETL-Pipeline.git
  2. Create and activate a virtual environment
python -m venv .venv
# Windows
.venv\Scripts\activate
# Linux/Mac
source .venv/bin/activate
  3. Install dependencies
pip install --upgrade pip
pip install -r requirements.txt
  4. Configure environment variables (optional, for PostgreSQL)
# Create a .env file in the project root
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_secure_password
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=merged_data_db

Option 2: Docker Setup

# Build and start services
docker-compose up --build

# Run in detached mode
docker-compose up -d --build

# View logs
docker-compose logs -f pipeline

βš™οΈ Configuration

config/config.yaml

data_paths:
  csv_file: "../data/sales.csv"
  json_file: "../data/products.json"
  parquet_file: "../data/region.parquet"
  output_file: "../data/merged_data.csv"

logging:
  log_dir: "../logs"
  log_file: "Pipeline.log"
  log_level: "INFO"              # Options: DEBUG, INFO, WARNING, ERROR

database:
  type: "postgres"                # Options: postgres, sqlite
  host: "postgres"                # Use "localhost" for local setup
  port: 5432
  user: "postgres"                # Overridden by .env if present
  password: "postgres"            # Overridden by .env if present
  name: "merged_data_db"
  table: "merged_table"

pipeline:
  merge_on:
    - order_id
    - product_id
  output_format: "csv"            # Options: csv, parquet

Configuration Priority:
.env variables > CLI arguments > config.yaml defaults
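
A minimal sketch of how this configuration could be loaded, with .env values taking precedence over the YAML defaults (assuming PyYAML and python-dotenv; the actual loader in src/pipeline.py may differ):

import os
import yaml
from dotenv import load_dotenv

def load_config(path: str = "config/config.yaml") -> dict:
    """Load YAML settings, then let .env / environment variables override the database block."""
    load_dotenv()  # picks up a .env file in the project root if present
    with open(path) as f:
        config = yaml.safe_load(f)

    db = config["database"]
    db["user"] = os.getenv("POSTGRES_USER", db["user"])
    db["password"] = os.getenv("POSTGRES_PASSWORD", db["password"])
    db["host"] = os.getenv("POSTGRES_HOST", db["host"])
    db["port"] = int(os.getenv("POSTGRES_PORT", db["port"]))
    db["name"] = os.getenv("POSTGRES_DB", db["name"])
    return config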


πŸš€ Usage

Basic Execution

# Run with default config (from the project root)
python src/pipeline.py

# Specify custom config file
python src/pipeline.py --config 'path/to/config.yaml'

Advanced CLI Overrides

# Override individual file paths
python src/pipeline.py \
  --csv_file 'data/new_sales.csv' \
  --json_file 'data/new_products.json' \
  --output_file 'data/custom_output.csv'

# Change output format to Parquet
python src/pipeline.py \
    --output_file 'data/output.parquet' \
    --output_format 'parquet'
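
The overrides above could be wired up roughly as follows; the flag names match the examples, but the defaults and help text are assumptions about src/pipeline.py:

import argparse

def parse_args() -> argparse.Namespace:
    """Parse runtime overrides; any flag left unset falls back to config.yaml."""
    parser = argparse.ArgumentParser(description="Config-driven multi-source ETL pipeline")
    parser.add_argument("--config", default="config/config.yaml", help="Path to the YAML config file")
    parser.add_argument("--csv_file", help="Override the sales CSV source path")
    parser.add_argument("--json_file", help="Override the products JSON source path")
    parser.add_argument("--parquet_file", help="Override the region Parquet source path")
    parser.add_argument("--output_file", help="Override the merged output file path")
    parser.add_argument("--output_format", choices=["csv", "parquet"], help="Override the output format")
    return parser.parse_args()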

Using SQLite Instead of PostgreSQL

# In config.yaml, change database type:
database:
  type: "sqlite"
  name: "../data/pipeline.db"
  table: "merged_table"

🐳 Docker Deployment

Architecture

The Docker setup includes:

  • PostgreSQL container - Database with health checks
  • Pipeline container - Python ETL application
  • Volume mounts - Persistent data, logs, and config
  • Service dependencies - Pipeline waits for DB readiness

Commands

# Start all services
docker-compose up -d

# Check service status
docker-compose ps

# View pipeline logs
docker-compose logs -f pipeline

# Stop all services
docker-compose down

# Clean up (including volumes)
docker-compose down -v

Environment Setup

Create a .env file in the project root:

POSTGRES_USER=postgres
POSTGRES_PASSWORD=secure_password_here
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=merged_data_db

πŸ§ͺ Testing

Run All Tests

# Basic test execution
pytest

# Verbose output with coverage
pytest -v --cov=src tests/

# Run specific test file
pytest tests/test_pipeline.py

# Run specific test function
pytest tests/test_pipeline.py::test_merge_data

Test Coverage

The test suite includes:

Test Type             | Coverage
----------------------|----------------------------------
Configuration Loading | YAML parsing, error handling
File Readers          | CSV, JSON, Parquet validation
Data Transformation   | Merge logic, column integrity
Output Generation     | File creation, format validation
Database Loading      | PostgreSQL & SQLite integration
Error Handling        | Exception capture, logging
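
To illustrate the merge-logic and column-integrity checks above, a self-contained test could look like this (a sketch; the actual assertions in tests/test_pipeline.py may differ):

import pandas as pd

def test_merge_preserves_rows_and_adds_columns():
    """Merging sales with products and regions keeps every order and adds the enrichment columns."""
    sales = pd.DataFrame({"order_id": [1, 2], "product_id": [101, 102], "quantity": [2, 1], "price": [29.99, 49.99]})
    products = pd.DataFrame({"product_id": [101, 102], "name": ["Widget A", "Widget B"], "category": ["Electronics", "Home"]})
    region = pd.DataFrame({"order_id": [1, 2], "region": ["West", "East"], "store_id": ["S001", "S002"]})

    merged = sales.merge(products, on="product_id", how="left").merge(region, on="order_id", how="left")

    assert len(merged) == len(sales)
    assert {"name", "category", "region", "store_id"}.issubset(merged.columns)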

Sample Test Output

==================== test session starts ====================
tests/test_pipeline.py::test_config_load PASSED
tests/test_pipeline.py::test_read_file_csv PASSED
tests/test_pipeline.py::test_merge_data PASSED
tests/test_pipeline.py::test_save_output PASSED
tests/test_pipeline.py::test_load_to_db_postgres_or_sqlite PASSED
==================== 7 passed in 2.34s ====================

πŸ”„ CI/CD Pipeline

GitHub Actions Workflow

Located at .github/workflows/ci.yml, the CI pipeline includes:

Triggers:

  • Push to main or master branch
  • Pull requests to main or master

Jobs:

  1. Test Job
    • Sets up Python 3.10
    • Installs dependencies from requirements.txt
    • Starts PostgreSQL service container
    • Runs pytest with database connectivity
    • Validates code against a production-like environment

Services:

  • PostgreSQL 15 with health checks
  • Environment variable injection from GitHub Secrets

Setting Up CI/CD

  1. Add repository secrets in GitHub:

    • POSTGRES_USER
    • POSTGRES_PASSWORD
    • POSTGRES_DB
  2. Push code to trigger workflow:

git add .
git commit -m "feat: update pipeline logic"
git push origin main

πŸ“Š Data Flow

Input Data Structure

sales.csv

order_id,product_id,quantity,price
1,101,2,29.99
2,102,1,49.99

products.json

[
  {"product_id": 101, "name": "Widget A", "category": "Electronics"},
  {"product_id": 102, "name": "Widget B", "category": "Home"}
]

region.parquet

order_id | region | store_id
---------|--------|----------
1        | West   | S001
2        | East   | S002

Output Data Structure

merged_data.csv / merged_data.parquet

order_id,product_id,quantity,price,name,category,region,store_id
1,101,2,29.99,Widget A,Electronics,West,S001
2,102,1,49.99,Widget B,Home,East,S002
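
One plausible way to produce the merged output above with pandas (a sketch; the project's transformation code may differ in join types and cleaning steps):

import pandas as pd

# Read the three sources (paths as configured in config.yaml)
sales = pd.read_csv("data/sales.csv")
products = pd.read_json("data/products.json")
region = pd.read_parquet("data/region.parquet")

# Enrich sales with product attributes, then with regional data
merged = sales.merge(products, on="product_id", how="left")
merged = merged.merge(region, on="order_id", how="left")

merged.to_csv("data/merged_data.csv", index=False)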

Database Schema

Table: merged_table

CREATE TABLE merged_table (
    order_id INTEGER,
    product_id INTEGER,
    quantity INTEGER,
    price FLOAT,
    name VARCHAR(255),
    category VARCHAR(100),
    region VARCHAR(50),
    store_id VARCHAR(20)
);
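
Loading the merged DataFrame into this table could look roughly like the following, assuming SQLAlchemy and pandas.DataFrame.to_sql with connection details taken from the database block of config.yaml (a sketch, not the project's exact loader):

import pandas as pd
from sqlalchemy import create_engine

def load_to_db(df: pd.DataFrame, db: dict) -> None:
    """Write the merged DataFrame to PostgreSQL or SQLite, replacing the table if it already exists."""
    if db["type"] == "postgres":
        url = f"postgresql://{db['user']}:{db['password']}@{db['host']}:{db['port']}/{db['name']}"
    else:  # sqlite: "name" holds the database file path
        url = f"sqlite:///{db['name']}"
    engine = create_engine(url)
    df.to_sql(db["table"], engine, if_exists="replace", index=False)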

πŸ› Troubleshooting

Common Issues

1. Import Error: "pytest could not be resolved"

Solution:

# Install pytest in your virtual environment
pip install pytest

# Verify installation
python -m pytest --version

# Reload VS Code window
# Ctrl+Shift+P β†’ "Developer: Reload Window"

2. Database Connection Failed

Solution:

# Check PostgreSQL is running
docker-compose ps

# Verify connection parameters in .env
cat .env

# Test connection manually
docker-compose exec postgres psql -U postgres -d merged_data_db

3. File Not Found Errors

Solution:

# Ensure you're in the project root
cd "Scalable Mutli-Source ETL Pipeline"

# Verify data files exist
ls data/

# Check config paths are relative
cat config/config.yaml

4. Docker Build Issues

Solution:

# Clean rebuild
docker-compose down -v
docker-compose build --no-cache
docker-compose up

Log Analysis

# View pipeline logs
cat logs/Pipeline.log

# Filter errors only
grep ERROR logs/Pipeline.log

# Real-time log monitoring
tail -f logs/Pipeline.log

πŸ“ˆ Future Enhancements

  • Add Apache Airflow DAG orchestration
  • Implement data quality checks (Great Expectations)
  • Add incremental loading with change data capture (CDC)
  • Integrate DBT for transformation layer
  • Add data lineage tracking
  • Implement alerting system (Slack/Email)
  • Add support for cloud storage (S3, Azure Blob)
  • Create interactive dashboard with Streamlit
  • Add data profiling and validation
  • Implement row-level security

🀝 Contributing

Contributions are welcome! Please follow these steps:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

πŸ‘¨β€πŸ’» Author

DHANA5982
GitHub: @DHANA5982


🌟 Acknowledgments

  • Built with best practices from modern data engineering
  • Inspired by production ETL workflows
  • Designed for learning and portfolio demonstration

πŸ“ž Support

For issues, questions, or suggestions:

  • Open an issue on GitHub
  • Check existing documentation
  • Review the troubleshooting section above

⭐ If you find this project helpful, please consider giving it a star!