Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions pyatlan/client/aio/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ async def save(
"""
Async save method - creates or updates assets based on qualified_name.

When using AtlanTag with semantic values:
- APPEND: adds/updates the tag using addOrUpdateClassifications
- REMOVE: removes the tag using removeClassifications
- REPLACE: replaces all tags on the asset
- None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags

:param entity: one or more assets to save
:param replace_atlan_tags: whether to replace AtlanTags during an update
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
Expand All @@ -352,7 +358,22 @@ async def save(
:raises AtlanError: on any API communication issue
:raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
"""
# Convert entity to list for consistent handling
entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
else:
entities.append(entity)

# Check if any entity has tags with semantic
if Save.has_tags_with_semantic(entities):
return await self._save_with_tag_semantic(
entities=entities,
replace_custom_metadata=replace_custom_metadata,
overwrite_custom_metadata=overwrite_custom_metadata,
)

# Use existing logic for backward compatibility
query_params, request = await Save.prepare_request_async(
entity=entity,
replace_atlan_tags=replace_atlan_tags,
Expand All @@ -367,6 +388,54 @@ async def save(
await self._wait_for_connections_to_be_created(connections_created)
return response

async def _save_with_tag_semantic(
self,
entities: List[Asset],
replace_custom_metadata: bool = False,
overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
"""
Internal async method to handle saving assets with tag semantic values.
Updates query params based on semantics and makes a single API call.

If entities have APPEND/REMOVE semantic tags → appendTags=True
If entities have REPLACE semantic tags → replaceTags=True
If both are present → both flags True → backend error (user must make separate calls)

:param entities: list of assets to save
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
:returns: AssetMutationResponse from the API call
"""
# Determine which flags to set based on semantics present
has_append_remove, has_replace = Save.get_semantic_flags(entities)

# Process entities with APPEND/REMOVE semantic to set classification fields
for entity in entities:
Save.process_asset_for_append_remove_semantic(entity)

# Validate and flush custom metadata
await Save.validate_and_flush_entities_async(entities, self._client) # type: ignore[arg-type]

# Build query params based on semantics
# If user mixes APPEND/REMOVE and REPLACE, both flags will be True → backend error
query_params = {
"replaceTags": has_replace,
"appendTags": has_append_remove,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

request = BulkRequest[Asset](entities=entities)
raw_json = await self._client._call_api(BULK_UPDATE, query_params, request)
response = Save.process_response(raw_json)

# Handle connection waiting for any created connections
if connections_created := response.assets_created(Connection):
await self._wait_for_connections_to_be_created(connections_created)

return response

async def _wait_for_connections_to_be_created(self, connections_created):
guids = Save.get_connection_guids_to_wait_for(connections_created)

Expand Down
72 changes: 71 additions & 1 deletion pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
Table,
View,
)
from pyatlan.model.core import Announcement, AtlanObject, SearchRequest
from pyatlan.model.core import Announcement, AtlanObject, BulkRequest, SearchRequest
from pyatlan.model.custom_metadata import CustomMetadataDict
from pyatlan.model.enums import (
AssetCreationHandling,
Expand Down Expand Up @@ -432,6 +432,12 @@ def save(
If an asset does exist, opertionally overwrites any Atlan tags. Custom metadata will either be
overwritten or merged depending on the options provided.

When using AtlanTag with semantic values:
- APPEND: adds/updates the tag using addOrUpdateClassifications
- REMOVE: removes the tag using removeClassifications
- REPLACE: replaces all tags on the asset
- None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags

:param entity: one or more assets to save
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
Expand All @@ -441,6 +447,22 @@ def save(
:raises AtlanError: on any API communication issue
:raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
"""
# Convert entity to list for consistent handling
entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
else:
entities.append(entity)

# Check if any entity has tags with semantic
if Save.has_tags_with_semantic(entities):
return self._save_with_tag_semantic(
entities=entities,
replace_custom_metadata=replace_custom_metadata,
overwrite_custom_metadata=overwrite_custom_metadata,
)

# Use existing logic for backward compatibility
query_params, request = Save.prepare_request(
entity=entity,
replace_atlan_tags=replace_atlan_tags,
Expand All @@ -455,6 +477,54 @@ def save(
self._wait_for_connections_to_be_created(connections_created)
return response

def _save_with_tag_semantic(
self,
entities: List[Asset],
replace_custom_metadata: bool = False,
overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
"""
Internal method to handle saving assets with tag semantic values.
Updates query params based on semantics and makes a single API call.

If entities have APPEND/REMOVE semantic tags → appendTags=True
If entities have REPLACE semantic tags → replaceTags=True
If both are present → both flags True → backend error (user must make separate calls)

:param entities: list of assets to save
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
:returns: AssetMutationResponse from the API call
"""
# Determine which flags to set based on semantics present
has_append_remove, has_replace = Save.get_semantic_flags(entities)

# Process entities with APPEND/REMOVE semantic to set classification fields
for entity in entities:
Save.process_asset_for_append_remove_semantic(entity)

# Validate and flush custom metadata
Save.validate_and_flush_entities(entities, self._client) # type: ignore[arg-type]

# Build query params based on semantics
# If user mixes APPEND/REMOVE and REPLACE, both flags will be True → backend error
query_params = {
"replaceTags": has_replace,
"appendTags": has_append_remove,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

request = BulkRequest[Asset](entities=entities)
raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
response = Save.process_response(raw_json)

# Handle connection waiting for any created connections
if connections_created := response.assets_created(Connection):
self._wait_for_connections_to_be_created(connections_created)

return response

def _wait_for_connections_to_be_created(self, connections_created):
guids = Save.get_connection_guids_to_wait_for(connections_created)

Expand Down
141 changes: 141 additions & 0 deletions pyatlan/client/common/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,147 @@ def process_response_replacing_cm(
"""
return AssetMutationResponse(**raw_json)

@staticmethod
def has_tags_with_semantic(entities: List[Asset]) -> bool:
"""
Check if any entity has atlan_tags with a semantic value set.

:param entities: list of assets to check
:returns: True if any entity has tags with semantic set
"""
for entity in entities:
if entity.atlan_tags:
for tag in entity.atlan_tags:
if tag.semantic is not None:
return True
return False

@staticmethod
def get_semantic_flags(entities: List[Asset]) -> tuple[bool, bool]:
"""
Determine which semantic flags should be set based on tags in entities.

:param entities: list of assets to check
:returns: tuple of (has_append_remove, has_replace)
"""
has_append_remove = False
has_replace = False

for entity in entities:
if not entity.atlan_tags:
continue
for tag in entity.atlan_tags:
if tag.semantic in (SaveSemantic.APPEND, SaveSemantic.REMOVE):
has_append_remove = True
elif tag.semantic == SaveSemantic.REPLACE:
has_replace = True

return has_append_remove, has_replace

@staticmethod
def process_asset_for_append_remove_semantic(entity: Asset) -> Asset:
"""
Process an asset with APPEND/REMOVE semantic tags.
Sets add_or_update_classifications for APPEND tags and
remove_classifications for REMOVE tags.
Keeps REPLACE and None semantic tags in atlan_tags.

:param entity: the asset to process
:returns: the processed asset
"""
if not entity.atlan_tags:
return entity

append_tags: List[AtlanTag] = []
remove_tags: List[AtlanTag] = []
remaining_tags: List[AtlanTag] = []

for tag in entity.atlan_tags:
if tag.semantic == SaveSemantic.APPEND:
append_tags.append(tag)
elif tag.semantic == SaveSemantic.REMOVE:
remove_tags.append(tag)
else:
# Keep REPLACE and None semantic tags in atlan_tags
remaining_tags.append(tag)

if append_tags:
entity.add_or_update_classifications = append_tags
if remove_tags:
entity.remove_classifications = remove_tags

# Keep remaining tags (REPLACE and None semantic) in atlan_tags
entity.atlan_tags = remaining_tags if remaining_tags else None

return entity

@staticmethod
def merge_responses(
responses: List[AssetMutationResponse],
) -> AssetMutationResponse:
"""
Merge multiple AssetMutationResponse objects into a single response.

:param responses: list of responses to merge
:returns: merged AssetMutationResponse
"""
from pyatlan.model.response import MutatedEntities

if not responses:
return AssetMutationResponse()

if len(responses) == 1:
return responses[0]

merged_guid_assignments: Dict[str, str] = {}
merged_created: List[Asset] = []
merged_updated: List[Asset] = []
merged_deleted: List[Asset] = []
merged_partial_updated: List[Asset] = []

for response in responses:
if response.guid_assignments:
merged_guid_assignments.update(response.guid_assignments)
if response.mutated_entities:
if response.mutated_entities.CREATE:
merged_created.extend(response.mutated_entities.CREATE)
if response.mutated_entities.UPDATE:
merged_updated.extend(response.mutated_entities.UPDATE)
if response.mutated_entities.DELETE:
merged_deleted.extend(response.mutated_entities.DELETE)
if response.mutated_entities.PARTIAL_UPDATE:
merged_partial_updated.extend(
response.mutated_entities.PARTIAL_UPDATE
)
if response.partial_updated_entities:
merged_partial_updated.extend(response.partial_updated_entities)

mutated_entities = MutatedEntities(
CREATE=merged_created if merged_created else None,
UPDATE=merged_updated if merged_updated else None,
DELETE=merged_deleted if merged_deleted else None,
)
# Set PARTIAL_UPDATE separately due to alias conflict in MutatedEntities
if merged_partial_updated:
mutated_entities.PARTIAL_UPDATE = merged_partial_updated

return AssetMutationResponse(
guid_assignments=merged_guid_assignments
if merged_guid_assignments
else None,
mutated_entities=mutated_entities
if (
merged_created
or merged_updated
or merged_deleted
or merged_partial_updated
)
else None,
partial_updated_entities=merged_partial_updated
if merged_partial_updated
else None,
)


class UpdateAsset:
@staticmethod
Expand Down
4 changes: 2 additions & 2 deletions pyatlan/model/aio/retranslators.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]:

# Convert classification human-readable name → hash ID
for key in self._CLASSIFICATION_NAMES:
if key in data:
if key in data and data[key] is not None:
tag_ids = []
for name in data[key]:
tag_id = await self.client.atlan_tag_cache.get_id_for_name(
Expand All @@ -85,7 +85,7 @@ async def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]:

# Convert classification objects human-readable name typeName → hash ID
for key in self._CLASSIFICATION_KEYS:
if key in data:
if key in data and data[key] is not None:
for classification in data[key]:
tag_name = str(classification.get(self._TYPE_NAME))
if tag_name:
Expand Down
4 changes: 2 additions & 2 deletions pyatlan/model/aio/translators.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def translate(self, data: Dict[str, Any]) -> Dict[str, Any]:

# Convert classification hash ID → human-readable name
for key in self._CLASSIFICATION_NAMES:
if key in raw_json:
if key in raw_json and raw_json[key] is not None:
tag_names = []
for tag_id in raw_json[key]:
tag_name = await self.client.atlan_tag_cache.get_name_for_id(tag_id)
Expand All @@ -86,7 +86,7 @@ async def translate(self, data: Dict[str, Any]) -> Dict[str, Any]:

# Convert classification objects typeName hash ID → human-readable name
for key in self._CLASSIFICATION_KEYS:
if key in raw_json:
if key in raw_json and raw_json[key] is not None:
for classification in raw_json[key]:
tag_id = classification.get(self._TYPE_NAME)
if tag_id:
Expand Down
Loading
Loading