Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions snt_malaria_budgeting/core/budget_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,59 +126,61 @@ def get_places_costs(self, year):
budget["currency"] == self.budget_currency.upper()
]

# Group to have cost per place and intervention type and code
grouped_per_place_and_intervention = budget_filtered_by_currency.groupby(
[self.spatial_planning_unit, "type_intervention", "code_intervention"]
# Group to have cost per place, intervention type/code, and cost_class
grouped_per_place_intervention_class = budget_filtered_by_currency.groupby(
[
self.spatial_planning_unit,
"type_intervention",
"code_intervention",
"cost_class",
]
)["cost_element"].sum()

# get total costs per place
place_totals = budget_filtered_by_currency.groupby(self.spatial_planning_unit)[
"cost_element"
].sum()

# get costs per place and cost_class
place_cost_class_totals = budget_filtered_by_currency.groupby(
[self.spatial_planning_unit, "cost_class"]
)["cost_element"].sum()

places_with_interventions = set(
grouped_per_place_and_intervention.index.get_level_values(0)
)
places_with_cost_classes = set(
place_cost_class_totals.index.get_level_values(0)
places_with_data = set(
grouped_per_place_intervention_class.index.get_level_values(0)
)

place_costs = []
for place in self.places:
interventions_list = []
if place in places_with_interventions:
if place in places_with_data:
place_data = grouped_per_place_intervention_class[place]
interventions_dict = {}
for (
type_intervention,
code_intervention,
), cost in grouped_per_place_and_intervention[place].items():
cost_class,
), cost in place_data.items():
key = (type_intervention, code_intervention)
if key not in interventions_dict:
interventions_dict[key] = {
"type": type_intervention,
"code": code_intervention,
"total_cost": 0,
"cost_breakdown": [],
}
if cost > 0:
interventions_list.append(
{
"type": type_intervention,
"code": code_intervention,
"total_cost": cost,
}
interventions_dict[key]["total_cost"] += cost
interventions_dict[key]["cost_breakdown"].append(
{"cost_class": cost_class, "cost": cost}
)

cost_breakdown = []
if place in places_with_cost_classes:
for cost_class, cost in place_cost_class_totals[place].items():
if cost > 0:
cost_breakdown.append({"cost_class": cost_class, "cost": cost})
interventions_list = [
v for v in interventions_dict.values() if v["total_cost"] > 0
]

place_costs.append(
{
"place": place,
"total_cost": place_totals.get(place, 0),
"cost_breakdown": cost_breakdown,
"interventions": interventions_list,
}
)

return place_costs

def _get_scenario_data(
Expand Down
89 changes: 45 additions & 44 deletions tests/core/test_budget_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ def test_get_budget_use_default_currency(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_iptp_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_iptp_cost
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "SP")
self.assertEqual(place_iptp["code"], "iptp")
self.assertAlmostEqual(place_iptp["total_cost"], correct_iptp_cost)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(place_iptp["cost_breakdown"][0]["cost"], correct_iptp_cost)

def test_get_budget_iptp(self):
interventions = [InterventionDetailModel(code="iptp", type="SP", places=[1001])]
Expand Down Expand Up @@ -136,16 +134,14 @@ def test_get_budget_iptp(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_iptp_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_iptp_cost
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "SP")
self.assertEqual(place_iptp["code"], "iptp")
self.assertAlmostEqual(place_iptp["total_cost"], correct_iptp_cost)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(place_iptp["cost_breakdown"][0]["cost"], correct_iptp_cost)

def test_get_budget_itn_routine(self):
interventions = [
Expand Down Expand Up @@ -272,16 +268,16 @@ def test_get_budget_itn_campaign(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_itn_campaign_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Procurement")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_itn_campaign_cost
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "PBO")
self.assertEqual(place_iptp["code"], "itn_campaign")
self.assertAlmostEqual(place_iptp["total_cost"], correct_itn_campaign_cost)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Procurement")
self.assertAlmostEqual(
place_iptp["cost_breakdown"][0]["cost"], correct_itn_campaign_cost
)

def test_get_budget_smc(self):
interventions = [
Expand Down Expand Up @@ -361,16 +357,14 @@ def test_get_budget_smc(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_smc_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_smc_cost
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "SP+AQ")
self.assertEqual(place_iptp["code"], "smc")
self.assertAlmostEqual(place_iptp["total_cost"], correct_smc_cost)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(place_iptp["cost_breakdown"][0]["cost"], correct_smc_cost)

def test_get_budget_pmc(self):
interventions = [InterventionDetailModel(code="pmc", type="SP", places=[1001])]
Expand Down Expand Up @@ -444,16 +438,14 @@ def test_get_budget_pmc(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_pmc_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_pmc_cost
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "SP")
self.assertEqual(place_iptp["code"], "pmc")
self.assertAlmostEqual(place_iptp["total_cost"], correct_pmc_cost)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(place_iptp["cost_breakdown"][0]["cost"], correct_pmc_cost)

def test_get_budget_vacc(self):
interventions = [
Expand Down Expand Up @@ -514,16 +506,16 @@ def test_get_budget_vacc(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_vacc_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_vacc_cost
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "R21")
self.assertEqual(place_iptp["code"], "vacc")
self.assertAlmostEqual(place_iptp["total_cost"], correct_vacc_cost)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_iptp["cost_breakdown"][0]["cost"], correct_vacc_cost
)

def test_get_budget_multiple_interventions(self):
interventions = [
Expand Down Expand Up @@ -632,42 +624,46 @@ def test_get_budget_multiple_interventions(self):
self.assertAlmostEqual(
place_1001["total_cost"], correct_iptp_cost_location_1001
)
self.assertEqual(len(place_1001["cost_breakdown"]), 1)
self.assertEqual(place_1001["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1001["cost_breakdown"][0]["cost"], correct_iptp_cost_location_1001
)
self.assertEqual(len(place_1001["interventions"]), 1)
place_iptp = place_1001["interventions"][0]
self.assertEqual(place_iptp["type"], "SP")
self.assertEqual(place_iptp["code"], "iptp")
self.assertAlmostEqual(
place_iptp["total_cost"], correct_iptp_cost_location_1001
)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_iptp["cost_breakdown"][0]["cost"], correct_iptp_cost_location_1001
)

place_1002 = next(
place_cost for place_cost in places_costs if place_cost["place"] == 1002
)
self.assertAlmostEqual(
place_1002["total_cost"], correct_iptp_cost_location_1002 + correct_smc_cost
)
self.assertEqual(len(place_1002["cost_breakdown"]), 1)
self.assertEqual(place_1002["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_1002["cost_breakdown"][0]["cost"],
correct_iptp_cost_location_1002 + correct_smc_cost,
)
self.assertEqual(len(place_1002["interventions"]), 2)
place_iptp = place_1002["interventions"][0]
self.assertEqual(place_iptp["type"], "SP")
self.assertEqual(place_iptp["code"], "iptp")
self.assertAlmostEqual(
place_iptp["total_cost"], correct_iptp_cost_location_1002
)
self.assertEqual(len(place_iptp["cost_breakdown"]), 1)
self.assertEqual(place_iptp["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_iptp["cost_breakdown"][0]["cost"], correct_iptp_cost_location_1002
)
place_smc = place_1002["interventions"][1]
self.assertEqual(place_smc["type"], "SP+AQ")
self.assertEqual(place_smc["code"], "smc")
self.assertAlmostEqual(place_smc["total_cost"], correct_smc_cost)
self.assertEqual(len(place_smc["cost_breakdown"]), 1)
self.assertEqual(place_smc["cost_breakdown"][0]["cost_class"], "Commodity")
self.assertAlmostEqual(
place_smc["cost_breakdown"][0]["cost"], correct_smc_cost
)

def test_budget_itn(self):
interventions = [
Expand Down Expand Up @@ -834,8 +830,13 @@ def test_get_places_costs_multiple_cost_classes(self):
place_cost for place_cost in places_costs if place_cost["place"] == 1001
)
self.assertAlmostEqual(place_1001["total_cost"], correct_total_cost)
self.assertEqual(len(place_1001["cost_breakdown"]), 3)
for cost_breakdown in place_1001["cost_breakdown"]:
self.assertEqual(len(place_1001["interventions"]), 1)
place_itn_routine = place_1001["interventions"][0]
self.assertEqual(place_itn_routine["type"], "Dual AI")
self.assertEqual(place_itn_routine["code"], "itn_routine")
self.assertAlmostEqual(place_itn_routine["total_cost"], correct_total_cost)
self.assertEqual(len(place_itn_routine["cost_breakdown"]), 3)
for cost_breakdown in place_itn_routine["cost_breakdown"]:
if cost_breakdown["cost_class"] == "Procurement":
self.assertAlmostEqual(cost_breakdown["cost"], correct_procurement_cost)
elif cost_breakdown["cost_class"] == "Distribution":
Expand All @@ -845,4 +846,4 @@ def test_get_places_costs_multiple_cost_classes(self):
elif cost_breakdown["cost_class"] == "Operational":
self.assertAlmostEqual(cost_breakdown["cost"], correct_operational_cost)
else:
self.fail("Unexpected cost_class in place cost_breakdown")
self.fail("Unexpected cost_class in itn_routine cost_breakdown")
Loading