A comprehensive ETL pipeline for converting PCORnet/PICORI CDM data to OMOP CDM v5.4.2 format, compatible with OHDSI tools and analyses.
This project provides a complete solution for transforming PCORnet/PICORI Common Data Model (CDM) datasets stored as Parquet files into a fully standards-compliant OMOP CDM v5.4.2 instance. The ETL process is designed to be compatible with OHDSI tools including Achilles, Data Quality Dashboard (DQD), Patient-Level Prediction, and CohortMethod.
- Complete ETL Pipeline: Transforms all major PCORnet tables to OMOP format
- Standards Compliant: Follows OMOP CDM v5.4.2 specifications exactly
- OHDSI Compatible: Works with all major OHDSI tools and packages
- Comprehensive Validation: Includes DQD, Achilles, and custom validation checks
- Scalable Processing: Uses PySpark for efficient large-scale data processing
- Quality Assurance: Built-in data quality checks and validation framework
- Documentation: Complete mapping specifications and data dictionary
- ETL Engine: PySpark for distributed data processing
- Target Database: PostgreSQL with OMOP CDM v5.4.2 schema
- Vocabularies: Athena standardized vocabularies (SNOMED, RxNorm, LOINC, etc.)
- Validation: OHDSI Data Quality Dashboard and Achilles
- Configuration: YAML-based configuration with environment variable support
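At its core, each loader reads Parquet with PySpark and writes into the PostgreSQL schemas over JDBC. The sketch below is illustrative only: the paths, staging table name, and driver version are placeholders, not the project's actual loader code.

```python
import os
from pyspark.sql import SparkSession

# Illustrative sketch: read a PCORnet Parquet table and append it to a staging table over JDBC.
spark = (
    SparkSession.builder
    .appName("picori2omop-sketch")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.3")  # assumed driver version
    .getOrCreate()
)

encounters = spark.read.parquet("/home/asadr/datasets/stroke_data/ENCOUNTER")  # hypothetical subdirectory

(
    encounters.write.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/omop")
    .option("dbtable", "staging.encounter_raw")              # hypothetical staging table
    .option("user", "postgres")
    .option("password", os.environ["OMOP_DB_PASSWORD"])
    .option("driver", "org.postgresql.Driver")
    .mode("append")
    .save()
)
```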
- Python 3.10+
- Java 8 or 11
- Apache Spark 3.4+
- PostgreSQL 14+
- R 4.3+ (for validation tools)
- Clone the repository:

```bash
git clone <repository-url>
cd PICORI2OMOP
```

- Install Python dependencies:

```bash
pip install -r requirements.txt
```

- Set up environment variables (a sketch of how the ID salt can be used appears after this list):

```bash
export OMOP_DB_PASSWORD="your_postgres_password"
export OMOP_ID_SALT="your_strong_random_salt"
```

- Create the PostgreSQL database:

```bash
createdb omop
```

- Run the bootstrap script:

```bash
./etl/scripts/bootstrap.sh
```

- Download and load the OMOP CDM DDLs from OHDSI CommonDataModel
- Download and load the Athena vocabularies from OHDSI Athena
- Place your PCORnet Parquet files in `~/datasets/stroke_data/`
- Run the complete ETL process:

```bash
./etl/scripts/run_etl.sh
```

- Run validation checks:

```bash
./etl/scripts/run_validation.sh
```
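The `OMOP_ID_SALT` value is used when deriving stable surrogate identifiers. The project's actual scheme lives in `etl/spark/common/ids.py`; the snippet below is only a minimal illustration of salted, deterministic ID hashing, with an assumed function name and an assumed truncation to the positive BIGINT range.

```python
import hashlib
import os

def deterministic_id(source_value: str, salt: str | None = None) -> int:
    """Illustrative only: derive a stable 63-bit integer ID from a source key and a salt."""
    salt = salt or os.environ["OMOP_ID_SALT"]
    digest = hashlib.sha256(f"{salt}:{source_value}".encode("utf-8")).digest()
    # Keep the result positive and within BIGINT range for OMOP integer ID columns.
    return int.from_bytes(digest[:8], "big") & 0x7FFF_FFFF_FFFF_FFFF

# Example: map a PCORnet PATID to a person_id candidate.
person_id = deterministic_id("PATID|000123")
```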
The repository is laid out as follows:

```
PICORI2OMOP/
├── plan.md                       # Comprehensive ETL plan
├── README.md                     # This file
├── requirements.txt              # Python dependencies
├── etl/
│   ├── config/
│   │   ├── etl_config.yml        # ETL configuration
│   │   └── secrets.example.yml   # Example secrets
│   ├── spark/
│   │   ├── common/               # Common utilities
│   │   │   ├── io_utils.py
│   │   │   ├── mapping_utils.py
│   │   │   ├── ids.py
│   │   │   └── validation.py
│   │   ├── load_person.py        # Person data loader
│   │   ├── load_visits.py        # Visit data loader
│   │   ├── load_condition.py     # Condition data loader
│   │   └── ...                   # Other domain loaders
│   ├── mappings/                 # Mapping files
│   │   ├── encounter_type.csv
│   │   ├── dx_type.csv
│   │   ├── units.csv
│   │   └── drug_type.csv
│   ├── sql/
│   │   ├── create_schemas.sql    # Schema creation
│   │   ├── vocab_load.sql        # Vocabulary loading
│   │   ├── eras/                 # Era building scripts
│   │   └── checks/               # Validation scripts
│   └── scripts/
│       ├── bootstrap.sh          # Database setup
│       ├── run_etl.sh            # ETL orchestration
│       └── run_validation.sh     # Validation orchestration
└── docs/
    ├── decisions_log.md          # ETL decisions log
    └── data_dictionary.md        # Data dictionary
```
The ETL process is configured via `etl/config/etl_config.yml`:

```yaml
source:
  parquet_root: "/home/asadr/datasets/stroke_data"

target:
  jdbc_url: "jdbc:postgresql://localhost:5432/omop"
  db_user: "postgres"
  db_password_env: "OMOP_DB_PASSWORD"
  cdm_schema: "cdm"
  staging_schema: "staging"
  results_schema: "results"

vocabulary:
  snapshot_date: "2025-09-30"
  enforce_standard_only: true

etl:
  spark_master: "local[*]"
  partitions: 8
  batch_size_rows: 50000
  write_mode: "append"
  timezone: "UTC"
```
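A loader can resolve this configuration at runtime. The sketch below assumes PyYAML and uses a hypothetical helper name; the project's actual logic presumably lives in `etl/spark/common/io_utils.py`.

```python
import os
import yaml

def load_config(path: str = "etl/config/etl_config.yml") -> dict:
    """Illustrative only: read the ETL config and resolve the DB password from its env var."""
    with open(path) as fh:
        config = yaml.safe_load(fh)
    password_env = config["target"]["db_password_env"]   # e.g. "OMOP_DB_PASSWORD"
    config["target"]["db_password"] = os.environ[password_env]
    return config

config = load_config()
jdbc_url = config["target"]["jdbc_url"]   # "jdbc:postgresql://localhost:5432/omop"
```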
The ETL process maps PCORnet tables to OMOP tables:

| PCORnet table | OMOP table |
|---------------|------------|
| DEMOGRAPHIC | person + observation_period |
| ENCOUNTER | visit_occurrence |
| DIAGNOSIS | condition_occurrence |
| PROCEDURES | procedure_occurrence |
| PRESCRIBING / DISPENSING | drug_exposure |
| LAB_RESULT_CM | measurement |
| VITAL | measurement |
| OBS_CLIN / OBS_GEN | observation |
| IMMUNIZATION | drug_exposure |
| DEATH | death |
| ENROLLMENT | observation_period |
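As an illustration of the kind of transform a domain loader such as `load_person.py` performs, the sketch below assumes standard PCORnet DEMOGRAPHIC column names (PATID, BIRTH_DATE, SEX) and hard-codes a small gender-concept lookup; it is not the project's actual loader.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("load_person_sketch").getOrCreate()

# Hypothetical input path; the real root comes from etl_config.yml.
demographic = spark.read.parquet("/home/asadr/datasets/stroke_data/DEMOGRAPHIC")

# Assumed PCORnet SEX codes mapped to OMOP standard gender concepts.
gender_concept = (
    F.when(F.col("SEX") == "F", F.lit(8532))   # FEMALE
     .when(F.col("SEX") == "M", F.lit(8507))   # MALE
     .otherwise(F.lit(0))                      # unmapped
)

person = demographic.select(
    F.col("PATID").alias("person_source_value"),
    gender_concept.alias("gender_concept_id"),
    F.year("BIRTH_DATE").alias("year_of_birth"),
    F.month("BIRTH_DATE").alias("month_of_birth"),
    F.col("SEX").alias("gender_source_value"),
)
```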
The ETL process includes comprehensive validation:
- Integrity Checks: Primary key uniqueness, foreign key integrity
- Constraint Validation: Not-null constraints, data type validation
- Data Quality Dashboard: OHDSI DQD for comprehensive quality assessment
- Achilles: Data characterization and profiling
- Custom Validation: Row count reconciliation, concept mapping quality
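The project's integrity checks run as SQL under `etl/sql/checks/`. As an illustration of the kind of check involved, the hypothetical PySpark primary-key uniqueness check below reads a loaded table back over JDBC; it is not one of the project's actual scripts.

```python
import os
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("pk_check_sketch").getOrCreate()

# Illustrative only: read back cdm.person and flag duplicate primary keys.
person = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost:5432/omop")
    .option("dbtable", "cdm.person")
    .option("user", "postgres")
    .option("password", os.environ["OMOP_DB_PASSWORD"])
    .load()
)

duplicate_ids = person.groupBy("person_id").count().filter(F.col("count") > 1)
if duplicate_ids.limit(1).count() > 0:
    raise ValueError("cdm.person.person_id is not unique")
```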
- Standards Compliance: Follows OMOP CDM v5.4.2 specifications exactly
- OHDSI Compatibility: Tested with OHDSI tools and packages
- Data Quality: Comprehensive validation and quality checks
- Documentation: Complete mapping specifications and decisions log
- Error Handling: Robust error handling and logging
- Database Connection: Ensure PostgreSQL is running and credentials are correct
- Memory Issues: Adjust Spark memory settings for large datasets
- Vocabulary Loading: Ensure Athena vocabularies are properly loaded
- Permission Issues: Check file permissions for Parquet data and scripts
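For the memory issues above, the relevant Spark settings can usually be raised when the session is built. The values below are placeholders to adapt to the host, not project defaults.

```python
from pyspark.sql import SparkSession

# Illustrative only: give the driver and executors more memory and tune shuffle parallelism.
spark = (
    SparkSession.builder
    .appName("picori2omop")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .config("spark.sql.shuffle.partitions", "64")
    .getOrCreate()
)
```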
- ETL logs are written to the console
- Validation results are stored in the `results` schema
- Error logs include detailed error messages and stack traces
- Follow the existing code structure and patterns
- Update documentation for any changes
- Add tests for new functionality
- Update the decisions log for significant changes
This project is licensed under the MIT License - see the LICENSE file for details.
For issues and questions:
- Check the documentation in the `docs/` directory
- Review the decisions log for known issues
- Check the validation results for data quality issues
- Create an issue in the repository