
Dhis2 exhaustivity integration #3

Open
claude-marie wants to merge 66 commits into main from dhis2-exhaustivity-integration

Conversation

@claude-marie

Extracts data (with DX_UID), computes exhaustivity per (period, org unit) using mapping-based valid (DX_UID, COC) pairs, writes parquet, and pushes results (dry-run by default).
Uses additive org unit sync (keeps existing, adds new), with concise, clearer logs for empty periods/COCs.
Non-breaking: does not modify existing pipelines; self-contained and ready for main.

Configs include the FoSa and BCZ COC and DX_UID mappings.
The extract config contains a dynamic date behaviour that can be set to False if we want to select a specific range of periods.

…comment .vscode directory in .gitignore to not push the venv
…irectly and updating logging; rename functions and parameters for clarity
…lete combinations of (PERIOD, DX_UID, ORG_UNIT) with exhaustivity = 0; enhance logging for clarity
…older naming based on configuration; improve logging and .gitignore for workspace data
…rom dhis2_dataset_sync; add parameters for optional sync operations
…dividual combinations of (PERIOD, ORG_UNIT, COC) to separate parquet files; enhance logging for better traceability
… all combinations for each period into a single file; improve logging for clarity
…les and reducing redundant configuration loads; enhance performance with vectorized operations for missing data handling
… reading and optimizing data processing with vectorized operations; improve logging for clarity and efficiency
…arallel processing; enhance logging clarity and optimize data handling for missing entries
…cted mappings for periods and org units; improve logging for missing combinations and streamline data handling
…rameter and improving return logic for task execution
… data in a streamlined format; enhance logic for detecting and processing exhaustivity files
…pute_exhaustivity function to streamline data processing; clean up related code for clarity
…_exhaustivity function; streamline data processing by consolidating mapping logic and enhancing DataFrame initialization for missing files
…ity function by replacing nested loops with Polars joins; enhance error handling in data extraction and improve logging for missing expected DX_UIDs
@claude-marie self-assigned this Jan 7, 2026
Collaborator

@EstebanMontandon left a comment


In general, I think we should reuse as much as possible the existing architecture of the D2D library and stick to its patterns to ensure reusability (and try to convince others that it works 😉).
Let me know if you want to go through and discuss the comments here. Good luck.

requests
python-dateutil
openhexa.toolbox
openhexa.sdk
Collaborator

When libraries are included in the requirements, they'll be installed when the pipeline is executed (Docker image). I think all these libraries are already part of the OH image, so there's no need to re-install them, unless we want a specific version for compatibility (maybe python-dateutil is not there, not sure ...)

return True


def push_dataset_org_units(
Collaborator

Why not just reuse this function?

# -https://github.com/BLSQ/openhexa-pipelines-drc-prs


def dhis2_request(
Collaborator

maybe we can move this function to utils.py?

help="Extract data elements from source DHIS2.",
)
@parameter(
code="run_compute_data",
Collaborator

Perhaps we can think about removing this option; there's probably no use case where we just want to compute, right?
We can consider these options:

  1. if we extract (true) then compute
  2. if we push (true) then first compute then push


# extract data from source DHIS2
@pipeline("dhis2_exhaustivity", timeout=3600) # 1 hour
@parameter(
Collaborator

I think I would remove this option, as it should be a mandatory step.
The organisation units in the target DS should always be the same as those in the source DS (in sync).


# After all periods are saved for this extract, enqueue a marker to indicate extract is complete
# This allows push_data to collect all files for an extract before concatenating and pushing
push_queue.enqueue(f"EXTRACT_COMPLETE|{extract_id}")
Collaborator

The goal of the push task is to check the queue until there's something to push. Therefore, we should only enqueue data/files that are ready, like you do before in:

# Enqueue aggregated file for pushing
push_queue.enqueue(f"{extract_id}|{aggregated_file}")  # -> This is enough, the push task will load it and push it (after some mapping ofc)

)

# Save exhaustivity data per period
# Get all periods from the result (may include periods not in exhaustivity_periods if data exists)
Collaborator

I'm not sure why we would have files (extracts) with mixed periods; the extract task only retrieves data for a single period at a time... therefore, the exhaustivity computation should also produce results per period. Maybe I'm not understanding the logic correctly here, but we should rework this task to simplify it.


# Calculate COC-level statistics (before aggregation to ORG_UNIT level)
# Aggregate by (PERIOD, COC, ORG_UNIT): exhaustivity = 1 only if all DX_UIDs for that COC are 1
if "CATEGORY_OPTION_COMBO" in period_exhaustivity.columns:
Collaborator

Let's try to put these parts of the computation into separate functions to simplify a bit 👍

_, extract_id = split_on_pipe(next_item)
push_queue.dequeue() # remove marker
# If we have files collected for this extract, process them now
if extract_id in extract_files_collected and len(extract_files_collected[extract_id]) > 0:
Collaborator

Let's avoid collecting all files per extract; our approach should focus on a single period per extract, so:
extract 1 -> extract for period 202501 -> compute exhaustivity -> push results for 202501
-----------> extract for period 202502 -> compute exhaustivity -> push results for 202502
-----------> extract for period 202503 -> compute exhaustivity -> push results for 202503

This is why we use a queue to collect the periods that are ready to be pushed!

current_run.log_warning(f"⚠️ File {extract_path.name} missing EXHAUSTIVITY_VALUE, skipping")
continue

# Map to DHIS2 format (COC/AOC not needed - DHIS2 uses defaults)
Collaborator

We need this mandatory format for DHIS2 to import a data point:
{"dataElement", "period", "orgUnit", "categoryOptionCombo", "attributeOptionCombo", "value"}
Your implementation is correct, but I don't see the CATEGORY_OPTION_COMBO or ATTRIBUTE_OPTION_COMBO columns in the mapped dataframe... maybe I'm missing something?

In this case, indeed, we can set coc and aoc to the default:
"categoryOptionCombo": "HllvX50cXC0"
"attributeOptionCombo": "HllvX50cXC0"

Copilot AI left a comment

Pull request overview

This PR introduces a self-contained DHIS2 exhaustivity integration pipeline that extracts data with DX_UID, computes exhaustivity per (period, org unit) using mapping-based valid (DX_UID, COC) pairs, writes results to parquet files, and pushes results (dry-run by default). The integration uses additive org unit synchronization, includes concise logging for empty periods/COCs, and provides dynamic date behavior configuration. The implementation is non-breaking and ready for main.

Key changes:

  • New exhaustivity computation pipeline with extract, compute, and push stages
  • Dynamic drug mapping configuration for FoSa and BCZ COC and DX_UID associations
  • Additive org unit sync that preserves existing units while adding new ones

Reviewed changes

Copilot reviewed 17 out of 19 changed files in this pull request and generated 2 comments.

File Description
dhis2_exhaustivity/utils.py Core utility functions for DHIS2 connection, config loading, drug mapping, and parquet file handling
dhis2_exhaustivity/requirements.txt Python dependencies for the pipeline
dhis2_exhaustivity/pipeline.py Main pipeline orchestration with extract, compute, and push tasks
dhis2_exhaustivity/missing_dx_coc_diagnostic.txt Diagnostic report of missing DX_UID and COC entries in FOSA and BCZ datasets
dhis2_exhaustivity/exhaustivity_calculation.py Core exhaustivity computation logic with COC-level validation
dhis2_exhaustivity/d2d_library/org_unit_point.py Organization unit data structures and validation
dhis2_exhaustivity/d2d_library/dhis2_pusher.py DHIS2 data push handler with batch processing
dhis2_exhaustivity/d2d_library/dhis2_org_unit_aligner.py Organization unit synchronization between DHIS2 instances
dhis2_exhaustivity/d2d_library/dhis2_extract_handlers.py Data extraction handlers for analytics and data elements
dhis2_exhaustivity/d2d_library/db_queue.py SQLite-based thread-safe queue for managing file processing
dhis2_exhaustivity/d2d_library/data_point.py Data point structures for DHIS2 push operations
dhis2_exhaustivity/configuration/push_settings.json Push operation configuration with dry-run defaults
dhis2_exhaustivity/configuration/extract_config.json Pipeline configuration for extracts and connections
dhis2_exhaustivity/configuration/drug_mapping_fosa.json FoSa drug-to-UID/COC mapping configuration
dhis2_exhaustivity/configuration/drug_mapping_bcz.json BCZ drug-to-UID/COC mapping configuration
dhis2_exhaustivity/config_sync.py Configuration validation and drug mapping synchronization
dhis2_exhaustivity/.gitignore Standard Python and pipeline-specific ignore patterns


Comment on lines 15 to 24
def log_info(msg): current_run.log_info(msg)
def log_warning(msg): current_run.log_warning(msg)
def log_error(msg): current_run.log_error(msg)
except ImportError:
workspace = None
def log_info(msg): print(f"[INFO] {msg}")
def log_warning(msg): print(f"[WARNING] {msg}")
def log_error(msg): print(f"[ERROR] {msg}")


Copilot AI Jan 9, 2026

The inline function definitions for logging create unnecessary complexity. Consider defining these as proper functions or using a logger instance directly instead of inline lambda-style definitions. This would improve readability and make the code easier to maintain.

Suggested change
def log_info(msg): current_run.log_info(msg)
def log_warning(msg): current_run.log_warning(msg)
def log_error(msg): current_run.log_error(msg)
except ImportError:
workspace = None
def log_info(msg): print(f"[INFO] {msg}")
def log_warning(msg): print(f"[WARNING] {msg}")
def log_error(msg): print(f"[ERROR] {msg}")
def log_info(msg):
current_run.log_info(msg)
def log_warning(msg):
current_run.log_warning(msg)
def log_error(msg):
current_run.log_error(msg)
except ImportError:
workspace = None
def log_info(msg):
print(f"[INFO] {msg}")
def log_warning(msg):
print(f"[WARNING] {msg}")
def log_error(msg):
print(f"[ERROR] {msg}")

parts = s.split("|", 1)
if len(parts) == 2:
return parts[0], parts[1]
return None, parts[0]
Copilot AI Jan 9, 2026

The function returns inconsistent tuple positions when no pipe is found. When a pipe exists, it returns (before_pipe, after_pipe). When no pipe exists, it returns (None, entire_string), reversing the expected pattern. This should return (entire_string, None) to maintain consistency, where the first element is always the content before the pipe (or entire string if no pipe).

Suggested change
return None, parts[0]
return parts[0], None
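For reference, a self-contained sketch of the helper with the consistent return order (matching the tuple[str, str | None] hint shown later in this thread):

def split_on_pipe(s: str) -> tuple[str, str | None]:
    """Split on the first "|": return (before, after), or (s, None) when no pipe is present."""
    parts = s.split("|", 1)
    if len(parts) == 2:
        return parts[0], parts[1]
    return parts[0], None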

Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 20 changed files in this pull request and generated 7 comments.



Comment on lines +558 to +559
periods_in_data = exhaustivity_df["PERIOD"].unique().to_list() if len(exhaustivity_df) > 0 else []
cocs_in_data = exhaustivity_df["CATEGORY_OPTION_COMBO"].unique().to_list() if len(exhaustivity_df) > 0 else []
Copilot AI Jan 13, 2026

The condition len(exhaustivity_df) > 0 is checked twice in consecutive lines. Consider checking once and reusing the result to avoid redundant DataFrame length checks.

Suggested change
periods_in_data = exhaustivity_df["PERIOD"].unique().to_list() if len(exhaustivity_df) > 0 else []
cocs_in_data = exhaustivity_df["CATEGORY_OPTION_COMBO"].unique().to_list() if len(exhaustivity_df) > 0 else []
periods_in_data = exhaustivity_df["PERIOD"].unique().to_list() if total_combinations > 0 else []
cocs_in_data = exhaustivity_df["CATEGORY_OPTION_COMBO"].unique().to_list() if total_combinations > 0 else []

Comment on lines 8 to 18
from d2d_library.db_queue import Queue
from d2d_library.dhis2_dataset_utils import dhis2_request, push_dataset_org_units
from d2d_library.dhis2_extract_handlers import DHIS2Extractor
from d2d_library.dhis2_pusher import DHIS2Pusher
from dateutil.relativedelta import relativedelta
from exhaustivity_calculation import compute_exhaustivity
from openhexa.sdk import current_run, parameter, pipeline, workspace
from openhexa.toolbox.dhis2 import DHIS2
from openhexa.toolbox.dhis2.dataframe import get_datasets
from openhexa.toolbox.dhis2.periods import period_from_string
from utils import (
Copilot AI Jan 13, 2026

Consider using relative imports (e.g., from .d2d_library.db_queue import Queue) to make the module more portable and explicit about its package structure.

Suggested change
from d2d_library.db_queue import Queue
from d2d_library.dhis2_dataset_utils import dhis2_request, push_dataset_org_units
from d2d_library.dhis2_extract_handlers import DHIS2Extractor
from d2d_library.dhis2_pusher import DHIS2Pusher
from dateutil.relativedelta import relativedelta
from exhaustivity_calculation import compute_exhaustivity
from openhexa.sdk import current_run, parameter, pipeline, workspace
from openhexa.toolbox.dhis2 import DHIS2
from openhexa.toolbox.dhis2.dataframe import get_datasets
from openhexa.toolbox.dhis2.periods import period_from_string
from utils import (
from .d2d_library.db_queue import Queue
from .d2d_library.dhis2_dataset_utils import dhis2_request, push_dataset_org_units
from .d2d_library.dhis2_extract_handlers import DHIS2Extractor
from .d2d_library.dhis2_pusher import DHIS2Pusher
from dateutil.relativedelta import relativedelta
from .exhaustivity_calculation import compute_exhaustivity
from openhexa.sdk import current_run, parameter, pipeline, workspace
from openhexa.toolbox.dhis2 import DHIS2
from openhexa.toolbox.dhis2.dataframe import get_datasets
from openhexa.toolbox.dhis2.periods import period_from_string
from .utils import (

raise


def split_on_pipe(s: str) -> tuple[str, str | None]:
Copilot AI Jan 13, 2026

The return order is incorrect. When no pipe is found, the function returns (None, parts[0]), but the type hint and docstring indicate it should return (parts[0], None) - the part before the pipe should be first, followed by the part after (or None).

parts = s.split("|", 1)
if len(parts) == 2:
return parts[0], parts[1]
return None, parts[0]
Copilot AI Jan 13, 2026

The return order is incorrect. When no pipe is found, the function returns (None, parts[0]), but the type hint and docstring indicate it should return (parts[0], None) - the part before the pipe should be first, followed by the part after (or None).

Suggested change
return None, parts[0]
return parts[0], None

for i_e, error in enumerate(errors, start=1):
error_value = error.get("value", None)
error_code = error.get("errorCode", None)
logger.error(f"Error {i_e} value: {error_value} (DHSI2 errorCode: {error_code})")
Copilot AI Jan 13, 2026

Corrected spelling of 'DHIS2' from 'DHSI2'.

Suggested change
logger.error(f"Error {i_e} value: {error_value} (DHSI2 errorCode: {error_code})")
logger.error(f"Error {i_e} value: {error_value} (DHIS2 errorCode: {error_code})")

from openhexa.toolbox.dhis2 import DHIS2


class AnanalyticsDataElementExtractor:
Copilot AI Jan 13, 2026

Corrected spelling of 'Analytics' from 'Ananalytics' (extra 'na').

Comment on lines +494 to +496
dx_uid_to_null_flag = {
dx_uid: null_flag
for dx_uid, null_flag in zip(dx_uids_present_list, null_flags_list, strict=False)
Copilot AI Jan 13, 2026

Using strict=False in zip silently allows lists of different lengths, which could hide data integrity issues. Consider using strict=True (Python 3.10+) or adding explicit length validation before the zip operation to catch mismatched data early.

Suggested change
dx_uid_to_null_flag = {
dx_uid: null_flag
for dx_uid, null_flag in zip(dx_uids_present_list, null_flags_list, strict=False)
if len(dx_uids_present_list) != len(null_flags_list):
logger.warning(
"Mismatched lengths for DX_UIDs and NULL_FLAGS "
"for PERIOD=%s, COC=%s, ORG_UNIT=%s: %d vs %d",
period,
coc,
org_unit,
len(dx_uids_present_list),
len(null_flags_list),
)
dx_uid_to_null_flag = {
dx_uid: null_flag
for dx_uid, null_flag in zip(dx_uids_present_list, null_flags_list)

@claude-marie requested a review from Copilot January 13, 2026 10:33
Copilot AI left a comment

Pull request overview

Copilot reviewed 18 out of 20 changed files in this pull request and generated no new comments.



# 3) Compute exhaustivity from extracted data (automatic if extract or push is enabled)
# If extract is enabled → compute after extraction
# If push is enabled → compute before pushing (even if extract is disabled, in case of recovery)
should_compute = run_extract_data or run_push_data
Collaborator

Let's try to simplify this by moving the exhaustivity computation inside the extract process and queue results:

Run Extract task :
for extract - for period:
-> extract(extract_1, 202501)
-> exhaustivity_extract_1, 202501 = compute_exhaustivity(extract_1, 202501)
-> queue(exhaustivity_extract_1 | 202501)
-> next period , next extract
-> -END-

Push Task :
-> while True:
-> next_exhaustivity_extract = dequeue()
-> mapping(next_exhaustivity_extract )
-> push(next_exhaustivity_extract )
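A hedged Python rendering of the extract side of that flow (the function names and signatures here are placeholders, not the actual D2D API; the push side follows the same consumer loop sketched earlier in this thread):

def run_extract_task(extract_id, periods, push_queue, download_period, compute_exhaustivity) -> None:
    """Producer side of the queue: handle one period at a time and enqueue each result when ready."""
    for period in periods:  # e.g. 202501, 202502, 202503
        raw_file = download_period(extract_id, period)  # raw data points for this single period
        result_file = compute_exhaustivity(raw_file, period)  # exhaustivity results for this period
        push_queue.enqueue(f"{extract_id}|{result_file}")  # the push task can pick it up immediately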

f"Extract {extract_id} download returned None "
f"for period {period}. No file created."
)
elif not output_file.exists():
Collaborator

Is the extractor.download_period() function returning file names for extracts that are not downloaded?

Collaborator

I think you can just do it like here:

for period in extract_periods:

f"for period {period}: {output_file}"
)

except Exception as e:
Collaborator

Let's maybe put the exhaustivity computation inside the for loop, after successfully downloading the extract and after the try: block, like:

for periods:
try:
extract = retrieve data extract for period....
except
try:
if (extract):
exhaustivity_extract = compute exhaustivity for (extract)
queue(exhaustivity_extract)
except

Correctly handling and logging errors/warnings.

Collaborator

@EstebanMontandon left a comment

A few adjustments are still needed, but we're almost there. Thanks for the effort... let's do one final round of checks 👍

pipeline_path=pipeline_path,
run_task=run_push_data,
wait=(compute_ready and sync_ready),
wait=(sync_ready and (extract_ready if run_extract_data else True)),
Collaborator

The idea of the extract and push tasks is that they can work asynchronously: as soon as there's data to be pushed, we push it. The wait in the push task should depend only on sync_ready.

Collaborator

The Queue class includes mutual exclusion, so only one process can read or write the file at a time; so far so good, so there's no need to initialize it here. Actually, I noticed this init is already included in the extract and push tasks, which is good, so this one can be removed.

Collaborator

Preferably, do not include spaces in folder names; use "_".

# setup
configure_logging(logs_path=pipeline_path / "logs" / "push", task_name="push_data")
# load pipeline configuration
config = load_pipeline_config(pipeline_path / "configuration")
Collaborator

This is still not resolved

)

# Signal end of compute/enqueue
push_queue.enqueue("FINISH")
Collaborator

better to include this inside the method handle_data_element_extracts()

Collaborator

See the extractor class in the D2D; it handles the creation of folders and files, so there's no need to handle it here.
We also have an overwrite option, which is usually useful for this type of pipeline to replace existing files.
The extract implementation here should be the same as in the other pipelines (for example dhis2_cmm_push).

current_run.log_warning(
f"Extract {extract_id} download returned path {output_file} "
f"but file does not exist for period {period}."
except Exception as e:
Collaborator

You just need to catch the exception and log it; the extractor class will raise the corresponding error in each case.


# Compute exhaustivity and enqueue if extraction succeeded
try:
if extract_path is not None:
Collaborator

This try block should be moved inside the method compute_exhaustivity(), where we can use it to implement the right behaviour: for example, if an exception occurs we skip to the next period, and at the end of the loop we ensure the "FINISH" label is enqueued inside finally (a try/finally expression).
See example:

push_queue.enqueue("FINISH")

f"Extract {extract_id} successfully created file "
f"for period {period}: {output_file}"

if len(period_exhaustivity_df) == 0:
Collaborator

Can we handle all the extra logic inside calculate_exhaustivity()? Let's try to keep the extract task as clean as possible.

dataset_name = source_dataset["name"][0] if len(source_dataset) > 0 else "Unknown"

# Limit org units for testing if MAX_ORG_UNITS_FOR_TEST is set in config
if pipeline_config and pipeline_config.get("EXTRACTION", {}).get("MAX_ORG_UNITS_FOR_TEST"):
Collaborator

It's not a good idea to leave a testing option in production code. You could instead use an "extract_config.json" that contains a subset, or something along those lines...

# Prepare exhaustivity context for this extract (compute will run per period)
expected_dx_uids = None
if pipeline_config:
drug_mapping_file = extract.get("DRUG_MAPPING_FILE")
Collaborator

You can simplify the configuration by directly loading the drug config from a specific file.
This file should not change:

It's OK to just hardcode the path to the file:
drug_mapping_file = load_drug_mapping(pipeline_path / "something" / "file")  # I assume this raises the corresponding exception

try:
old_file.unlink()
logging.info(f"Deleted old processed file: {old_file.name}")
extract_path = dhis2_extractor.analytics_data_elements.download_period(
Collaborator

If we're downloading data elements (with DE UIDs) from DHIS2, we should not retrieve them from the analytics endpoint (that requires the analytics process to run in DHIS2 before the data becomes available; in our case we're just downloading raw data points, and the data_value_sets endpoint provides direct access to this).
We can simply reuse the existing implementation and avoid re-inventing the wheel:

dhis2_extractor.data_elements.download_period() ---> this class extracts from data_value_sets.

Please do not modify the D2D library unless there's an additional functionality needed or a fix. I believe the current implementation covers most of the requirements of this ticket.

# Compute exhaustivity and enqueue if extraction succeeded
if extract_path is not None:
try:
compute_exhaustivity_and_enqueue(
Collaborator

Instead of keeping this function inside a try block, can we handle errors and logs inside the function itself?

@claude-marie
Author

So I now have the same d2d_lib as cmm_push, as you asked.
I moved some logic into utils.py.
