A production-grade, automated data integration pipeline that unifies fragmented sales data across multiple formats (CSV, JSON, Parquet) for enterprise analytics and reporting.
A robust ETL data pipeline built with Python that demonstrates enterprise-level data engineering practices including:
- Config-driven architecture using YAML with environment variable overrides
- Multi-format data ingestion (CSV, JSON, Parquet)
- Data transformation & merging with Pandas
- Dual database support (PostgreSQL & SQLite) with SQLAlchemy ORM
- Structured logging with file-based outputs
- Docker & Docker Compose orchestration with health checks
- Comprehensive testing with pytest
- CI/CD pipeline with GitHub Actions
- CLI parameter overrides for operational flexibility
This project simulates a real-world data engineering workflow and serves as a foundation for scaling to tools like Apache Airflow, DBT, and Kubernetes.
- Features
- Architecture
- Project Structure
- Prerequisites
- Installation
- Configuration
- Usage
- Docker Deployment
- Testing
- CI/CD Pipeline
- Data Flow
- Environment Variables
- Troubleshooting
- Contributing
- Multi-format data extraction - Read from CSV, JSON, and Parquet files (see the reader sketch after this list)
- Intelligent data merging - Join multiple datasets on configurable keys
- Flexible output formats - Export to CSV or Parquet
- Database integration - Load processed data to PostgreSQL or SQLite
- YAML configuration - Centralized, environment-agnostic settings
- Environment variable support - .env file integration for sensitive data
- Structured logging - File-based logs with timestamps and levels
- Exception handling - Graceful error recovery with detailed logging
- CLI argument parsing - Runtime parameter overrides
- Unit & integration testing - Comprehensive pytest coverage
- Docker containerization - Reproducible deployment environment
- Health checks - Database readiness validation
- CI/CD automation - GitHub Actions for testing and deployment
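For example, dispatching on the file extension is enough to cover all three input formats. A minimal sketch (the name `read_file` echoes the `test_read_file_csv` test shown later, but the actual helper in `src/pipeline.py` may differ):

```python
from pathlib import Path

import pandas as pd


def read_file(path: str) -> pd.DataFrame:
    """Load a source file into a DataFrame based on its extension (sketch)."""
    readers = {
        ".csv": pd.read_csv,
        ".json": pd.read_json,
        ".parquet": pd.read_parquet,
    }
    suffix = Path(path).suffix.lower()
    if suffix not in readers:
        raise ValueError(f"Unsupported file format: {suffix}")
    return readers[suffix](path)
```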
Architecture:

+--------------+       +---------------+       +----------------+
|   CSV File   |------>|               |------>|  PostgreSQL    |
+--------------+       |               |       |  or SQLite     |
|  JSON File   |------>|  ETL Engine   |       +----------------+
+--------------+       |   (Pandas)    |       +----------------+
| Parquet File |------>|               |------>|  Output File   |
+--------------+       +---------------+       | (CSV/Parquet)  |
                               |                +----------------+
                               v
                        +--------------+
                        |  Log System  |
                        +--------------+
Key Components (composed end-to-end in the sketch below):
- Configuration Layer - YAML-based settings with .env overrides
- Extraction Layer - Multi-format file readers
- Transformation Layer - Pandas-based data merging and cleaning
- Loading Layer - SQLAlchemy database writers + file export
- Observability Layer - Structured logging for monitoring
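To make the layering concrete, here is a rough sketch of how a single run might flow through these layers. It is illustrative only, not the exact contents of `src/pipeline.py`; the config keys mirror `config/config.yaml`, but the function name and details are assumptions.

```python
import logging

import pandas as pd

logger = logging.getLogger("pipeline")


def run_pipeline(config: dict) -> pd.DataFrame:
    """Sketch of the extract -> transform -> load flow described above."""
    paths = config["data_paths"]

    # Extraction layer: multi-format readers
    sales = pd.read_csv(paths["csv_file"])
    products = pd.read_json(paths["json_file"])
    regions = pd.read_parquet(paths["parquet_file"])

    # Transformation layer: merge on the configured keys
    merged = sales.merge(products, on="product_id", how="left")
    merged = merged.merge(regions, on="order_id", how="left")

    # Loading layer: file export (the database write is sketched later)
    merged.to_csv(paths["output_file"], index=False)

    # Observability layer: structured logging
    logger.info("Merged %d rows into %s", len(merged), paths["output_file"])
    return merged
```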
Scalable Config-Driven Multi-Source ETL Pipeline/
├── .github/
│   └── workflows/
│       └── ci.yml               # GitHub Actions CI/CD workflow
├── config/
│   └── config.yaml              # Pipeline configuration (paths, DB settings)
├── data/
│   ├── sales.csv                # Source: Sales transactions
│   ├── products.json            # Source: Product catalog
│   ├── region.parquet           # Source: Regional data
│   ├── merged_data.csv          # Output: Merged dataset (CSV)
│   └── merged_data.parquet      # Output: Merged dataset (Parquet)
├── logs/
│   └── Pipeline.log             # Runtime logs with timestamps
├── src/
│   ├── __init__.py              # Package initialization
│   └── pipeline.py              # Main ETL pipeline logic
├── tests/
│   ├── __init__.py              # Test package initialization
│   └── test_pipeline.py         # Pytest test suite
├── .env                         # Environment variables (DB credentials)
├── docker-compose.yml           # Multi-container orchestration
├── Dockerfile                   # Pipeline container definition
├── requirements.txt             # Python dependencies
└── README.md                    # Project documentation
- Python 3.10+ - Core runtime
- Docker & Docker Compose (optional) - For containerized deployment
- PostgreSQL 15+ (optional) - Production database
- Git - Version control
- Clone the repository
git clone https://github.com/DHANA5982/Scalable-Multi-Source-ETL-Pipeline.git

- Create a virtual environment
python -m venv .venv
# Windows
.venv\Scripts\activate
# Linux/Mac
source .venv/bin/activate

- Install dependencies
pip install --upgrade pip
pip install -r requirements.txt

- Configure environment variables (optional for PostgreSQL)
# Create .env file in project root
POSTGRES_USER=postgres
POSTGRES_PASSWORD=your_secure_password
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=merged_data_db

# Build and start services
docker-compose up --build
# Run in detached mode
docker-compose up -d --build
# View logs
docker-compose logs -f pipeline

The pipeline is configured through config/config.yaml:

data_paths:
csv_file: "../data/sales.csv"
json_file: "../data/products.json"
parquet_file: "../data/region.parquet"
output_file: "../data/merged_data.csv"
logging:
log_dir: "../logs"
log_file: "Pipeline.log"
log_level: "INFO" # Options: DEBUG, INFO, WARNING, ERROR
database:
type: "postgres" # Options: postgres, sqlite
host: "postgres" # Use "localhost" for local setup
port: 5432
user: "postgres" # Overridden by .env if present
password: "postgres" # Overridden by .env if present
name: "merged_data_db"
table: "merged_table"
pipeline:
merge_on:
- order_id
- product_id
output_format: "csv"     # Options: csv, parquet

Configuration Priority:
.env variables > CLI arguments > config.yaml defaults
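For illustration, that precedence could be resolved with a small helper like the one below. This is only a sketch: `resolve_setting` is a hypothetical name, and the real logic in `src/pipeline.py` may differ.

```python
import os

import yaml


def resolve_setting(yaml_value, cli_value, env_var):
    """Apply the precedence above: environment variables > CLI arguments > config.yaml."""
    env_value = os.getenv(env_var)
    if env_value is not None:   # highest priority: .env / environment variable
        return env_value
    if cli_value is not None:   # next: CLI override
        return cli_value
    return yaml_value           # fallback: config.yaml default


with open("../config/config.yaml") as fh:
    cfg = yaml.safe_load(fh)

# Example: database user comes from POSTGRES_USER if set, else a CLI override, else YAML.
db_user = resolve_setting(cfg["database"]["user"], None, "POSTGRES_USER")
```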
# Run with default config
cd src
python pipeline.py
# Specify custom config file
python pipeline.py --config 'path/to/config.yaml'

# Override individual file paths
cd src
python pipeline.py \
--csv_file 'data/new_sales.csv' \
--json_file 'data/new_products.json' \
--output_file 'data/custom_output.csv'
# Change output format to Parquet
cd src
python pipeline.py \
--output_file 'data/output.parquet' \
--output_format 'parquet'
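These runtime overrides imply an argparse setup roughly like the following. It is a sketch only: the flag names mirror the examples above, and the defaults are assumptions rather than the exact parser in `src/pipeline.py`.

```python
import argparse


def parse_args() -> argparse.Namespace:
    """CLI overrides for the most common config.yaml settings (sketch)."""
    parser = argparse.ArgumentParser(description="Config-driven multi-source ETL pipeline")
    parser.add_argument("--config", default="../config/config.yaml", help="Path to the config file")
    parser.add_argument("--csv_file", help="Override the sales CSV path")
    parser.add_argument("--json_file", help="Override the products JSON path")
    parser.add_argument("--parquet_file", help="Override the regions Parquet path")
    parser.add_argument("--output_file", help="Override the merged output path")
    parser.add_argument("--output_format", choices=["csv", "parquet"], help="Override the output format")
    return parser.parse_args()
```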
To switch from PostgreSQL to SQLite, change the database type in config.yaml:

database:
type: "sqlite"
name: "../data/pipeline.db"
table: "merged_table"

The Docker setup includes:
- PostgreSQL container - Database with health checks
- Pipeline container - Python ETL application
- Volume mounts - Persistent data, logs, and config
- Service dependencies - Pipeline waits for DB readiness (see the sketch below)
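In the provided setup, the waiting is handled by Docker Compose health checks and service dependencies, but the same idea can be expressed in Python. The helper below is a hypothetical sketch, not part of the project code.

```python
import time

from sqlalchemy import create_engine, text


def wait_for_db(url: str, retries: int = 10, delay: float = 3.0) -> None:
    """Poll the database until it accepts connections or the retries run out."""
    engine = create_engine(url)
    for _ in range(retries):
        try:
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            return  # database is ready
        except Exception:
            time.sleep(delay)
    raise RuntimeError(f"Database not reachable after {retries} attempts")
```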
# Start all services
docker-compose up -d
# Check service status
docker-compose ps
# View pipeline logs
docker-compose logs -f pipeline
# Stop all services
docker-compose down
# Clean up (including volumes)
docker-compose down -v

Create a .env file in the project root:
POSTGRES_USER=postgres
POSTGRES_PASSWORD=secure_password_here
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=merged_data_db

# Basic test execution
pytest
# Verbose output with coverage
pytest -v --cov=src tests/
# Run specific test file
pytest tests/test_pipeline.py
# Run specific test function
pytest tests/test_pipeline.py::test_merge_data

The test suite includes:
| Test Type | Coverage |
|---|---|
| Configuration Loading | YAML parsing, error handling |
| File Readers | CSV, JSON, Parquet validation |
| Data Transformation | Merge logic, column integrity |
| Output Generation | File creation, format validation |
| Database Loading | PostgreSQL & SQLite integration |
| Error Handling | Exception capture, logging |
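For reference, a merge test in this style could look like the self-contained sketch below. It mirrors what test_merge_data is described as covering, but the actual test in tests/test_pipeline.py may be written differently.

```python
import pandas as pd


def test_merge_keeps_rows_and_adds_columns():
    """Merging sales with products and regions keeps every order and adds catalog columns."""
    sales = pd.DataFrame({"order_id": [1, 2], "product_id": [101, 102], "quantity": [2, 1]})
    products = pd.DataFrame({"product_id": [101, 102], "name": ["Widget A", "Widget B"]})
    regions = pd.DataFrame({"order_id": [1, 2], "region": ["West", "East"]})

    merged = sales.merge(products, on="product_id").merge(regions, on="order_id")

    assert len(merged) == 2
    assert {"order_id", "product_id", "quantity", "name", "region"} <= set(merged.columns)
```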
==================== test session starts ====================
tests/test_pipeline.py::test_config_load PASSED
tests/test_pipeline.py::test_read_file_csv PASSED
tests/test_pipeline.py::test_merge_data PASSED
tests/test_pipeline.py::test_save_output PASSED
tests/test_pipeline.py::test_load_to_db_postgres_or_sqlite PASSED
==================== 7 passed in 2.34s ====================

Located at .github/workflows/ci.yml, the CI pipeline includes:
Triggers:
- Push to the main or master branch
- Pull requests to main or master
Jobs:
- Test Job
- Sets up Python 3.10
- Installs dependencies from requirements.txt
- Starts PostgreSQL service container
- Runs pytest with database connectivity
- Validates code against a production-like environment
Services:
- PostgreSQL 15 with health checks
- Environment variable injection from GitHub Secrets
- Add repository secrets in GitHub: POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB
- Push code to trigger the workflow:
git add .
git commit -m "feat: update pipeline logic"
git push origin main

sales.csv
order_id,product_id,quantity,price
1,101,2,29.99
2,102,1,49.99

products.json
[
{"product_id": 101, "name": "Widget A", "category": "Electronics"},
{"product_id": 102, "name": "Widget B", "category": "Home"}
]

region.parquet
order_id | region | store_id
---------|--------|----------
1 | West | S001
2 | East | S002
merged_data.csv / merged_data.parquet
order_id,product_id,quantity,price,name,category,region,store_id
1,101,2,29.99,Widget A,Electronics,West,S001
2,102,1,49.99,Widget B,Home,East,S002

Table: merged_table
CREATE TABLE merged_table (
order_id INTEGER,
product_id INTEGER,
quantity INTEGER,
price FLOAT,
name VARCHAR(255),
category VARCHAR(100),
region VARCHAR(50),
store_id VARCHAR(20)
);
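A minimal sketch of how the merged DataFrame could be written to this table with SQLAlchemy. The connection URL simply reuses the PostgreSQL settings shown earlier; the actual loader in src/pipeline.py may differ.

```python
import pandas as pd
from sqlalchemy import create_engine

# Connection URL assembled from the database settings in config.yaml / .env
engine = create_engine("postgresql://postgres:postgres@localhost:5432/merged_data_db")

merged = pd.read_csv("data/merged_data.csv")

# Replace the table on each run so repeated executions stay idempotent
merged.to_sql("merged_table", engine, if_exists="replace", index=False)
```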
Troubleshooting

Issue: pytest is not found, or VS Code cannot discover the tests.

Solution:

# Install pytest in your virtual environment
pip install pytest
# Verify installation
python -m pytest --version
# Reload VS Code window
# Ctrl+Shift+P → "Developer: Reload Window"

Issue: The pipeline cannot connect to the PostgreSQL database.

Solution:
# Check PostgreSQL is running
docker-compose ps
# Verify connection parameters in .env
cat .env
# Test connection manually
docker-compose exec postgres psql -U postgres -d merged_data_db

Issue: Source data files are not found when running the pipeline.

Solution:
# Ensure you're in the project root
cd "Scalable Multi-Source ETL Pipeline"
# Verify data files exist
ls data/
# Check config paths are relative
cat config/config.yaml

Issue: Docker containers fail to build or start.

Solution:
# Clean rebuild
docker-compose down -v
docker-compose build --no-cache
docker-compose up

# View pipeline logs
cat logs/Pipeline.log
# Filter errors only
grep ERROR logs/Pipeline.log
# Real-time log monitoring
tail -f logs/Pipeline.log
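The timestamped, levelled entries in Pipeline.log can be produced with a standard logging setup along these lines. Only the directory, file name, and level come from config.yaml; the format string and handler details are assumptions.

```python
import logging
from pathlib import Path

# Values mirror the logging section of config.yaml (log_dir, log_file, log_level)
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)

logging.basicConfig(
    filename=log_dir / "Pipeline.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
)

logging.getLogger("pipeline").info("Pipeline started")
```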
Planned enhancements:

- Add Apache Airflow DAG orchestration
- Implement data quality checks (Great Expectations)
- Add incremental loading with change data capture (CDC)
- Integrate DBT for transformation layer
- Add data lineage tracking
- Implement alerting system (Slack/Email)
- Add support for cloud storage (S3, Azure Blob)
- Create interactive dashboard with Streamlit
- Add data profiling and validation
- Implement row-level security
Contributions are welcome! Please follow these steps:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
DHANA5982
GitHub: @DHANA5982
- Built with best practices from modern data engineering
- Inspired by production ETL workflows
- Designed for learning and portfolio demonstration
For issues, questions, or suggestions:
- Open an issue on GitHub
- Check existing documentation
- Review the troubleshooting section above
⭐ If you find this project helpful, please consider giving it a star!