Conversation
…comment .vscode directory in .gitignore to not push the venv
…irectly and updating logging; rename functions and parameters for clarity
…austivity process
…n; improve logging and data handling
… update .gitignore for better clarity
…lete combinations of (PERIOD, DX_UID, ORG_UNIT) with exhaustivity = 0; enhance logging for clarity
…older naming based on configuration; improve logging and .gitignore for workspace data
…onfiguration file path for data extraction
…y and ensure correct file paths
…g in requirements.txt
…es to requirements.txt
…rom dhis2_dataset_sync; add parameters for optional sync operations
…dividual combinations of (PERIOD, ORG_UNIT, COC) to separate parquet files; enhance logging for better traceability
… all combinations for each period into a single file; improve logging for clarity
…les and reducing redundant configuration loads; enhance performance with vectorized operations for missing data handling
… reading and optimizing data processing with vectorized operations; improve logging for clarity and efficiency
…arallel processing; enhance logging clarity and optimize data handling for missing entries
…cted mappings for periods and org units; improve logging for missing combinations and streamline data handling
…rameter and improving return logic for task execution
… data in a streamlined format; enhance logic for detecting and processing exhaustivity files
…pute_exhaustivity function to streamline data processing; clean up related code for clarity
…_exhaustivity function; streamline data processing by consolidating mapping logic and enhancing DataFrame initialization for missing files
…ity function by replacing nested loops with Polars joins; enhance error handling in data extraction and improve logging for missing expected DX_UIDs
…UE check to ensure accurate exhaustivity computation
EstebanMontandon
left a comment
In general, I think we should reuse the existing architecture of the D2D library as much as possible and stick to its patterns to ensure reusability (and try to convince others that it works 😉).
Let me know if you want to go through and discuss the comments here. Good luck.
requests
python-dateutil
openhexa.toolbox
openhexa.sdk
When libraries are included in the requirements, they'll be installed when the pipeline is executed (docker image). I think all these libraries are already part of the OH image, so there's no need to re-install them, unless we want a specific version for compatibility (maybe python-dateutil is not there, not sure ...)
dhis2_exhaustivity/pipeline.py
Outdated
return True


def push_dataset_org_units(
dhis2_exhaustivity/pipeline.py
Outdated
# -https://github.com/BLSQ/openhexa-pipelines-drc-prs


def dhis2_request(
maybe we can move this function to utils.py?
dhis2_exhaustivity/pipeline.py
Outdated
| help="Extract data elements from source DHIS2.", | ||
| ) | ||
| @parameter( | ||
| code="run_compute_data", |
Perhaps we can think about removing this option; there's probably no use case where we just want to compute, right?
We can consider these options:
- if we extract (true) then compute
- if we push (true) then first compute then push
dhis2_exhaustivity/pipeline.py
Outdated
# extract data from source DHIS2
@pipeline("dhis2_exhaustivity", timeout=3600)  # 1 hour
@parameter(
I think I would remove this option as it should be a mandatory step.
The organisation units in the target DS should always be the same as those in the source DS (in sync)
dhis2_exhaustivity/pipeline.py
Outdated
# After all periods are saved for this extract, enqueue a marker to indicate extract is complete
# This allows push_data to collect all files for an extract before concatenating and pushing
push_queue.enqueue(f"EXTRACT_COMPLETE|{extract_id}")
The goal of the push task is to check the queue until there's something to be pushed. Therefore we should only enqueue data/files that are ready, like you do before in:
# Enqueue aggregated file for pushing
push_queue.enqueue(f"{extract_id}|{aggregated_file}") # -> This is enough, the push task will load it and push it (after some mapping ofc)
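A minimal sketch of this producer/consumer pattern, assuming the Queue enqueue/dequeue API and the "extract_id|file" item format shown in this diff; the function names are illustrative, not part of the pipeline:

```python
# Hypothetical sketch; on_period_ready and push_loop are illustrative names.
def on_period_ready(push_queue, extract_id: str, aggregated_file: str) -> None:
    # Producer side: enqueue only items that are fully ready to be pushed.
    push_queue.enqueue(f"{extract_id}|{aggregated_file}")

def push_loop(push_queue) -> None:
    # Consumer side: each item is self-contained, so no EXTRACT_COMPLETE
    # marker is needed to know when work is available.
    while (item := push_queue.dequeue()) is not None:
        extract_id, file_path = item.split("|", 1)
        print(f"pushing {file_path} for extract {extract_id}")
```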
dhis2_exhaustivity/pipeline.py
Outdated
)

# Save exhaustivity data per period
# Get all periods from the result (may include periods not in exhaustivity_periods if data exists)
I'm not sure why we would have files (extracts) with mixed periods; the extract task only retrieves data for a single period at a time... therefore, exhaustivity should do the computation and produce results per period as well. Probably I'm not understanding the logic correctly here; we should rework this task to simplify it.
dhis2_exhaustivity/pipeline.py
Outdated
# Calculate COC-level statistics (before aggregation to ORG_UNIT level)
# Aggregate by (PERIOD, COC, ORG_UNIT): exhaustivity = 1 only if all DX_UIDs for that COC are 1
if "CATEGORY_OPTION_COMBO" in period_exhaustivity.columns:
Let's try to put these parts of the computation in separate functions to simplify a bit 👍
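For example, a minimal Polars sketch of the COC-level aggregation as its own function, assuming the PERIOD / CATEGORY_OPTION_COMBO / ORG_UNIT / EXHAUSTIVITY_VALUE columns seen in this diff:

```python
import polars as pl

def aggregate_coc_level(period_exhaustivity: pl.DataFrame) -> pl.DataFrame:
    # Exhaustivity per (PERIOD, COC, ORG_UNIT) is 1 only if every DX_UID
    # row in the group is 1, i.e. the minimum over the 0/1 values.
    return period_exhaustivity.group_by(
        ["PERIOD", "CATEGORY_OPTION_COMBO", "ORG_UNIT"]
    ).agg(pl.col("EXHAUSTIVITY_VALUE").min().alias("EXHAUSTIVITY_VALUE"))
```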
dhis2_exhaustivity/pipeline.py
Outdated
_, extract_id = split_on_pipe(next_item)
push_queue.dequeue()  # remove marker
# If we have files collected for this extract, process them now
if extract_id in extract_files_collected and len(extract_files_collected[extract_id]) > 0:
Let's avoid collecting all files per extract; our approach should focus on a single period per extract, so:
extract 1 -> extract for period 202501 -> compute exhaustivity -> push results for 202501
-----------> extract for period 202502 -> compute exhaustivity -> push results for 202502
-----------> extract for period 202503 -> compute exhaustivity -> push results for 202503
This is why we use a queue to collect the periods that are ready to be pushed!
dhis2_exhaustivity/pipeline.py
Outdated
current_run.log_warning(f"⚠️ File {extract_path.name} missing EXHAUSTIVITY_VALUE, skipping")
continue

# Map to DHIS2 format (COC/AOC not needed - DHIS2 uses defaults)
We need this mandatory format for DHIS2 to import a datapoint:
{"dataElement", "period", "orgUnit", "categoryOptionCombo", "attributeOptionCombo", "value"}
Your implementation is correct, but I don't see the CATEGORY_OPTION_COMBO or ATTRIBUTE_OPTION_COMBO columns in the mapped dataframe... maybe I'm missing something?
In this case, indeed, we can set coc and aoc to default:
"categoryOptionCombo": "HllvX50cXC0"
"attributeOptionCombo": "HllvX50cXC0"
Pull request overview
This PR introduces a self-contained DHIS2 exhaustivity integration pipeline that extracts data with DX_UID, computes exhaustivity per (period, org unit) using mapping-based valid (DX_UID, COC) pairs, writes results to parquet files, and pushes results (dry-run by default). The integration uses additive org unit synchronization, includes concise logging for empty periods/COCs, and provides dynamic date behavior configuration. The implementation is non-breaking and ready for main.
Key changes:
- New exhaustivity computation pipeline with extract, compute, and push stages
- Dynamic drug mapping configuration for FoSa and BCZ COC and DX_UID associations
- Additive org unit sync that preserves existing units while adding new ones
Reviewed changes
Copilot reviewed 17 out of 19 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| dhis2_exhaustivity/utils.py | Core utility functions for DHIS2 connection, config loading, drug mapping, and parquet file handling |
| dhis2_exhaustivity/requirements.txt | Python dependencies for the pipeline |
| dhis2_exhaustivity/pipeline.py | Main pipeline orchestration with extract, compute, and push tasks |
| dhis2_exhaustivity/missing_dx_coc_diagnostic.txt | Diagnostic report of missing DX_UID and COC entries in FOSA and BCZ datasets |
| dhis2_exhaustivity/exhaustivity_calculation.py | Core exhaustivity computation logic with COC-level validation |
| dhis2_exhaustivity/d2d_library/org_unit_point.py | Organization unit data structures and validation |
| dhis2_exhaustivity/d2d_library/dhis2_pusher.py | DHIS2 data push handler with batch processing |
| dhis2_exhaustivity/d2d_library/dhis2_org_unit_aligner.py | Organization unit synchronization between DHIS2 instances |
| dhis2_exhaustivity/d2d_library/dhis2_extract_handlers.py | Data extraction handlers for analytics and data elements |
| dhis2_exhaustivity/d2d_library/db_queue.py | SQLite-based thread-safe queue for managing file processing |
| dhis2_exhaustivity/d2d_library/data_point.py | Data point structures for DHIS2 push operations |
| dhis2_exhaustivity/configuration/push_settings.json | Push operation configuration with dry-run defaults |
| dhis2_exhaustivity/configuration/extract_config.json | Pipeline configuration for extracts and connections |
| dhis2_exhaustivity/configuration/drug_mapping_fosa.json | FoSa drug-to-UID/COC mapping configuration |
| dhis2_exhaustivity/configuration/drug_mapping_bcz.json | BCZ drug-to-UID/COC mapping configuration |
| dhis2_exhaustivity/config_sync.py | Configuration validation and drug mapping synchronization |
| dhis2_exhaustivity/.gitignore | Standard Python and pipeline-specific ignore patterns |
dhis2_exhaustivity/config_sync.py
Outdated
def log_info(msg): current_run.log_info(msg)
def log_warning(msg): current_run.log_warning(msg)
def log_error(msg): current_run.log_error(msg)
except ImportError:
    workspace = None
    def log_info(msg): print(f"[INFO] {msg}")
    def log_warning(msg): print(f"[WARNING] {msg}")
    def log_error(msg): print(f"[ERROR] {msg}")
The inline function definitions for logging create unnecessary complexity. Consider defining these as proper functions or using a logger instance directly instead of inline lambda-style definitions. This would improve readability and make the code easier to maintain.
Suggested change:
From:
def log_info(msg): current_run.log_info(msg)
def log_warning(msg): current_run.log_warning(msg)
def log_error(msg): current_run.log_error(msg)
except ImportError:
    workspace = None
    def log_info(msg): print(f"[INFO] {msg}")
    def log_warning(msg): print(f"[WARNING] {msg}")
    def log_error(msg): print(f"[ERROR] {msg}")
To:
def log_info(msg):
    current_run.log_info(msg)
def log_warning(msg):
    current_run.log_warning(msg)
def log_error(msg):
    current_run.log_error(msg)
except ImportError:
    workspace = None
    def log_info(msg):
        print(f"[INFO] {msg}")
    def log_warning(msg):
        print(f"[WARNING] {msg}")
    def log_error(msg):
        print(f"[ERROR] {msg}")
parts = s.split("|", 1)
if len(parts) == 2:
    return parts[0], parts[1]
return None, parts[0]
The function returns inconsistent tuple positions when no pipe is found. When a pipe exists, it returns (before_pipe, after_pipe). When no pipe exists, it returns (None, entire_string), reversing the expected pattern. This should return (entire_string, None) to maintain consistency, where the first element is always the content before the pipe (or entire string if no pipe).
Suggested change:
From: return None, parts[0]
To: return parts[0], None
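A minimal corrected version matching this suggestion, assuming the tuple[str, str | None] contract stated in the review:

```python
def split_on_pipe(s: str) -> tuple[str, str | None]:
    # Return (before_pipe, after_pipe); after_pipe is None when no pipe exists.
    parts = s.split("|", 1)
    if len(parts) == 2:
        return parts[0], parts[1]
    return parts[0], None
```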
Pull request overview
Copilot reviewed 18 out of 20 changed files in this pull request and generated 7 comments.
periods_in_data = exhaustivity_df["PERIOD"].unique().to_list() if len(exhaustivity_df) > 0 else []
cocs_in_data = exhaustivity_df["CATEGORY_OPTION_COMBO"].unique().to_list() if len(exhaustivity_df) > 0 else []
The condition len(exhaustivity_df) > 0 is checked twice in consecutive lines. Consider checking once and reusing the result to avoid redundant DataFrame length checks.
Suggested change:
From:
periods_in_data = exhaustivity_df["PERIOD"].unique().to_list() if len(exhaustivity_df) > 0 else []
cocs_in_data = exhaustivity_df["CATEGORY_OPTION_COMBO"].unique().to_list() if len(exhaustivity_df) > 0 else []
To:
periods_in_data = exhaustivity_df["PERIOD"].unique().to_list() if total_combinations > 0 else []
cocs_in_data = exhaustivity_df["CATEGORY_OPTION_COMBO"].unique().to_list() if total_combinations > 0 else []
from d2d_library.db_queue import Queue
from d2d_library.dhis2_dataset_utils import dhis2_request, push_dataset_org_units
from d2d_library.dhis2_extract_handlers import DHIS2Extractor
from d2d_library.dhis2_pusher import DHIS2Pusher
from dateutil.relativedelta import relativedelta
from exhaustivity_calculation import compute_exhaustivity
from openhexa.sdk import current_run, parameter, pipeline, workspace
from openhexa.toolbox.dhis2 import DHIS2
from openhexa.toolbox.dhis2.dataframe import get_datasets
from openhexa.toolbox.dhis2.periods import period_from_string
from utils import (
Consider using relative imports (e.g., from .d2d_library.db_queue import Queue) to make the module more portable and explicit about its package structure.
Suggested change:
From:
from d2d_library.db_queue import Queue
from d2d_library.dhis2_dataset_utils import dhis2_request, push_dataset_org_units
from d2d_library.dhis2_extract_handlers import DHIS2Extractor
from d2d_library.dhis2_pusher import DHIS2Pusher
from dateutil.relativedelta import relativedelta
from exhaustivity_calculation import compute_exhaustivity
from openhexa.sdk import current_run, parameter, pipeline, workspace
from openhexa.toolbox.dhis2 import DHIS2
from openhexa.toolbox.dhis2.dataframe import get_datasets
from openhexa.toolbox.dhis2.periods import period_from_string
from utils import (
To:
from .d2d_library.db_queue import Queue
from .d2d_library.dhis2_dataset_utils import dhis2_request, push_dataset_org_units
from .d2d_library.dhis2_extract_handlers import DHIS2Extractor
from .d2d_library.dhis2_pusher import DHIS2Pusher
from dateutil.relativedelta import relativedelta
from .exhaustivity_calculation import compute_exhaustivity
from openhexa.sdk import current_run, parameter, pipeline, workspace
from openhexa.toolbox.dhis2 import DHIS2
from openhexa.toolbox.dhis2.dataframe import get_datasets
from openhexa.toolbox.dhis2.periods import period_from_string
from .utils import (
raise


def split_on_pipe(s: str) -> tuple[str, str | None]:
The return order is incorrect. When no pipe is found, the function returns (None, parts[0]), but the type hint and docstring indicate it should return (parts[0], None) - the part before the pipe should be first, followed by the part after (or None).
parts = s.split("|", 1)
if len(parts) == 2:
    return parts[0], parts[1]
return None, parts[0]
Suggested change:
From: return None, parts[0]
To: return parts[0], None
for i_e, error in enumerate(errors, start=1):
    error_value = error.get("value", None)
    error_code = error.get("errorCode", None)
    logger.error(f"Error {i_e} value: {error_value} (DHSI2 errorCode: {error_code})")
Corrected spelling of 'DHIS2' from 'DHSI2'.
| logger.error(f"Error {i_e} value: {error_value} (DHSI2 errorCode: {error_code})") | |
| logger.error(f"Error {i_e} value: {error_value} (DHIS2 errorCode: {error_code})") |
from openhexa.toolbox.dhis2 import DHIS2


class AnanalyticsDataElementExtractor:
Corrected spelling of 'Analytics' from 'Ananalytics' (extra 'na').
dx_uid_to_null_flag = {
    dx_uid: null_flag
    for dx_uid, null_flag in zip(dx_uids_present_list, null_flags_list, strict=False)
Using strict=False in zip silently allows lists of different lengths, which could hide data integrity issues. Consider using strict=True (Python 3.10+) or adding explicit length validation before the zip operation to catch mismatched data early.
Suggested change:
From:
dx_uid_to_null_flag = {
    dx_uid: null_flag
    for dx_uid, null_flag in zip(dx_uids_present_list, null_flags_list, strict=False)
To:
if len(dx_uids_present_list) != len(null_flags_list):
    logger.warning(
        "Mismatched lengths for DX_UIDs and NULL_FLAGS "
        "for PERIOD=%s, COC=%s, ORG_UNIT=%s: %d vs %d",
        period,
        coc,
        org_unit,
        len(dx_uids_present_list),
        len(null_flags_list),
    )
dx_uid_to_null_flag = {
    dx_uid: null_flag
    for dx_uid, null_flag in zip(dx_uids_present_list, null_flags_list)
Pull request overview
Copilot reviewed 18 out of 20 changed files in this pull request and generated no new comments.
dhis2_exhaustivity/pipeline.py
Outdated
# 3) Compute exhaustivity from extracted data (automatic if extract or push is enabled)
# If extract is enabled → compute after extraction
# If push is enabled → compute before pushing (even if extract is disabled, in case of recovery)
should_compute = run_extract_data or run_push_data
Let's try to simplify this by moving the exhaustivity computation inside the extract process and queue results:
Run Extract task :
for extract - for period:
-> extract(extract_1, 202501)
-> exhaustivity_extract_1, 202501 = compute_exhaustivity(extract_1, 202501)
-> queue(exhaustivity_extract_1 | 202501)
-> next period , next extract
-> -END-
Push Task :
-> while True:
-> next_exhaustivity_extract = dequeue()
-> mapping(next_exhaustivity_extract )
-> push(next_exhaustivity_extract )
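A minimal Python sketch of this restructuring; extract_period, map_to_dhis2, and push_datapoints are hypothetical helper names, while the queue calls and the "FINISH" label match what appears elsewhere in this review:

```python
# Hypothetical sketch of the proposed per-period extract -> compute -> push flow.
def run_extract_task(extracts, periods, push_queue):
    for extract in extracts:
        for period in periods:
            raw = extract_period(extract, period)            # extract(extract_1, 202501)
            result_file = compute_exhaustivity(raw, period)  # compute per period
            push_queue.enqueue(f"{result_file}|{period}")    # queue results
    push_queue.enqueue("FINISH")                             # signal end of work

def run_push_task(push_queue):
    while True:
        item = push_queue.dequeue()
        if item == "FINISH":
            break
        result_file, period = item.split("|", 1)
        datapoints = map_to_dhis2(result_file)  # mapping
        push_datapoints(datapoints)             # push
```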
dhis2_exhaustivity/pipeline.py
Outdated
| f"Extract {extract_id} download returned None " | ||
| f"for period {period}. No file created." | ||
| ) | ||
| elif not output_file.exists(): |
Is the extractor.download_period() function returning file names for extracts that are not downloaded??
I think you can just do it like here:
dhis2_exhaustivity/pipeline.py
Outdated
| f"for period {period}: {output_file}" | ||
| ) | ||
|
|
||
| except Exception as e: |
Let's maybe put the exhaustivity computation right after the extract is successfully downloaded, inside the for loop and after the try: block, like:
for periods:
try:
extract = retrieve data extract for period....
except
try:
if (extract):
exhaustivity_extract = compute exhaustivity for (extract)
queue(exhaustivity_extract)
except
Correctly handling and logging errors/warnings.
EstebanMontandon
left a comment
A few adjustments are still needed, but we're almost there. Thanks for the effort... let's do one final round of checks 👍
dhis2_exhaustivity/pipeline.py
Outdated
pipeline_path=pipeline_path,
run_task=run_push_data,
wait=(compute_ready and sync_ready),
wait=(sync_ready and (extract_ready if run_extract_data else True)),
The idea of the extract and push tasks is that they can work asynchronously. As soon as there's data to be pushed, we push it. The wait in the push task should be set only to sync_ready.
dhis2_exhaustivity/pipeline.py
Outdated
The Queue class includes mutual exclusion, so only one process can read or write a file at a time; so far so good. So there's no need to initialize it here. Actually, I noticed this init is already included in the extract and push tasks, which is good, so this one can be removed.
dhis2_exhaustivity/pipeline.py
Outdated
Preferably, do not include spaces in folder names; use "_" instead.
# setup
configure_logging(logs_path=pipeline_path / "logs" / "push", task_name="push_data")
# load pipeline configuration
config = load_pipeline_config(pipeline_path / "configuration")
This is still not resolved
dhis2_exhaustivity/pipeline.py
Outdated
)

# Signal end of compute/enqueue
push_queue.enqueue("FINISH")
better to include this inside the method handle_data_element_extracts()
dhis2_exhaustivity/pipeline.py
Outdated
See the extractor class in the D2D; it handles the creation of folders and files. No need to handle it here.
We also have an overwrite option, which for this type of pipeline is usually useful to replace existing files.
The extract implementation here should be the same as in the other pipelines (for example dhis2_cmm_push).
dhis2_exhaustivity/pipeline.py
Outdated
current_run.log_warning(
    f"Extract {extract_id} download returned path {output_file} "
    f"but file does not exist for period {period}."
except Exception as e:
You just need to catch the exception and log it; the extractor class will raise the corresponding error in each case.
# Compute exhaustivity and enqueue if extraction succeeded
try:
    if extract_path is not None:
This try block should be moved inside the method compute_exhaustivity(); there we use it to handle the right behaviour. For example, if an exception occurs we skip to the next period, and at the end of the loop we ensure the label "FINISH" is enqueued inside a finally block (try/finally expression).
See example:
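A minimal sketch of that try/finally pattern, assuming hypothetical names (compute_period_exhaustivity, extract_paths) around the queue calls used in this pipeline:

```python
# Hypothetical sketch; compute_period_exhaustivity is an illustrative name.
def compute_and_enqueue_all(extract_paths: dict, push_queue) -> None:
    try:
        for period, extract_path in extract_paths.items():
            try:
                result_file = compute_period_exhaustivity(extract_path, period)
                push_queue.enqueue(f"{result_file}|{period}")
            except Exception as e:
                # On failure, log and skip to the next period.
                print(f"[WARNING] Skipping period {period}: {e}")
    finally:
        # The push task always receives the end-of-work label, even on errors.
        push_queue.enqueue("FINISH")
```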
dhis2_exhaustivity/pipeline.py
Outdated
| f"Extract {extract_id} successfully created file " | ||
| f"for period {period}: {output_file}" | ||
|
|
||
| if len(period_exhaustivity_df) == 0: |
Can we handle all the extra logic inside calculate_exhaustivity()? Let's try to keep the extract task as clean as possible.
dhis2_exhaustivity/pipeline.py
Outdated
dataset_name = source_dataset["name"][0] if len(source_dataset) > 0 else "Unknown"

# Limit org units for testing if MAX_ORG_UNITS_FOR_TEST is set in config
if pipeline_config and pipeline_config.get("EXTRACTION", {}).get("MAX_ORG_UNITS_FOR_TEST"):
It's not a good idea to leave a testing option in production code. You could instead use an "extract_config.json" that contains a subset, or something like that perhaps...
dhis2_exhaustivity/pipeline.py
Outdated
# Prepare exhaustivity context for this extract (compute will run per period)
expected_dx_uids = None
if pipeline_config:
    drug_mapping_file = extract.get("DRUG_MAPPING_FILE")
You can simplify the configuration by directly loading the drug config from a specific file.
This file should not change, so it's OK to just hardcode the path to it:
drug_mapping_file = load_drug_mapping(pipeline_path / "something" / "file")  # I assume this raises the corresponding exception
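Concretely, a minimal sketch of hardcoding the two mapping paths, assuming load_drug_mapping from utils.py and the configuration file names listed in this PR (drug_mapping_fosa.json, drug_mapping_bcz.json):

```python
# Sketch only; load_drug_mapping is assumed to raise on a missing/invalid file.
FOSA_MAPPING_PATH = pipeline_path / "configuration" / "drug_mapping_fosa.json"
BCZ_MAPPING_PATH = pipeline_path / "configuration" / "drug_mapping_bcz.json"

drug_mapping_fosa = load_drug_mapping(FOSA_MAPPING_PATH)
drug_mapping_bcz = load_drug_mapping(BCZ_MAPPING_PATH)
```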
dhis2_exhaustivity/pipeline.py
Outdated
try:
    old_file.unlink()
    logging.info(f"Deleted old processed file: {old_file.name}")
extract_path = dhis2_extractor.analytics_data_elements.download_period(
If we're downloading data elements (with DE UIDs) from DHIS2, we should not retrieve them from the analytics endpoint (that requires the analytics process to run in DHIS2 to compute indicators before the data is available). In our case we're just downloading raw datapoints, and the data_value_sets endpoint provides direct access to these.
We can simply reuse the existing implementation and avoid re-inventing the wheel:
dhis2_extractor.data_elements.download_period() ---> this class extracts from data_value_sets.
Please do not modify the D2D library unless there's additional functionality needed or a fix. I believe the current implementation covers most of the requirements of this ticket.
dhis2_exhaustivity/pipeline.py
Outdated
# Compute exhaustivity and enqueue if extraction succeeded
if extract_path is not None:
    try:
        compute_exhaustivity_and_enqueue(
Instead of keeping this function inside a try block, can we handle errors and logs inside the function itself?
So I now have the same d2d_lib as cmm_push, as you asked.
Extracts data (with DX_UID), computes exhaustivity per (period, org unit) using mapping-based valid (DX_UID, COC) pairs, writes parquet, and pushes results (dry-run by default).
Uses additive org unit sync (keeps existing, adds new), with concise, clearer logs for empty periods/COCs.
Non-breaking: does not modify existing pipelines; self-contained and ready for main.
Configs include mapping of FoSa and BCZ COC and DX_UID.
The extract config contains a dynamic date behaviour that can be set to False if we want to select a specific range of periods.