Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 65 additions & 33 deletions lambdas/ryhti_client/lambda_function.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import datetime
import enum
import logging
import os
Expand Down Expand Up @@ -396,41 +397,80 @@ def handler(
elif event_type is Action.COPY_PLAN:
data = event.get("data") or {}
LOGGER.debug("data: %s", data)

lifecycle_status_uuid = data.get("lifecycle_status_uuid")
LOGGER.debug("lifecycle_status_uuid: %s", lifecycle_status_uuid)

plan_name = data.get("plan_name")
if plan_uuid is None or lifecycle_status_uuid is None or plan_name is None:
LOGGER.warning("Copying plan failed. Required parameters missing.")

# Optional validity dates
period_of_validity_start = None
period_of_validity_end = None

try:
if data.get("period_of_validity_start"):
period_of_validity_start = datetime.date.fromisoformat(
data["period_of_validity_start"]
)

if data.get("period_of_validity_end"):
period_of_validity_end = datetime.date.fromisoformat(
data["period_of_validity_end"]
)
except ValueError:
LOGGER.warning("Invalid date format in copy plan request.")
status_code = 400
title = "Error copying plan."
copy_details = {
"error": "Missing some required parameter: 'plan_uuid', 'data.lifecycle_status_uuid' or 'data.plan_name'"
"error": "Invalid date format. Expected ISO format YYYY-MM-DD."
}

else:
LOGGER.info("Copying plan...")
try:
copied_plan_id = database_client.copy_plan(
plan_uuid, lifecycle_status_uuid, plan_name
)
status_code = 200
title = "Plan copied."
copy_details = {"copied_plan_id": str(copied_plan_id)}

except Exception as e:
if (
plan_uuid is None
or lifecycle_status_uuid is None
or plan_name is None
):
LOGGER.warning("Copying plan failed. Required parameters missing.")
status_code = 400
title = "Error copying plan."
if isinstance(e, PlanNotFoundError):
status_code = 404
copy_details = {"error": f'plan "{plan_uuid}" not found'}
if isinstance(e, LifeCycleStatusNotFoundError):
status_code = 404
copy_details = {
"error": f'lifecycle_status_uuid" {lifecycle_status_uuid} not found'
}
else:
LOGGER.exception("Error copying plan.")
status_code = 500
copy_details = {"error": "Please contact support"}
copy_details = {
"error": (
"Missing some required parameter: "
"'plan_uuid', 'data.lifecycle_status_uuid' or 'data.plan_name'"
)
}

else:
LOGGER.info("Copying plan...")
try:
copied_plan_id = database_client.copy_plan(
plan_uuid,
lifecycle_status_uuid,
plan_name,
period_of_validity_start=period_of_validity_start,
period_of_validity_end=period_of_validity_end,
)
status_code = 200
title = "Plan copied."
copy_details = {"copied_plan_id": str(copied_plan_id)}

except Exception as e:
title = "Error copying plan."
if isinstance(e, PlanNotFoundError):
status_code = 404
copy_details = {"error": f'plan "{plan_uuid}" not found'}
elif isinstance(e, LifeCycleStatusNotFoundError):
status_code = 404
copy_details = {
"error": (
f'lifecycle_status_uuid "{lifecycle_status_uuid}" not found'
)
}
else:
LOGGER.exception("Error copying plan.")
status_code = 500
copy_details = {"error": "Please contact support"}

lambda_response = Response(
statusCode=status_code,
Expand All @@ -441,14 +481,6 @@ def handler(
),
)

else:
lambda_response = Response(
statusCode=400,
body=ResponseBody(
title="No action taken.", details={}, ryhti_responses={}
),
)

elif event_type is Action.IMPORT_PLAN:
data = event.get("data") or {}
plan_json = data.get("plan_json")
Expand Down
19 changes: 15 additions & 4 deletions lambdas/ryhti_client/ryhti_client/database_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,7 +1338,12 @@ def import_plan(
return plan.id

def copy_plan(
self, plan_id: str, lifecycle_status_id: str, plan_name: dict[str, str]
self,
plan_id: str,
lifecycle_status_id: str,
plan_name: dict[str, str],
period_of_validity_start: datetime.date | None = None,
period_of_validity_end: datetime.date | None = None,
) -> DbId | None:
"""Deep copy plan instance with all associated child objects and relationships."""
with self.Session(autoflush=False, expire_on_commit=False) as session:
Expand All @@ -1352,10 +1357,16 @@ def copy_plan(
if new_lifecycle_status is None:
raise LifeCycleStatusNotFoundError(UUID(lifecycle_status_id))

plan_copier = PlanCopier(plan, new_lifecycle_status, plan_name)
copied_plan = plan_copier.copy_plan()
plan_copier = PlanCopier(
plan=plan,
lifecycle_status=new_lifecycle_status,
plan_name=plan_name,
period_of_validity_start=period_of_validity_start,
period_of_validity_end=period_of_validity_end,
)

copied_plan = plan_copier.copy_plan()
session.add(copied_plan)
session.commit()

return copied_plan.id
return copied_plan.id
80 changes: 52 additions & 28 deletions lambdas/ryhti_client/ryhti_client/plan_copier.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from __future__ import annotations

from datetime import date
from typing import Any, TypeVar
from uuid import uuid4

Expand All @@ -15,64 +18,81 @@ def __init__(
plan: models.Plan,
lifecycle_status: codes.LifeCycleStatus,
plan_name: dict[str, str],
period_of_validity_start: date | None = None,
period_of_validity_end: date | None = None,
) -> None:
self.plan = plan
self.lifecycle_status = lifecycle_status
self.plan_name = plan_name
self.period_of_validity_start = period_of_validity_start
self.period_of_validity_end = period_of_validity_end

# Mapping original regulation group ID → duplicated regulation group
self.regulation_group_mapping: dict[str, models.PlanRegulationGroup] = {}

@classmethod
def clone_model(cls, obj: MODEL, **overrides: Any) -> MODEL:
"""Clone a SQLAlchemy model instance with the same column data."""
"""Clone a SQLAlchemy model instance with the same column data. Optionally copy period_of_validity."""
model_class = type(obj)
mapper = class_mapper(model_class)

data = {
c.key: getattr(obj, c.key) for c in mapper.columns if c.key not in overrides
column.key: getattr(obj, column.key)
for column in mapper.columns
if column.key not in overrides
}

data.update(overrides)
return model_class(**data)

def copy_plan(self) -> models.Plan:
self.regulation_group_mapping: dict[str, models.PlanRegulationGroup] = {}
self.duplicate_plan = self.clone_model(
self.plan,
id=uuid4(),
lifecycle_status=self.lifecycle_status,
name=self.plan_name,
)
overrides: dict[str, Any] = {
"id": uuid4(),
"lifecycle_status": self.lifecycle_status,
"name": self.plan_name,
}

if self.period_of_validity_start is not None:
overrides["period_of_validity_start"] = self.period_of_validity_start

if self.period_of_validity_end is not None:
overrides["period_of_validity_end"] = self.period_of_validity_end

self.duplicate_plan = self.clone_model(self.plan, **overrides)

# Documents
self.duplicate_plan.documents = [
self.clone_model(document, id=uuid4(), plan=self.duplicate_plan)
for document in self.plan.documents
]

# Master plan effects
self.duplicate_plan.legal_effects_of_master_plan = (
self.plan.legal_effects_of_master_plan
)

# Regulation groups
# Regulation groups and dependencies
self.copy_regulation_groups()

# Plan objects
duplicate_land_use_areas = self.copy_plan_objects(self.plan.land_use_areas)
self.duplicate_plan.land_use_areas = duplicate_land_use_areas

duplicate_other_areas = self.copy_plan_objects(self.plan.other_areas)
self.duplicate_plan.other_areas = duplicate_other_areas

duplicate_lines = self.copy_plan_objects(self.plan.lines)
self.duplicate_plan.lines = duplicate_lines

duplicate_points = self.copy_plan_objects(self.plan.points)
self.duplicate_plan.points = duplicate_points
self.duplicate_plan.land_use_areas = self.copy_plan_objects(
self.plan.land_use_areas
)
self.duplicate_plan.other_areas = self.copy_plan_objects(self.plan.other_areas)
self.duplicate_plan.lines = self.copy_plan_objects(self.plan.lines)
self.duplicate_plan.points = self.copy_plan_objects(self.plan.points)

return self.duplicate_plan

def copy_regulation_groups(self) -> None:
duplicate_general_regulation_groups = []
duplicate_regulation_groups = []
duplicate_regulation_groups: list[models.PlanRegulationGroup] = []
duplicate_general_regulation_groups: list[models.PlanRegulationGroup] = []

for regulation_group in self.plan.regulation_groups:
duplicate_regulation_group = self.clone_model(
regulation_group, id=uuid4(), plan=self.duplicate_plan
)

self.regulation_group_mapping[regulation_group.id] = (
duplicate_regulation_group
)
Expand All @@ -84,6 +104,7 @@ def copy_regulation_groups(self) -> None:
self.copy_propositions(regulation_group, duplicate_regulation_group)

duplicate_regulation_groups.append(duplicate_regulation_group)

if regulation_group in self.plan.general_plan_regulation_groups:
duplicate_general_regulation_groups.append(duplicate_regulation_group)

Expand Down Expand Up @@ -125,24 +146,27 @@ def copy_propositions(
plan_regulation_group=duplicate_regulation_group,
lifecycle_status=self.lifecycle_status,
)

duplicate_proposition.plan_themes = proposition.plan_themes

def copy_plan_objects(
self, plan_objects: list[PLAN_OBJECT_MODEL]
) -> list[PLAN_OBJECT_MODEL]:
duplicate_plan_objects = []
duplicate_plan_objects: list[PLAN_OBJECT_MODEL] = []

for plan_object in plan_objects:
duplicate_plan_object = self.clone_model(
plan_object,
id=uuid4(),
plan=self.duplicate_plan,
lifecycle_status=self.lifecycle_status,
)
duplicate_regulation_groups = [
self.regulation_group_mapping[original_regulation_group.id]
for original_regulation_group in plan_object.plan_regulation_groups

duplicate_plan_object.plan_regulation_groups = [
self.regulation_group_mapping[rg.id]
for rg in plan_object.plan_regulation_groups
]
duplicate_plan_object.plan_regulation_groups = duplicate_regulation_groups

duplicate_plan_objects.append(duplicate_plan_object)

return duplicate_plan_objects