A configuration-driven visual data pipeline platform
A data platform for building and running ETL/ELT pipelines from a web UI, without writing code.
- Visual ETL editor: build data pipelines with drag and drop
- Multi-source support: PostgreSQL, MySQL, MongoDB, S3, REST API
- Real-time & batch: CDC (Change Data Capture) plus batch ETL
- Data catalog: automated metadata management and search
- Data quality: automated quality checks and monitoring
- AI query assistant: natural language → SQL conversion (AWS Bedrock)
- Lineage tracking: visualize data flow from source to target
- SQL Lab: interactive query execution on DuckDB/Trino
- Kubernetes support: production-ready K8s deployment configuration
┌──────────────────────────────────────────────────────────────────────┐
│                       FRONTEND (React + Vite)                        │
│        Landing │ Dataset │ Catalog │ Query │ Quality │ Admin         │
└───────────────────────────────────┬──────────────────────────────────┘
                                    │ REST API
                                    ▼
┌──────────────────────────────────────────────────────────────────────┐
│                        BACKEND API (FastAPI)                         │
│       27 Routers │ 17 Services │ MongoDB ODM │ PostgreSQL ORM        │
└────┬──────────┬────────────────────────┬────────────────┬────────────┘
     │          │                        │                │
     ▼          │                        ▼                ▼
┌────────┐      │                   ┌──────────┐     ┌──────────┐
│Airflow │      │                   │OpenSearch│     │  Trino   │
│  DAG   │      │                   │ (Search) │     │ (Query)  │
└────┬───┘      │                   └──────────┘     └──────────┘
     │          │
     ▼          ▼
   ┌─────────────┐
   │    Spark    │
   │ ETL Runner  │
   └──────┬──────┘
          │
     ┌────┴───────────┬───────────────┐
     ▼                ▼               ▼
┌──────────┐     ┌──────────┐    ┌──────────┐
│PostgreSQL│     │    S3    │    │ MongoDB  │
│ (Source/ │     │  (Data   │    │(Metadata)│
│   Dest)  │     │   Lake)  │    │          │
└──────────┘     └──────────┘    └──────────┘
# 1. Clone repository
git clone https://github.com/yourusername/xflow.git
cd xflow
# 2. Start all services (15+ containers)
docker compose up -d
# 3. Download Spark JAR dependencies
mkdir -p spark/jars
curl -fsSLO --output-dir spark/jars https://jdbc.postgresql.org/download/postgresql-42.7.4.jar
curl -fsSLO --output-dir spark/jars https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
curl -fsSLO --output-dir spark/jars https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar
# 4. Load sample data
docker compose exec -T postgres psql -U postgres -d mydb < init-fake-data.sql
# 5. Access the platform
open http://localhost:5173  # Frontend
| Service | URL | Credentials | Purpose |
|---|---|---|---|
| Frontend | http://localhost:5173 | - | React web UI |
| Backend API | http://localhost:8000 | - | FastAPI server |
| API Docs | http://localhost:8000/docs | - | Swagger UI |
| Airflow | http://localhost:8080 | admin/admin | DAG orchestration |
| Spark Master UI | http://localhost:8081 | - | Spark cluster monitoring |
| OpenSearch | http://localhost:9200 | - | Full-text search engine |
| OpenSearch Dashboards | http://localhost:5601 | - | Data visualization |
| Trino | http://localhost:8082 | - | Distributed SQL queries |
| Kafka UI | http://localhost:8084 | - | Kafka topic management |
| PostgreSQL | localhost:5433 | postgres/postgres | Relational DB |
| MongoDB | localhost:27017 | - | Document DB (metadata) |
Build data pipelines by drag and drop in the web UI.
[Source Node]  →  [Transform Node]  →  [Target Node]
      ↓                  ↓                   ↓
  PostgreSQL       Filter/Select        S3 Parquet
  MongoDB          SQL Transform        Delta Lake
  S3 Files         Union/Join           PostgreSQL
  REST API         Custom Logic         MongoDB
Supported transform types (10+):
- select-fields: keep only specific columns
- drop-columns: remove columns
- filter: filter rows with a SQL expression
- union: merge multiple inputs
- sql: run arbitrary Spark SQL
- s3-select-fields: extract fields from S3 logs
- s3-filter: filter S3 logs
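As a hedged sketch of how a sql transform might be declared, the request below mirrors the dataset-creation example later in this README; the config key name (query) and the way the SQL references its input node are assumptions rather than documented behavior.
# Hypothetical pipeline using a "sql" transform node (key names assumed)
curl -X POST http://localhost:8000/api/datasets/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "category_avg_price",
    "description": "Average price per category via a sql transform",
    "nodes": [
      {"id": "source1", "type": "rdb-source", "config": {"connection_id": "<connection_id>", "table": "products"}},
      {"id": "transform1", "type": "sql", "config": {"query": "SELECT category, AVG(price) AS avg_price FROM source1 GROUP BY category"}},
      {"id": "target1", "type": "s3-target", "config": {"path": "s3a://xflow-data/category_stats", "format": "parquet"}}
    ],
    "edges": [
      {"from": "source1", "to": "transform1"},
      {"from": "transform1", "to": "target1"}
    ]
  }'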
- Automatic metadata collection: schema, statistics, sample data
- Full-text search: powered by OpenSearch (with the Korean Nori analyzer)
- Domain organization: group data assets by business domain
- Lineage visualization: source → transform → target data flow
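Catalog entries can also be searched over the REST API (POST /api/catalog/search in the API reference below); a minimal sketch, assuming a simple JSON body with a query field:
# Full-text search across the catalog (request body shape is an assumption)
curl -X POST http://localhost:8000/api/catalog/search \
  -H "Content-Type: application/json" \
  -d '{"query": "products"}'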
DuckDB-based quality checks on Parquet files:
- Row count validation
- Null value detection
- Duplicate check
- Data quality scoring
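Checks can be triggered per dataset through the quality endpoints listed in the API reference; a minimal sketch (the dataset ID is a placeholder, and response shapes are not shown here):
# Run a quality check for a dataset, then fetch its score
curl -X POST http://localhost:8000/api/quality/<dataset_id>/check
curl http://localhost:8000/api/quality/<dataset_id>/score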
AWS Bedrock integration:
Natural language: "Show me the 10 products with the highest sales last month"
↓
SQL: SELECT product_name, SUM(sales) as total_sales
FROM products WHERE date >= '2026-01-01'
GROUP BY product_name ORDER BY total_sales DESC LIMIT 10
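The same conversion is exposed through the AI endpoints in the API reference; a hedged sketch, assuming the request body carries the natural-language question in a prompt field:
# Natural language → SQL via AWS Bedrock (the "prompt" field name is an assumption)
curl -X POST http://localhost:8000/api/ai/text-to-sql \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Show me the 10 products with the highest sales last month"}'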
- DuckDB: query S3 Parquet files directly (serverless)
- Trino: distributed SQL engine (for large data volumes)
- Query History: save and reuse execution history
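SQL Lab queries can also be issued over the API; below is a hedged sketch against the DuckDB endpoint (the query field and the example S3 path are assumptions):
# Query Parquet files in the data lake directly with DuckDB
curl -X POST http://localhost:8000/api/duckdb/query \
  -H "Content-Type: application/json" \
  -d "{\"query\": \"SELECT COUNT(*) FROM read_parquet('s3://xflow-data/products/*.parquet')\"}"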
Real-time data synchronization via Debezium + Kafka:
PostgreSQL (logical replication)
↓
Kafka Topics (per table)
↓
S3 Data Lake / Delta Lake
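A connector is registered through the backend (POST /api/cdc/connectors/ in the API reference), which configures Debezium and the per-table Kafka topics; a minimal sketch whose body fields (name, connection_id, tables) are assumptions:
# Create a CDC connector for the orders table (body fields are assumptions)
curl -X POST http://localhost:8000/api/cdc/connectors/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "orders-cdc",
    "connection_id": "<connection_id>",
    "tables": ["public.orders"]
  }'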
# Register a data source connection
curl -X POST http://localhost:8000/api/connections/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-main",
    "description": "Main PostgreSQL database",
    "type": "postgres",
    "host": "postgres",
    "port": 5432,
    "database": "mydb",
    "username": "postgres",
    "password": "postgres"
  }'
# Create an ETL pipeline
curl -X POST http://localhost:8000/api/datasets/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "products_to_s3",
    "description": "Extract products, transform, save to S3",
    "nodes": [
      {
        "id": "source1",
        "type": "rdb-source",
        "config": {
          "connection_id": "<connection_id>",
          "table": "products"
        }
      },
      {
        "id": "transform1",
        "type": "filter",
        "config": {
          "expression": "price > 100"
        }
      },
      {
        "id": "target1",
        "type": "s3-target",
        "config": {
          "path": "s3a://xflow-data/products",
          "format": "parquet"
        }
      }
    ],
    "edges": [
      {"from": "source1", "to": "transform1"},
      {"from": "transform1", "to": "target1"}
    ]
  }'
# Run the pipeline
curl -X POST http://localhost:8000/api/datasets/<dataset_id>/run
# List all runs
curl http://localhost:8000/api/job-runs/
# Get specific run with logs
curl http://localhost:8000/api/job-runs/<run_id>
- React 18.3 + Vite 6.0
- Tailwind CSS 4.1
- React Router 7.10
- XYFlow - DAG visualization
- Recharts - charts and graphs
- 110+ custom components
- FastAPI 0.115 - REST API framework
- Beanie 1.26 - MongoDB ODM
- SQLAlchemy 2.0 - PostgreSQL ORM
- SlowAPI - Rate limiting
- boto3 - AWS S3/IAM integration
- Apache Spark 3.5 - distributed ETL execution
- Apache Airflow 2.x - DAG orchestration
- DuckDB 1.1 - serverless SQL queries
- Trino 435 - distributed SQL engine
- PostgreSQL - relational data
- MongoDB - pipeline metadata
- OpenSearch 2.11 - full-text search
- MinIO / S3 - object storage (data lake)
- Docker + Docker Compose - local development
- Kubernetes - production deployment
- Terraform - Infrastructure as Code
- Prometheus + Grafana - monitoring
- POST /api/datasets/ - create an ETL pipeline
- GET /api/datasets/ - list pipelines
- GET /api/datasets/{id} - pipeline details
- PUT /api/datasets/{id} - update a pipeline
- DELETE /api/datasets/{id} - delete a pipeline
- POST /api/datasets/{id}/run - run a pipeline
- POST /api/connections/ - create a data source connection
- GET /api/connections/ - list connections
- POST /api/connections/{id}/test - test a connection
- DELETE /api/connections/{id} - delete a connection
- GET /api/job-runs/ - run history
- GET /api/job-runs/{id} - run details (including logs)
- POST /api/job-runs/{id}/cancel - cancel a run
- GET /api/catalog/ - browse the data catalog
- GET /api/catalog/{id} - dataset metadata
- GET /api/catalog/{id}/lineage - data lineage
- POST /api/catalog/search - full-text search
- POST /api/domains/ - create a domain
- GET /api/domains/ - list domains
- PUT /api/domains/{id} - update a domain
- GET /api/quality/ - quality check results
- POST /api/quality/{dataset_id}/check - run a quality check
- GET /api/quality/{dataset_id}/score - quality score
- POST /api/duckdb/query - run a DuckDB query
- POST /api/trino/query - run a Trino query
- GET /api/query-history/ - query history
- POST /api/ai/text-to-sql - natural language → SQL conversion
- POST /api/ai/query-explain - explain a SQL query
- POST /api/cdc/connectors/ - create a CDC connector
- GET /api/cdc/connectors/ - list connectors
- POST /api/kafka-streaming/topics - manage Kafka topics
- POST /api/s3-csv/schema - extract S3 CSV schema
- POST /api/s3-parquet/schema - extract S3 Parquet schema
- POST /api/s3-json/schema - extract S3 JSON schema
- POST /api/admin/users/ - create a user
- GET /api/admin/users/ - list users
- PUT /api/admin/roles/{id} - manage roles
cd backend
# Install dependencies
pip install -r requirements.txt
# Run with hot reload
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# API docs available at:
# - http://localhost:8000/docs (Swagger UI)
# - http://localhost:8000/redoc (ReDoc)
cd frontend
# Install dependencies
npm install
# Run dev server (Vite)
npm run dev # Runs on http://localhost:5173
# Build for production
npm run build
# Preview production build
npm run preview
cd backend
# Create new migration
alembic revision --autogenerate -m "add new table"
# Apply migrations
alembic upgrade head
# Rollback migration
alembic downgrade -1
# Verify
docker compose exec postgres psql -U postgres -d mydb -c "\dt"
# Submit an ETL job to Spark manually
docker exec spark-master /opt/spark/bin/spark-submit \
--master 'local[*]' \
--driver-memory 2g \
--jars /opt/spark/jars/extra/postgresql-42.7.4.jar,\
/opt/spark/jars/extra/hadoop-aws-3.3.4.jar,\
/opt/spark/jars/extra/aws-java-sdk-bundle-1.12.262.jar \
/opt/spark/jobs/etl_runner.py \
--config '{"source": {"type": "rdb", ...}}'
xflow/
├── backend/ # FastAPI application
│ ├── routers/ # API route handlers (27 files)
│ ├── services/ # Business logic (17 files)
│ ├── models.py # MongoDB models (Beanie)
│ ├── database.py # PostgreSQL config
│ ├── main.py # FastAPI app
│ └── requirements.txt # Python dependencies
├── frontend/ # React application
│ ├── src/
│ │ ├── pages/ # Page components (14)
│ │ ├── components/ # Reusable components (110+)
│ │ ├── services/ # API client
│ │ ├── context/ # React context
│ │ └── hooks/ # Custom hooks
│ ├── package.json
│ └── vite.config.js
├── airflow/ # Orchestration
│ ├── dags/ # DAG definitions
│ │ ├── dataset_dag.py
│ │ └── etl_common.py
│ └── Dockerfile
├── spark/ # Data processing
│ ├── jobs/ # ETL scripts (14)
│ │ └── etl_runner.py # Main ETL engine (2,000+ lines)
│ ├── jars/ # JDBC/Hadoop JARs
│ └── Dockerfile.k8s
├── k8s/ # Kubernetes manifests
│ ├── backend/ # Backend deployment
│ ├── airflow/ # Airflow Helm values
│ ├── spark/ # Spark Operator CRDs
│ ├── opensearch/ # OpenSearch cluster
│ ├── kafka/ # Kafka cluster
│ ├── mongodb/ # MongoDB StatefulSet
│ ├── redis/ # Redis cache
│ └── trino/ # Trino coordinator
├── terraform/ # Infrastructure as Code
│ └── localstack/ # LocalStack config (S3, IAM)
├── docker-compose.yml # Local development (15+ services)
├── init-fake-data.sql # Sample data
└── README.md
# Use local mode instead of cluster
--master 'local[*]'
# Increase driver memory
--driver-memory 2g
# Increase executor memory
--executor-memory 2g
# Restart scheduler
docker compose restart airflow-scheduler
# Check DAG errors
docker compose logs airflow-scheduler
# Verify DAG file syntax
docker compose exec airflow-scheduler python -c "import sys; sys.path.append('/opt/airflow/dags'); import dataset_dag"
# Check MongoDB status
docker compose ps mongodb
# Restart MongoDB
docker compose restart mongodb
# Check logs
docker compose logs mongodb
# Restart LocalStack
docker compose restart localstack
# Wait for healthy status
docker compose ps localstack
# Recreate buckets
aws --endpoint-url=http://localhost:4566 s3 mb s3://xflow-data
# Check cluster health
curl http://localhost:9200/_cluster/health?pretty
# Restart OpenSearch
docker compose restart opensearch
# Check disk space
docker system df
# Restart Kafka cluster
docker compose restart kafka zookeeper
# Check broker logs
docker compose logs kafka --tail=100
# List topics
docker compose exec kafka kafka-topics --list --bootstrap-server localhost:9092
cd frontend
# Build production bundle
npm run build
# Upload to S3
aws s3 sync dist/ s3://xflows --delete
# Invalidate CloudFront cache
aws cloudfront create-invalidation \
--distribution-id E3J1KK599NCLXA \
--paths "/*"
- EKS Cluster Setup
# Install eksctl
brew install eksctl
# Create EKS cluster
eksctl create cluster \
--name xflow-cluster \
--region ap-northeast-2 \
--nodegroup-name standard-workers \
--node-type t3.xlarge \
--nodes 3 \
--nodes-min 1 \
--nodes-max 5 \
--managed
- Install Required Tools
# Helm
brew install helm
# kubectl
brew install kubectl
# AWS CLI
brew install awscli
# Log in to Amazon ECR
aws ecr get-login-password --region ap-northeast-2 > /tmp/ecr_password.txt
docker login --username AWS \
--password-stdin 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com < /tmp/ecr_password.txt
rm /tmp/ecr_password.txt
cd backend
# Build & push image
docker build --platform linux/amd64 \
-t 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com/xflow-backend:latest .
docker push 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com/xflow-backend:latest
# Deploy (first time)
kubectl apply -f k8s/backend/
# Update deployment
kubectl rollout restart deployment backend -n default
# Check status
kubectl rollout status deployment backend -n default
kubectl get pods -n default -l app=backend
# View logs
kubectl logs -n default -l app=backend --tail=100 -f
Configuration Files:
- k8s/backend/deployment.yaml - Deployment spec
- k8s/backend/configmap.yaml - Environment variables
- k8s/backend/serviceaccount.yaml - IRSA (IAM role for S3)
- k8s/backend/service.yaml - ClusterIP service
cd airflow
# Build & push image (includes DAGs)
docker build --platform linux/amd64 \
-t 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com/xflow-airflow:latest .
docker push 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com/xflow-airflow:latest
# Install Airflow (first time)
helm repo add apache-airflow https://airflow.apache.org
helm upgrade --install airflow apache-airflow/airflow \
--namespace airflow \
--create-namespace \
--version 1.15.0 \
-f k8s/airflow/values.yaml \
--wait
# Deploy Spark RBAC
kubectl apply -f k8s/airflow/spark-rbac.yaml
# Update DAGs (after changes)
kubectl rollout restart deployment -n airflow \
airflow-scheduler \
airflow-webserver \
airflow-triggerer \
airflow-dag-processor
# Check status
kubectl get pods -n airflow
# View logs
kubectl logs -n airflow -l component=scheduler --tail=100 -f
Configuration Files:
- k8s/airflow/values.yaml - Helm chart values
- k8s/airflow/spark-rbac.yaml - Spark Operator permissions
- k8s/airflow/ingress.yaml - ALB Ingress config
cd spark
# Build & push image
docker build --platform linux/amd64 -f Dockerfile.k8s \
-t 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com/xflow-spark:latest .
docker push 134059028370.dkr.ecr.ap-northeast-2.amazonaws.com/xflow-spark:latest
# Install Spark Operator (first time)
helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm install spark-operator spark-operator/spark-operator \
--namespace spark-operator \
--create-namespace \
--set webhook.enable=true
# Jobs are automatically created by Airflow DAGs
# Monitor jobs:
kubectl get sparkapplication -n spark-jobs
Spark Job Management:
# List running jobs
kubectl get sparkapplication -n spark-jobs
# View job details
kubectl describe sparkapplication <name> -n spark-jobs
# View driver logs
kubectl logs -f <driver-pod-name> -n spark-jobs
# Delete job
kubectl delete sparkapplication <name> -n spark-jobs
# Delete all completed jobs
kubectl delete sparkapplication --all -n spark-jobs --field-selector status.applicationState.state=COMPLETED
# Deploy OpenSearch cluster
kubectl apply -f k8s/opensearch/
# Check cluster health
kubectl get pods -n opensearch
# View logs
kubectl logs -n opensearch -l app=opensearch
# Install Prometheus + Grafana
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm install prometheus prometheus-community/kube-prometheus-stack \
--namespace monitoring \
--create-namespace
# Access Grafana
kubectl port-forward -n monitoring svc/prometheus-grafana 3000:80
# Open http://localhost:3000 (admin/prom-operator)
Deployed Services:
kubectl get pods -n default # Backend, MongoDB
kubectl get pods -n airflow # Airflow (scheduler, webserver, workers)
kubectl get pods -n spark-jobs # Spark applications
kubectl get pods -n opensearch # OpenSearch cluster
kubectl get pods -n monitoring # Prometheus, Grafana
- E-Commerce Analytics: Extract orders from PostgreSQL → Transform → Store in S3 Data Lake → Query with Trino (see the sketch after this list)
- Data Lake Ingestion: Schedule daily S3 log parsing → Parquet conversion → Catalog registration
- Cross-Database Joins: Union data from multiple sources (PostgreSQL + MongoDB + S3) → Single view
- Data Quality Monitoring: Automated quality checks on every pipeline run → Alerts on failures
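For the final step of the e-commerce use case above, the curated data can be queried through the Trino endpoint from the API reference; a hedged sketch (the request body shape and table name are assumptions):
# Query the data lake through Trino (request body shape is an assumption)
curl -X POST http://localhost:8000/api/trino/query \
  -H "Content-Type: application/json" \
  -d '{"query": "SELECT status, COUNT(*) FROM orders GROUP BY status"}'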
This project is licensed under the MIT License. See LICENSE file for details.
- Apache Airflow - Workflow orchestration
- Apache Spark - Distributed data processing
- FastAPI - Modern Python web framework
- React + XYFlow - Interactive UI components
- OpenSearch - Full-text search engine
- Trino - Distributed SQL query engine
- Documentation: GitHub Wiki
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Real-time streaming transformations (Flink integration)
- ML model deployment pipeline
- Data version control (lakeFS integration)
- Advanced lineage visualization (Neo4j integration)
- Multi-tenancy support
- SaaS deployment option
Built with ❤️ for the Data Engineering Community