diff --git a/pyatlan/client/aio/asset.py b/pyatlan/client/aio/asset.py index eb6c582a5..bef211ab0 100644 --- a/pyatlan/client/aio/asset.py +++ b/pyatlan/client/aio/asset.py @@ -343,6 +343,12 @@ async def save( """ Async save method - creates or updates assets based on qualified_name. + When using AtlanTag with semantic values: + - APPEND: adds/updates the tag using addOrUpdateClassifications + - REMOVE: removes the tag using removeClassifications + - REPLACE: replaces all tags on the asset + - None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags + :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update :param replace_custom_metadata: replaces any custom metadata with non-empty values provided @@ -352,7 +358,22 @@ async def save( :raises AtlanError: on any API communication issue :raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit """ + # Convert entity to list for consistent handling + entities: List[Asset] = [] + if isinstance(entity, list): + entities.extend(entity) + else: + entities.append(entity) + + # Check if any entity has tags with semantic + if Save.has_tags_with_semantic(entities): + return await self._save_with_tag_semantic( + entities=entities, + replace_custom_metadata=replace_custom_metadata, + overwrite_custom_metadata=overwrite_custom_metadata, + ) + # Use existing logic for backward compatibility query_params, request = await Save.prepare_request_async( entity=entity, replace_atlan_tags=replace_atlan_tags, @@ -367,6 +388,54 @@ async def save( await self._wait_for_connections_to_be_created(connections_created) return response + async def _save_with_tag_semantic( + self, + entities: List[Asset], + replace_custom_metadata: bool = False, + overwrite_custom_metadata: bool = False, + ) -> AssetMutationResponse: + """ + Internal async method to handle saving assets with tag semantic values. + Updates query params based on semantics and makes a single API call. + + If entities have APPEND/REMOVE semantic tags → appendTags=True + If entities have REPLACE semantic tags → replaceTags=True + If both are present → both flags True → backend error (user must make separate calls) + + :param entities: list of assets to save + :param replace_custom_metadata: replaces any custom metadata with non-empty values provided + :param overwrite_custom_metadata: overwrites any custom metadata, even with empty values + :returns: AssetMutationResponse from the API call + """ + # Determine which flags to set based on semantics present + has_append_remove, has_replace = Save.get_semantic_flags(entities) + + # Process entities with APPEND/REMOVE semantic to set classification fields + for entity in entities: + Save.process_asset_for_append_remove_semantic(entity) + + # Validate and flush custom metadata + await Save.validate_and_flush_entities_async(entities, self._client) # type: ignore[arg-type] + + # Build query params based on semantics + # If user mixes APPEND/REMOVE and REPLACE, both flags will be True → backend error + query_params = { + "replaceTags": has_replace, + "appendTags": has_append_remove, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + + request = BulkRequest[Asset](entities=entities) + raw_json = await self._client._call_api(BULK_UPDATE, query_params, request) + response = Save.process_response(raw_json) + + # Handle connection waiting for any created connections + if connections_created := response.assets_created(Connection): + await self._wait_for_connections_to_be_created(connections_created) + + return response + async def _wait_for_connections_to_be_created(self, connections_created): guids = Save.get_connection_guids_to_wait_for(connections_created) diff --git a/pyatlan/client/asset.py b/pyatlan/client/asset.py index cf7667e9a..e7cbf5f9d 100644 --- a/pyatlan/client/asset.py +++ b/pyatlan/client/asset.py @@ -99,7 +99,7 @@ Table, View, ) -from pyatlan.model.core import Announcement, AtlanObject, SearchRequest +from pyatlan.model.core import Announcement, AtlanObject, BulkRequest, SearchRequest from pyatlan.model.custom_metadata import CustomMetadataDict from pyatlan.model.enums import ( AssetCreationHandling, @@ -432,6 +432,12 @@ def save( If an asset does exist, opertionally overwrites any Atlan tags. Custom metadata will either be overwritten or merged depending on the options provided. + When using AtlanTag with semantic values: + - APPEND: adds/updates the tag using addOrUpdateClassifications + - REMOVE: removes the tag using removeClassifications + - REPLACE: replaces all tags on the asset + - None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags + :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :param replace_custom_metadata: replaces any custom metadata with non-empty values provided @@ -441,6 +447,22 @@ def save( :raises AtlanError: on any API communication issue :raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit """ + # Convert entity to list for consistent handling + entities: List[Asset] = [] + if isinstance(entity, list): + entities.extend(entity) + else: + entities.append(entity) + + # Check if any entity has tags with semantic + if Save.has_tags_with_semantic(entities): + return self._save_with_tag_semantic( + entities=entities, + replace_custom_metadata=replace_custom_metadata, + overwrite_custom_metadata=overwrite_custom_metadata, + ) + + # Use existing logic for backward compatibility query_params, request = Save.prepare_request( entity=entity, replace_atlan_tags=replace_atlan_tags, @@ -455,6 +477,54 @@ def save( self._wait_for_connections_to_be_created(connections_created) return response + def _save_with_tag_semantic( + self, + entities: List[Asset], + replace_custom_metadata: bool = False, + overwrite_custom_metadata: bool = False, + ) -> AssetMutationResponse: + """ + Internal method to handle saving assets with tag semantic values. + Updates query params based on semantics and makes a single API call. + + If entities have APPEND/REMOVE semantic tags → appendTags=True + If entities have REPLACE semantic tags → replaceTags=True + If both are present → both flags True → backend error (user must make separate calls) + + :param entities: list of assets to save + :param replace_custom_metadata: replaces any custom metadata with non-empty values provided + :param overwrite_custom_metadata: overwrites any custom metadata, even with empty values + :returns: AssetMutationResponse from the API call + """ + # Determine which flags to set based on semantics present + has_append_remove, has_replace = Save.get_semantic_flags(entities) + + # Process entities with APPEND/REMOVE semantic to set classification fields + for entity in entities: + Save.process_asset_for_append_remove_semantic(entity) + + # Validate and flush custom metadata + Save.validate_and_flush_entities(entities, self._client) # type: ignore[arg-type] + + # Build query params based on semantics + # If user mixes APPEND/REMOVE and REPLACE, both flags will be True → backend error + query_params = { + "replaceTags": has_replace, + "appendTags": has_append_remove, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + + request = BulkRequest[Asset](entities=entities) + raw_json = self._client._call_api(BULK_UPDATE, query_params, request) + response = Save.process_response(raw_json) + + # Handle connection waiting for any created connections + if connections_created := response.assets_created(Connection): + self._wait_for_connections_to_be_created(connections_created) + + return response + def _wait_for_connections_to_be_created(self, connections_created): guids = Save.get_connection_guids_to_wait_for(connections_created) diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index ca265a7e1..6ede77593 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -863,6 +863,147 @@ def process_response_replacing_cm( """ return AssetMutationResponse(**raw_json) + @staticmethod + def has_tags_with_semantic(entities: List[Asset]) -> bool: + """ + Check if any entity has atlan_tags with a semantic value set. + + :param entities: list of assets to check + :returns: True if any entity has tags with semantic set + """ + for entity in entities: + if entity.atlan_tags: + for tag in entity.atlan_tags: + if tag.semantic is not None: + return True + return False + + @staticmethod + def get_semantic_flags(entities: List[Asset]) -> tuple[bool, bool]: + """ + Determine which semantic flags should be set based on tags in entities. + + :param entities: list of assets to check + :returns: tuple of (has_append_remove, has_replace) + """ + has_append_remove = False + has_replace = False + + for entity in entities: + if not entity.atlan_tags: + continue + for tag in entity.atlan_tags: + if tag.semantic in (SaveSemantic.APPEND, SaveSemantic.REMOVE): + has_append_remove = True + elif tag.semantic == SaveSemantic.REPLACE: + has_replace = True + + return has_append_remove, has_replace + + @staticmethod + def process_asset_for_append_remove_semantic(entity: Asset) -> Asset: + """ + Process an asset with APPEND/REMOVE semantic tags. + Sets add_or_update_classifications for APPEND tags and + remove_classifications for REMOVE tags. + Keeps REPLACE and None semantic tags in atlan_tags. + + :param entity: the asset to process + :returns: the processed asset + """ + if not entity.atlan_tags: + return entity + + append_tags: List[AtlanTag] = [] + remove_tags: List[AtlanTag] = [] + remaining_tags: List[AtlanTag] = [] + + for tag in entity.atlan_tags: + if tag.semantic == SaveSemantic.APPEND: + append_tags.append(tag) + elif tag.semantic == SaveSemantic.REMOVE: + remove_tags.append(tag) + else: + # Keep REPLACE and None semantic tags in atlan_tags + remaining_tags.append(tag) + + if append_tags: + entity.add_or_update_classifications = append_tags + if remove_tags: + entity.remove_classifications = remove_tags + + # Keep remaining tags (REPLACE and None semantic) in atlan_tags + entity.atlan_tags = remaining_tags if remaining_tags else None + + return entity + + @staticmethod + def merge_responses( + responses: List[AssetMutationResponse], + ) -> AssetMutationResponse: + """ + Merge multiple AssetMutationResponse objects into a single response. + + :param responses: list of responses to merge + :returns: merged AssetMutationResponse + """ + from pyatlan.model.response import MutatedEntities + + if not responses: + return AssetMutationResponse() + + if len(responses) == 1: + return responses[0] + + merged_guid_assignments: Dict[str, str] = {} + merged_created: List[Asset] = [] + merged_updated: List[Asset] = [] + merged_deleted: List[Asset] = [] + merged_partial_updated: List[Asset] = [] + + for response in responses: + if response.guid_assignments: + merged_guid_assignments.update(response.guid_assignments) + if response.mutated_entities: + if response.mutated_entities.CREATE: + merged_created.extend(response.mutated_entities.CREATE) + if response.mutated_entities.UPDATE: + merged_updated.extend(response.mutated_entities.UPDATE) + if response.mutated_entities.DELETE: + merged_deleted.extend(response.mutated_entities.DELETE) + if response.mutated_entities.PARTIAL_UPDATE: + merged_partial_updated.extend( + response.mutated_entities.PARTIAL_UPDATE + ) + if response.partial_updated_entities: + merged_partial_updated.extend(response.partial_updated_entities) + + mutated_entities = MutatedEntities( + CREATE=merged_created if merged_created else None, + UPDATE=merged_updated if merged_updated else None, + DELETE=merged_deleted if merged_deleted else None, + ) + # Set PARTIAL_UPDATE separately due to alias conflict in MutatedEntities + if merged_partial_updated: + mutated_entities.PARTIAL_UPDATE = merged_partial_updated + + return AssetMutationResponse( + guid_assignments=merged_guid_assignments + if merged_guid_assignments + else None, + mutated_entities=mutated_entities + if ( + merged_created + or merged_updated + or merged_deleted + or merged_partial_updated + ) + else None, + partial_updated_entities=merged_partial_updated + if merged_partial_updated + else None, + ) + class UpdateAsset: @staticmethod diff --git a/pyatlan/model/aio/retranslators.py b/pyatlan/model/aio/retranslators.py index 40c4ed464..c314a6791 100644 --- a/pyatlan/model/aio/retranslators.py +++ b/pyatlan/model/aio/retranslators.py @@ -74,7 +74,7 @@ async def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification human-readable name → hash ID for key in self._CLASSIFICATION_NAMES: - if key in data: + if key in data and data[key] is not None: tag_ids = [] for name in data[key]: tag_id = await self.client.atlan_tag_cache.get_id_for_name( @@ -85,7 +85,7 @@ async def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification objects human-readable name typeName → hash ID for key in self._CLASSIFICATION_KEYS: - if key in data: + if key in data and data[key] is not None: for classification in data[key]: tag_name = str(classification.get(self._TYPE_NAME)) if tag_name: diff --git a/pyatlan/model/aio/translators.py b/pyatlan/model/aio/translators.py index 3b4ea4b68..cf576d5ae 100644 --- a/pyatlan/model/aio/translators.py +++ b/pyatlan/model/aio/translators.py @@ -77,7 +77,7 @@ async def translate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification hash ID → human-readable name for key in self._CLASSIFICATION_NAMES: - if key in raw_json: + if key in raw_json and raw_json[key] is not None: tag_names = [] for tag_id in raw_json[key]: tag_name = await self.client.atlan_tag_cache.get_name_for_id(tag_id) @@ -86,7 +86,7 @@ async def translate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification objects typeName hash ID → human-readable name for key in self._CLASSIFICATION_KEYS: - if key in raw_json: + if key in raw_json and raw_json[key] is not None: for classification in raw_json[key]: tag_id = classification.get(self._TYPE_NAME) if tag_id: diff --git a/pyatlan/model/core.py b/pyatlan/model/core.py index 0a4b89da0..7adb38cda 100644 --- a/pyatlan/model/core.py +++ b/pyatlan/model/core.py @@ -326,6 +326,15 @@ class Config: attributes: Optional[Dict[str, Any]] = None tag_id: Optional[str] = Field(default=None, exclude=True) + semantic: Optional[SaveSemantic] = Field( + default=None, + exclude=True, + description=( + "Semantic for how this Atlan tag should be saved. " + "Use APPEND to add/update tags, REMOVE to remove tags, " + "or REPLACE to replace all tags on the asset." + ), + ) @classmethod def of( @@ -334,6 +343,7 @@ def of( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Construct an Atlan tag assignment for a specific entity. @@ -342,6 +352,7 @@ def of( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -358,6 +369,8 @@ def of( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic: + tag.semantic = semantic return tag @classmethod @@ -367,6 +380,7 @@ async def of_async( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AsyncAtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Async version of AtlanTag.of() for use with AsyncAtlanClient. @@ -377,6 +391,7 @@ async def of_async( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) async client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -393,6 +408,8 @@ async def of_async( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic: + tag.semantic = semantic return tag diff --git a/pyatlan/model/retranslators.py b/pyatlan/model/retranslators.py index f7787b89e..4fcc4ad14 100644 --- a/pyatlan/model/retranslators.py +++ b/pyatlan/model/retranslators.py @@ -70,7 +70,7 @@ def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification human-readable name → hash ID for key in self._CLASSIFICATION_NAMES: - if key in data: + if key in data and data[key] is not None: data[key] = [ self.client.atlan_tag_cache.get_id_for_name(str(name)) or DELETED_ for name in data[key] @@ -78,7 +78,7 @@ def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification objects human-readable name typeName → hash ID for key in self._CLASSIFICATION_KEYS: - if key in data: + if key in data and data[key] is not None: for classification in data[key]: tag_name = str(classification.get(self._TYPE_NAME)) if tag_name: diff --git a/pyatlan/model/translators.py b/pyatlan/model/translators.py index 1583b25bd..d17975dfb 100644 --- a/pyatlan/model/translators.py +++ b/pyatlan/model/translators.py @@ -74,7 +74,7 @@ def translate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification hash ID → human-readable name for key in self._CLASSIFICATION_NAMES: - if key in raw_json: + if key in raw_json and raw_json[key] is not None: raw_json[key] = [ self.client.atlan_tag_cache.get_name_for_id(tag_id) or DELETED_ for tag_id in raw_json[key] @@ -82,7 +82,7 @@ def translate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification objects typeName hash ID → human-readable name for key in self._CLASSIFICATION_KEYS: - if key in raw_json: + if key in raw_json and raw_json[key] is not None: for classification in raw_json[key]: tag_id = classification.get(self._TYPE_NAME) if tag_id: diff --git a/tests/integration/aio/test_atlan_tag_semantic.py b/tests/integration/aio/test_atlan_tag_semantic.py new file mode 100644 index 000000000..812ce4a1a --- /dev/null +++ b/tests/integration/aio/test_atlan_tag_semantic.py @@ -0,0 +1,532 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. +import asyncio +import logging +from typing import AsyncGenerator, List, NamedTuple, Set + +import pytest +import pytest_asyncio + +from pyatlan.client.aio import AsyncAtlanClient +from pyatlan.errors import InvalidRequestError +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic +from pyatlan.model.fluent_search import FluentSearch + +# Table names to search for (in production Snowflake connection) +TABLE1_NAME = "BUYINGGROUPS" +TABLE2_NAME = "CITIES_ARCHIVE" + +# Tag names to use in tests +TAG_ISSUE = "Issue" +TAG_CONFIDENTIAL = "Confidential" + +LOGGER = logging.getLogger(__name__) + + +class TableInfo(NamedTuple): + """Container for table information.""" + + name: str + qualified_name: str + + +async def find_table_by_name_async( + client: AsyncAtlanClient, table_name: str, connector_type: str = "snowflake" +) -> TableInfo: + """ + Find a table by name using FluentSearch, filtering by connector type. + + :param client: AsyncAtlanClient instance + :param table_name: name of the table to find + :param connector_type: connector type to filter by (default: snowflake) + :returns: TableInfo with name and qualified_name + :raises ValueError: if table not found + """ + results = await ( + FluentSearch() + .where(FluentSearch.asset_type(Table)) + .where(Table.NAME.eq(table_name)) + .where(Table.CONNECTOR_NAME.eq(connector_type)) + .include_on_results(Table.NAME) + .include_on_results(Table.QUALIFIED_NAME) + .page_size(10) + ).execute_async(client) + + tables = [asset async for asset in results] + if not tables: + raise ValueError( + f"Table '{table_name}' not found in tenant " + f"(connector_type={connector_type})" + ) + + # If multiple tables found, log them and use the first one + if len(tables) > 1: + LOGGER.warning( + f"Multiple {connector_type} tables found with name '{table_name}'. " + f"Using first one. All qualified names: " + f"{[t.qualified_name for t in tables]}" + ) + + table = tables[0] + if not table.qualified_name: + raise ValueError(f"Table '{table_name}' has no qualified_name") + + LOGGER.info( + f"Found {connector_type} table '{table_name}' " + f"with qualified_name: {table.qualified_name}" + ) + return TableInfo(name=table_name, qualified_name=table.qualified_name) + + +async def get_current_tags_async( + client: AsyncAtlanClient, qualified_name: str +) -> Set[str]: + """Helper to retrieve current tags on an asset.""" + asset = await client.asset.get_by_qualified_name( + qualified_name=qualified_name, + asset_type=Table, + ) + return {str(t.type_name) for t in (asset.atlan_tags or [])} + + +async def remove_tags_from_asset_async( + client: AsyncAtlanClient, qualified_name: str, name: str, tag_names: List[str] +) -> None: + """Helper to remove specific tags from an asset.""" + if not tag_names: + return + table = Table.updater(qualified_name=qualified_name, name=name) + table.atlan_tags = [ + AtlanTag.of(atlan_tag_name=AtlanTagName(tag), semantic=SaveSemantic.REMOVE) + for tag in tag_names + ] + try: + await client.asset.save(entity=table) + except Exception as e: + LOGGER.debug(f"Tag removal (may be expected if tag not present): {e}") + + +async def remove_all_tags_from_asset_async( + client: AsyncAtlanClient, qualified_name: str, name: str +) -> None: + """Helper to remove ALL tags from an asset (complete cleanup).""" + try: + current_tags = await get_current_tags_async(client, qualified_name) + if current_tags: + LOGGER.info(f"Removing all tags from {name}: {current_tags}") + await remove_tags_from_asset_async( + client, qualified_name, name, list(current_tags) + ) + except Exception as e: + LOGGER.debug(f"Complete tag removal (may be expected): {e}") + + +@pytest_asyncio.fixture(scope="module") +async def table1(client: AsyncAtlanClient) -> TableInfo: + """Fixture to find TABLE1 (BUYINGGROUPS) dynamically.""" + return await find_table_by_name_async(client, TABLE1_NAME) + + +@pytest_asyncio.fixture(scope="module") +async def table2(client: AsyncAtlanClient) -> TableInfo: + """Fixture to find TABLE2 (CITIES_ARCHIVE) dynamically.""" + return await find_table_by_name_async(client, TABLE2_NAME) + + +@pytest_asyncio.fixture(scope="module", autouse=True) +async def complete_cleanup_before_tests( + client: AsyncAtlanClient, table1: TableInfo, table2: TableInfo +) -> AsyncGenerator[None, None]: + """ + Module-scoped fixture to do a COMPLETE cleanup of ALL tags before any tests run. + This ensures a clean slate at the start of the test module. + """ + LOGGER.info("Performing complete cleanup of ALL tags before tests...") + + # Remove ALL tags from both tables + for table_info in [table1, table2]: + await remove_all_tags_from_asset_async( + client, + table_info.qualified_name, + table_info.name, + ) + await asyncio.sleep(3) + + # Verify cleanup was successful + for table_info in [table1, table2]: + tags = await get_current_tags_async(client, table_info.qualified_name) + LOGGER.info(f"After complete cleanup - {table_info.name} tags: {tags}") + + yield + + # Final cleanup after all tests + LOGGER.info("Final cleanup after all tests...") + for table_info in [table1, table2]: + await remove_all_tags_from_asset_async( + client, + table_info.qualified_name, + table_info.name, + ) + + +@pytest_asyncio.fixture(autouse=True) +async def cleanup_tags( + client: AsyncAtlanClient, table1: TableInfo, table2: TableInfo +) -> AsyncGenerator[None, None]: + """ + Fixture to ensure test tags are removed before and after each test. + This maintains a clean state for testing. + """ + # Cleanup before test - remove our test tags + for table_info in [table1, table2]: + await remove_tags_from_asset_async( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + await asyncio.sleep(2) + + yield + + # Cleanup after test - remove our test tags + for table_info in [table1, table2]: + await remove_tags_from_asset_async( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + + +@pytest.mark.order(1) +async def test_append_single_tag( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add a single tag. + Expected: Tag is added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + + response = await client.asset.save(entity=table) + await asyncio.sleep(3) + + # Verify response and tags + assert response is not None + assert response.mutated_entities is not None + tags = await get_current_tags_async(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"APPEND single tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_single_tag") +async def test_append_multiple_tags( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add multiple tags in one request. + Expected: All tags are added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + + response = await client.asset.save(entity=table) + await asyncio.sleep(3) + + # Verify response and tags + assert response is not None + tags = await get_current_tags_async(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL in tags + LOGGER.info(f"APPEND multiple tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_multiple_tags") +async def test_remove_tag_after_append( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test REMOVE semantic - Add a tag, then remove it. + Expected: Tag is removed from the asset. + """ + # First add a tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + await client.asset.save(entity=table) + await asyncio.sleep(3) + + # Verify tag was added + tags_after_add = await get_current_tags_async(client, table1.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Remove the tag + table_remove = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_remove.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REMOVE, + ), + ] + response = await client.asset.save(entity=table_remove) + await asyncio.sleep(3) + + # Verify response and tags + assert response is not None + tags = await get_current_tags_async(client, table1.qualified_name) + assert TAG_ISSUE not in tags + LOGGER.info(f"REMOVE tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_remove_tag_after_append") +async def test_replace_all_tags( + client: AsyncAtlanClient, + table2: TableInfo, +) -> None: + """ + Test REPLACE semantic - Replace all tags with a new one. + Expected: Only the new tag remains on the asset. + """ + # First add Issue tag + table = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + await client.asset.save(entity=table) + await asyncio.sleep(3) + + # Verify tag was added + tags_after_add = await get_current_tags_async(client, table2.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Replace with Confidential + table_replace = Table.updater( + qualified_name=table2.qualified_name, name=table2.name + ) + table_replace.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + ] + response = await client.asset.save(entity=table_replace) + await asyncio.sleep(3) + + # Verify response and tags + assert response is not None + tags = await get_current_tags_async(client, table2.qualified_name) + assert TAG_CONFIDENTIAL in tags + assert TAG_ISSUE not in tags + LOGGER.info(f"REPLACE tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_replace_all_tags") +async def test_append_and_remove_combined( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test mixed semantics - APPEND and REMOVE in same request. + Expected: Issue is added, Confidential is removed. + """ + # First add Confidential tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + await client.asset.save(entity=table) + await asyncio.sleep(3) + + # Verify setup + tags_after_add = await get_current_tags_async(client, table1.qualified_name) + assert TAG_CONFIDENTIAL in tags_after_add, ( + "Setup failed: Confidential tag not added" + ) + + # APPEND Issue and REMOVE Confidential in same request + table_mixed = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_mixed.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REMOVE, + ), + ] + response = await client.asset.save(entity=table_mixed) + await asyncio.sleep(3) + + # Verify response and tags + assert response is not None + tags = await get_current_tags_async(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL not in tags + LOGGER.info(f"Mixed APPEND/REMOVE test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_and_remove_combined") +async def test_bulk_save_with_mixed_semantics_raises_error( + client: AsyncAtlanClient, + table1: TableInfo, + table2: TableInfo, +) -> None: + """ + Test bulk save with APPEND/REMOVE and REPLACE semantics across different assets raises error. + - Table1: APPEND/REMOVE semantics + - Table2: REPLACE semantic + Expected: InvalidRequestError because SDK sets both appendTags and replaceTags to True. + + Note: To save entities with different semantics, use separate save() calls. + """ + t1 = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + t1.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + + t2 = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + t2.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REPLACE, + ), + ] + + # This should raise an error because mixing APPEND/REMOVE and REPLACE + # in the same save() call sets both appendTags=True and replaceTags=True + with pytest.raises(InvalidRequestError) as exc_info: + await client.asset.save(entity=[t1, t2]) + + assert "Only one of" in str(exc_info.value) or "replaceTags" in str(exc_info.value) + LOGGER.info( + f"Expected error raised for bulk save with mixed semantics: {exc_info.value}" + ) + + +@pytest.mark.order(after="test_bulk_save_with_mixed_semantics_raises_error") +async def test_mixed_semantics_on_same_asset_raises_error( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND and REPLACE semantics on the same asset raises an error. + SDK sets both appendTags=True and replaceTags=True which backend rejects. + Expected: InvalidRequestError from server. + """ + # Apply both APPEND and REPLACE on the same asset + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + ] + + # This WILL raise "Only one of [replaceClassifications, replaceTags, appendTags]" error + # because SDK sets both appendTags=True and replaceTags=True + with pytest.raises(InvalidRequestError) as exc_info: + await client.asset.save(entity=table) + + assert "Only one of" in str(exc_info.value) or "replaceTags" in str(exc_info.value) + LOGGER.info( + f"Expected error raised for mixed APPEND+REPLACE on same asset: {exc_info.value}" + ) + + +@pytest.mark.order(after="test_mixed_semantics_on_same_asset_raises_error") +async def test_all_three_semantics_raises_error( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test that mixing all three semantics (APPEND, REPLACE, None) raises an error. + SDK sets both appendTags=True and replaceTags=True which backend rejects. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + # No semantic + ), + ] + + # This should raise an error because both appendTags and replaceTags will be True + with pytest.raises(InvalidRequestError) as exc_info: + await client.asset.save(entity=table) + + assert "Only one of" in str(exc_info.value) or "replaceTags" in str(exc_info.value) + LOGGER.info(f"Expected error raised for mixed semantics: {exc_info.value}") + + +@pytest.mark.order(after="test_all_three_semantics_raises_error") +async def test_backward_compatibility_no_semantic( + client: AsyncAtlanClient, + table1: TableInfo, +) -> None: + """ + Test backward compatibility - tags with no semantic (None) work as before. + Expected: Tag is added using existing behavior. + """ + # Use AtlanTag directly without semantic (backward compatible) + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE) + ), # No semantic - backward compatible + ] + + response = await client.asset.save(entity=table, replace_atlan_tags=True) + await asyncio.sleep(3) + + # Verify response and tags + assert response is not None + tags = await get_current_tags_async(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"Backward compatibility test passed. Tags: {tags}") diff --git a/tests/integration/test_atlan_tag_semantic.py b/tests/integration/test_atlan_tag_semantic.py new file mode 100644 index 000000000..b2a9e4562 --- /dev/null +++ b/tests/integration/test_atlan_tag_semantic.py @@ -0,0 +1,527 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. +import logging +import time +from typing import Generator, List, NamedTuple, Set + +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.errors import InvalidRequestError +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic +from pyatlan.model.fluent_search import FluentSearch + +# Table names to search for (in production Snowflake connection) +TABLE1_NAME = "BUYINGGROUPS" +TABLE2_NAME = "CITIES_ARCHIVE" + +# Tag names to use in tests +TAG_ISSUE = "Issue" +TAG_CONFIDENTIAL = "Confidential" + +LOGGER = logging.getLogger(__name__) + + +class TableInfo(NamedTuple): + """Container for table information.""" + + name: str + qualified_name: str + + +def find_table_by_name( + client: AtlanClient, table_name: str, connector_type: str = "snowflake" +) -> TableInfo: + """ + Find a table by name using FluentSearch, filtering by connector type. + + :param client: AtlanClient instance + :param table_name: name of the table to find + :param connector_type: connector type to filter by (default: snowflake) + :returns: TableInfo with name and qualified_name + :raises ValueError: if table not found + """ + results = ( + FluentSearch() + .where(FluentSearch.asset_type(Table)) + .where(Table.NAME.eq(table_name)) + .where(Table.CONNECTOR_NAME.eq(connector_type)) + .include_on_results(Table.NAME) + .include_on_results(Table.QUALIFIED_NAME) + .page_size(10) + ).execute(client) + + tables = list(results) + if not tables: + raise ValueError( + f"Table '{table_name}' not found in tenant " + f"(connector_type={connector_type})" + ) + + # If multiple tables found, log them and use the first one + if len(tables) > 1: + LOGGER.warning( + f"Multiple {connector_type} tables found with name '{table_name}'. " + f"Using first one. All qualified names: " + f"{[t.qualified_name for t in tables]}" + ) + + table = tables[0] + if not table.qualified_name: + raise ValueError(f"Table '{table_name}' has no qualified_name") + + LOGGER.info( + f"Found {connector_type} table '{table_name}' " + f"with qualified_name: {table.qualified_name}" + ) + return TableInfo(name=table_name, qualified_name=table.qualified_name) + + +def get_current_tags(client: AtlanClient, qualified_name: str) -> Set[str]: + """Helper to retrieve current tags on an asset.""" + asset = client.asset.get_by_qualified_name( + qualified_name=qualified_name, + asset_type=Table, + ) + return {str(t.type_name) for t in (asset.atlan_tags or [])} + + +def remove_tags_from_asset( + client: AtlanClient, qualified_name: str, name: str, tag_names: List[str] +) -> None: + """Helper to remove specific tags from an asset.""" + if not tag_names: + return + table = Table.updater(qualified_name=qualified_name, name=name) + table.atlan_tags = [ + AtlanTag.of(atlan_tag_name=AtlanTagName(tag), semantic=SaveSemantic.REMOVE) + for tag in tag_names + ] + try: + client.asset.save(entity=table) + except Exception as e: + LOGGER.debug(f"Tag removal (may be expected if tag not present): {e}") + + +def remove_all_tags_from_asset( + client: AtlanClient, qualified_name: str, name: str +) -> None: + """Helper to remove ALL tags from an asset (complete cleanup).""" + try: + current_tags = get_current_tags(client, qualified_name) + if current_tags: + LOGGER.info(f"Removing all tags from {name}: {current_tags}") + remove_tags_from_asset(client, qualified_name, name, list(current_tags)) + except Exception as e: + LOGGER.debug(f"Complete tag removal (may be expected): {e}") + + +@pytest.fixture(scope="module") +def table1(client: AtlanClient) -> TableInfo: + """Fixture to find TABLE1 (BUYINGGROUPS) dynamically.""" + return find_table_by_name(client, TABLE1_NAME) + + +@pytest.fixture(scope="module") +def table2(client: AtlanClient) -> TableInfo: + """Fixture to find TABLE2 (CITIES_ARCHIVE) dynamically.""" + return find_table_by_name(client, TABLE2_NAME) + + +@pytest.fixture(scope="module", autouse=True) +def complete_cleanup_before_tests( + client: AtlanClient, table1: TableInfo, table2: TableInfo +) -> Generator[None, None, None]: + """ + Module-scoped fixture to do a COMPLETE cleanup of ALL tags before any tests run. + This ensures a clean slate at the start of the test module. + """ + LOGGER.info("Performing complete cleanup of ALL tags before tests...") + + # Remove ALL tags from both tables + for table_info in [table1, table2]: + remove_all_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + ) + time.sleep(3) + + # Verify cleanup was successful + for table_info in [table1, table2]: + tags = get_current_tags(client, table_info.qualified_name) + LOGGER.info(f"After complete cleanup - {table_info.name} tags: {tags}") + + yield + + # Final cleanup after all tests + LOGGER.info("Final cleanup after all tests...") + for table_info in [table1, table2]: + remove_all_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + ) + + +@pytest.fixture(autouse=True) +def cleanup_tags( + client: AtlanClient, table1: TableInfo, table2: TableInfo +) -> Generator[None, None, None]: + """ + Fixture to ensure test tags are removed before and after each test. + This maintains a clean state for testing. + """ + # Cleanup before test - remove our test tags + for table_info in [table1, table2]: + remove_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + time.sleep(2) + + yield + + # Cleanup after test - remove our test tags + for table_info in [table1, table2]: + remove_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + + +@pytest.mark.order(1) +def test_append_single_tag( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add a single tag. + Expected: Tag is added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + + response = client.asset.save(entity=table) + time.sleep(3) + + # Verify response and tags + assert response is not None + assert response.mutated_entities is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"APPEND single tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_single_tag") +def test_append_multiple_tags( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add multiple tags in one request. + Expected: All tags are added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + + response = client.asset.save(entity=table) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL in tags + LOGGER.info(f"APPEND multiple tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_multiple_tags") +def test_remove_tag_after_append( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test REMOVE semantic - Add a tag, then remove it. + Expected: Tag is removed from the asset. + """ + # First add a tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify tag was added + tags_after_add = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Remove the tag + table_remove = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_remove.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REMOVE, + ), + ] + response = client.asset.save(entity=table_remove) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE not in tags + LOGGER.info(f"REMOVE tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_remove_tag_after_append") +def test_replace_all_tags( + client: AtlanClient, + table2: TableInfo, +) -> None: + """ + Test REPLACE semantic - Replace all tags with a new one. + Expected: Only the new tag remains on the asset. + """ + # First add Issue tag + table = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify tag was added + tags_after_add = get_current_tags(client, table2.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Replace with Confidential + table_replace = Table.updater( + qualified_name=table2.qualified_name, name=table2.name + ) + table_replace.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + ] + response = client.asset.save(entity=table_replace) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table2.qualified_name) + assert TAG_CONFIDENTIAL in tags + assert TAG_ISSUE not in tags + LOGGER.info(f"REPLACE tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_replace_all_tags") +def test_append_and_remove_combined( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test mixed semantics - APPEND and REMOVE in same request. + Expected: Issue is added, Confidential is removed. + """ + # First add Confidential tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify setup + tags_after_add = get_current_tags(client, table1.qualified_name) + assert TAG_CONFIDENTIAL in tags_after_add, ( + "Setup failed: Confidential tag not added" + ) + + # APPEND Issue and REMOVE Confidential in same request + table_mixed = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_mixed.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REMOVE, + ), + ] + response = client.asset.save(entity=table_mixed) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL not in tags + LOGGER.info(f"Mixed APPEND/REMOVE test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_and_remove_combined") +def test_bulk_save_with_mixed_semantics_raises_error( + client: AtlanClient, + table1: TableInfo, + table2: TableInfo, +) -> None: + """ + Test bulk save with APPEND/REMOVE and REPLACE semantics across different assets raises error. + - Table1: APPEND/REMOVE semantics + - Table2: REPLACE semantic + Expected: InvalidRequestError because SDK sets both appendTags and replaceTags to True. + + Note: To save entities with different semantics, use separate save() calls. + """ + t1 = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + t1.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + + t2 = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + t2.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REPLACE, + ), + ] + + # This should raise an error because mixing APPEND/REMOVE and REPLACE + # in the same save() call sets both appendTags=True and replaceTags=True + with pytest.raises(InvalidRequestError) as exc_info: + client.asset.save(entity=[t1, t2]) + + assert "Only one of" in str(exc_info.value) or "replaceTags" in str(exc_info.value) + LOGGER.info( + f"Expected error raised for bulk save with mixed semantics: {exc_info.value}" + ) + + +@pytest.mark.order(after="test_bulk_save_with_mixed_semantics_raises_error") +def test_mixed_semantics_on_same_asset_raises_error( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND and REPLACE semantics on the same asset raises an error. + SDK sets both appendTags=True and replaceTags=True which backend rejects. + Expected: InvalidRequestError from server. + """ + # Apply both APPEND and REPLACE on the same asset + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + ] + + # This WILL raise "Only one of [replaceClassifications, replaceTags, appendTags]" error + # because SDK sets both appendTags=True and replaceTags=True + with pytest.raises(InvalidRequestError) as exc_info: + client.asset.save(entity=table) + + assert "Only one of" in str(exc_info.value) or "replaceTags" in str(exc_info.value) + LOGGER.info( + f"Expected error raised for mixed APPEND+REPLACE on same asset: {exc_info.value}" + ) + + +@pytest.mark.order(after="test_mixed_semantics_on_same_asset_raises_error") +def test_all_three_semantics_raises_error( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test that mixing all three semantics (APPEND, REPLACE, None) raises an error. + SDK sets both appendTags=True and replaceTags=True which backend rejects. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + # No semantic + ), + ] + + # This should raise an error because both appendTags and replaceTags will be True + with pytest.raises(InvalidRequestError) as exc_info: + client.asset.save(entity=table) + + assert "Only one of" in str(exc_info.value) or "replaceTags" in str(exc_info.value) + LOGGER.info(f"Expected error raised for mixed semantics: {exc_info.value}") + + +@pytest.mark.order(after="test_all_three_semantics_raises_error") +def test_backward_compatibility_no_semantic( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test backward compatibility - tags with no semantic (None) work as before. + Expected: Tag is added using existing behavior. + """ + # Use AtlanTag directly without semantic (backward compatible) + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE) + ), # No semantic - backward compatible + ] + + response = client.asset.save(entity=table, replace_atlan_tags=True) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"Backward compatibility test passed. Tags: {tags}") diff --git a/tests/integration/test_save_semantic.py b/tests/integration/test_save_semantic.py new file mode 100644 index 000000000..12d16eb31 --- /dev/null +++ b/tests/integration/test_save_semantic.py @@ -0,0 +1,497 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. + +""" +Integration tests for SaveSemantic feature for in-bulk asset tag management. + +These tests verify APPEND, REMOVE, and REPLACE semantics work correctly +for managing Atlan tags on assets. +""" + +import logging +import time +from typing import Generator, List, NamedTuple, Set + +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic +from pyatlan.model.fluent_search import FluentSearch + +# Table names to search for (in production Snowflake connection) +TABLE1_NAME = "BUYINGGROUPS" +TABLE2_NAME = "CITIES_ARCHIVE" + +# Tag names to use in tests +TAG_ISSUE = "Issue" +TAG_CONFIDENTIAL = "Confidential" + +LOGGER = logging.getLogger(__name__) + + +class TableInfo(NamedTuple): + """Container for table information.""" + + name: str + qualified_name: str + + +def find_table_by_name( + client: AtlanClient, table_name: str, connector_type: str = "snowflake" +) -> TableInfo: + """ + Find a table by name using FluentSearch, filtering by connector type. + + :param client: AtlanClient instance + :param table_name: name of the table to find + :param connector_type: connector type to filter by (default: snowflake) + :returns: TableInfo with name and qualified_name + :raises ValueError: if table not found + """ + results = ( + FluentSearch() + .where(FluentSearch.asset_type(Table)) + .where(Table.NAME.eq(table_name)) + .where(Table.QUALIFIED_NAME.wildcard(f"*/{connector_type}/*")) + .include_on_results(Table.NAME) + .include_on_results(Table.QUALIFIED_NAME) + .page_size(10) + ).execute(client) + + tables = list(results) + if not tables: + raise ValueError( + f"Table '{table_name}' not found in tenant " + f"(connector_type={connector_type})" + ) + + # If multiple tables found, log them and use the first one + if len(tables) > 1: + LOGGER.warning( + f"Multiple {connector_type} tables found with name '{table_name}'. " + f"Using first one. All qualified names: " + f"{[t.qualified_name for t in tables]}" + ) + + table = tables[0] + if not table.qualified_name: + raise ValueError(f"Table '{table_name}' has no qualified_name") + + LOGGER.info( + f"Found {connector_type} table '{table_name}' " + f"with qualified_name: {table.qualified_name}" + ) + return TableInfo(name=table_name, qualified_name=table.qualified_name) + + +def get_current_tags(client: AtlanClient, qualified_name: str) -> Set[str]: + """Helper to retrieve current tags on an asset.""" + asset = client.asset.get_by_qualified_name( + qualified_name=qualified_name, + asset_type=Table, + ) + return {str(t.type_name) for t in (asset.atlan_tags or [])} + + +def remove_tags_from_asset( + client: AtlanClient, qualified_name: str, name: str, tag_names: List[str] +) -> None: + """Helper to remove specific tags from an asset.""" + if not tag_names: + return + table = Table.updater(qualified_name=qualified_name, name=name) + table.atlan_tags = [ + AtlanTag.of(atlan_tag_name=AtlanTagName(tag), semantic=SaveSemantic.REMOVE) + for tag in tag_names + ] + try: + client.asset.save(entity=table) + except Exception as e: + LOGGER.debug(f"Tag removal (may be expected if tag not present): {e}") + + +def remove_all_tags_from_asset( + client: AtlanClient, qualified_name: str, name: str +) -> None: + """Helper to remove ALL tags from an asset (complete cleanup).""" + try: + current_tags = get_current_tags(client, qualified_name) + if current_tags: + LOGGER.info(f"Removing all tags from {name}: {current_tags}") + remove_tags_from_asset(client, qualified_name, name, list(current_tags)) + except Exception as e: + LOGGER.debug(f"Complete tag removal (may be expected): {e}") + + +@pytest.fixture(scope="module") +def table1(client: AtlanClient) -> TableInfo: + """Fixture to find TABLE1 (BUYINGGROUPS) dynamically.""" + return find_table_by_name(client, TABLE1_NAME) + + +@pytest.fixture(scope="module") +def table2(client: AtlanClient) -> TableInfo: + """Fixture to find TABLE2 (CITIES_ARCHIVE) dynamically.""" + return find_table_by_name(client, TABLE2_NAME) + + +@pytest.fixture(scope="module", autouse=True) +def complete_cleanup_before_tests( + client: AtlanClient, table1: TableInfo, table2: TableInfo +) -> Generator[None, None, None]: + """ + Module-scoped fixture to do a COMPLETE cleanup of ALL tags before any tests run. + This ensures a clean slate at the start of the test module. + """ + LOGGER.info("Performing complete cleanup of ALL tags before tests...") + + # Remove ALL tags from both tables + for table_info in [table1, table2]: + remove_all_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + ) + time.sleep(3) + + # Verify cleanup was successful + for table_info in [table1, table2]: + tags = get_current_tags(client, table_info.qualified_name) + LOGGER.info(f"After complete cleanup - {table_info.name} tags: {tags}") + + yield + + # Final cleanup after all tests + LOGGER.info("Final cleanup after all tests...") + for table_info in [table1, table2]: + remove_all_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + ) + + +@pytest.fixture(autouse=True) +def cleanup_tags( + client: AtlanClient, table1: TableInfo, table2: TableInfo +) -> Generator[None, None, None]: + """ + Fixture to ensure test tags are removed before and after each test. + This maintains a clean state for testing. + """ + # Cleanup before test - remove our test tags + for table_info in [table1, table2]: + remove_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + time.sleep(2) + + yield + + # Cleanup after test - remove our test tags + for table_info in [table1, table2]: + remove_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + + +@pytest.mark.order(1) +def test_append_single_tag( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add a single tag. + Expected: Tag is added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + + response = client.asset.save(entity=table) + time.sleep(3) + + # Verify response and tags + assert response is not None + assert response.mutated_entities is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"APPEND single tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_single_tag") +def test_append_multiple_tags( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add multiple tags in one request. + Expected: All tags are added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + + response = client.asset.save(entity=table) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL in tags + LOGGER.info(f"APPEND multiple tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_multiple_tags") +def test_remove_tag_after_append( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test REMOVE semantic - Add a tag, then remove it. + Expected: Tag is removed from the asset. + """ + # First add a tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify tag was added + tags_after_add = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Remove the tag + table_remove = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_remove.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REMOVE, + ), + ] + response = client.asset.save(entity=table_remove) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE not in tags + LOGGER.info(f"REMOVE tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_remove_tag_after_append") +def test_replace_all_tags( + client: AtlanClient, + table2: TableInfo, +) -> None: + """ + Test REPLACE semantic - Replace all tags with a new one. + Expected: Only the new tag remains on the asset. + """ + # First add Issue tag + table = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify tag was added + tags_after_add = get_current_tags(client, table2.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Replace with Confidential + table_replace = Table.updater( + qualified_name=table2.qualified_name, name=table2.name + ) + table_replace.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + ] + response = client.asset.save(entity=table_replace) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table2.qualified_name) + assert TAG_CONFIDENTIAL in tags + assert TAG_ISSUE not in tags + LOGGER.info(f"REPLACE tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_replace_all_tags") +def test_append_and_remove_combined( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test mixed semantics - APPEND and REMOVE in same request. + Expected: Issue is added, Confidential is removed. + """ + # First add Confidential tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify setup + tags_after_add = get_current_tags(client, table1.qualified_name) + assert TAG_CONFIDENTIAL in tags_after_add, ( + "Setup failed: Confidential tag not added" + ) + + # APPEND Issue and REMOVE Confidential in same request + table_mixed = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_mixed.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REMOVE, + ), + ] + response = client.asset.save(entity=table_mixed) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL not in tags + LOGGER.info(f"Mixed APPEND/REMOVE test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_and_remove_combined") +def test_bulk_save_with_mixed_semantics( + client: AtlanClient, + table1: TableInfo, + table2: TableInfo, +) -> None: + """ + Test bulk save with APPEND and REPLACE semantics across different assets. + - Table1: APPEND Issue and Confidential + - Table2: REPLACE with Issue only + Expected: Both tables updated correctly based on their semantics. + """ + # First add Confidential to Table2 to test REPLACE + table2_setup = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + table2_setup.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table2_setup) + time.sleep(3) + + # Verify setup + tags_table2_setup = get_current_tags(client, table2.qualified_name) + assert TAG_CONFIDENTIAL in tags_table2_setup, ( + "Setup failed: Confidential not on Table2" + ) + + # Bulk save with different semantics + t1 = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + t1.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + + t2 = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + t2.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REPLACE, + ), + ] + + response = client.asset.save(entity=[t1, t2]) + time.sleep(3) + + # Verify response + assert response is not None + + # Check Table1 - should have both tags (APPEND) + tags1 = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags1, f"Table1: Expected '{TAG_ISSUE}' in tags" + assert TAG_CONFIDENTIAL in tags1, f"Table1: Expected '{TAG_CONFIDENTIAL}' in tags" + + # Check Table2 - should only have Issue (REPLACE removed Confidential) + tags2 = get_current_tags(client, table2.qualified_name) + assert TAG_ISSUE in tags2, f"Table2: Expected '{TAG_ISSUE}' in tags" + assert TAG_CONFIDENTIAL not in tags2, ( + f"Table2: REPLACE should have removed '{TAG_CONFIDENTIAL}'" + ) + + LOGGER.info(f"Bulk save test passed. Table1 tags: {tags1}, Table2 tags: {tags2}") + + +@pytest.mark.order(after="test_bulk_save_with_mixed_semantics") +def test_backward_compatibility_no_semantic( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test backward compatibility - tags with no semantic (None) work as before. + Expected: Tag is added using existing behavior. + """ + # Use AtlanTag directly without semantic (backward compatible) + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE) + ), # No semantic - backward compatible + ] + + response = client.asset.save(entity=table, replace_atlan_tags=True) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"Backward compatibility test passed. Tags: {tags}") diff --git a/tests/unit/aio/test_atlan_tag_semantic.py b/tests/unit/aio/test_atlan_tag_semantic.py new file mode 100644 index 000000000..552bbf09a --- /dev/null +++ b/tests/unit/aio/test_atlan_tag_semantic.py @@ -0,0 +1,429 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. +""" +Async unit tests for AtlanTag semantic functionality. +Mirrors tests/unit/test_atlan_tag_semantic.py for the async client. +""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from pyatlan.client.aio.asset import AsyncAssetClient +from pyatlan.client.common import AsyncApiCaller +from pyatlan.client.common.asset import Save +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic + + +def test_atlan_tag_semantic_defaults_to_none(): + """Verify AtlanTag.semantic defaults to None for backward compatibility.""" + tag = AtlanTag(type_name=AtlanTagName("TestTag")) + assert tag.semantic is None + + +def test_atlan_tag_of_with_semantic(): + """Verify AtlanTag.of() factory method accepts semantic parameter.""" + tag_append = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag1"), + semantic=SaveSemantic.APPEND, + ) + tag_remove = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag2"), + semantic=SaveSemantic.REMOVE, + ) + tag_replace = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag3"), + semantic=SaveSemantic.REPLACE, + ) + + assert tag_append.semantic == SaveSemantic.APPEND + assert tag_remove.semantic == SaveSemantic.REMOVE + assert tag_replace.semantic == SaveSemantic.REPLACE + + +def test_atlan_tag_semantic_excluded_from_json(): + """Verify semantic field is excluded from JSON (not sent to API).""" + tag = AtlanTag(type_name=AtlanTagName("TestTag"), semantic=SaveSemantic.APPEND) + json_dict = tag.dict(by_alias=True, exclude_none=True) + assert "semantic" not in json_dict + + +def test_has_tags_with_semantic_detection(): + """Verify has_tags_with_semantic correctly detects semantic vs non-semantic tags.""" + # Entity with APPEND semantic - should return True + table_append = Table() + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + assert Save.has_tags_with_semantic([table_append]) is True + + # Entity with no semantic (None) - should return False + table_none = Table() + table_none.atlan_tags = [AtlanTag(type_name=AtlanTagName("Tag2"))] + assert Save.has_tags_with_semantic([table_none]) is False + + # Entity with no tags - should return False + table_empty = Table() + table_empty.atlan_tags = None + assert Save.has_tags_with_semantic([table_empty]) is False + + +def test_get_semantic_flags(): + """Verify get_semantic_flags correctly identifies APPEND/REMOVE and REPLACE semantics.""" + # APPEND only + table_append = Table() + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + has_append_remove, has_replace = Save.get_semantic_flags([table_append]) + assert has_append_remove is True + assert has_replace is False + + # REPLACE only + table_replace = Table() + table_replace.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE) + ] + has_append_remove, has_replace = Save.get_semantic_flags([table_replace]) + assert has_append_remove is False + assert has_replace is True + + # Both APPEND and REPLACE + table_mixed = Table() + table_mixed.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE), + ] + has_append_remove, has_replace = Save.get_semantic_flags([table_mixed]) + assert has_append_remove is True + assert has_replace is True + + # No semantic + table_none = Table() + table_none.atlan_tags = [AtlanTag(type_name=AtlanTagName("Tag1"))] + has_append_remove, has_replace = Save.get_semantic_flags([table_none]) + assert has_append_remove is False + assert has_replace is False + + +def test_process_asset_append_remove_semantic(): + """Verify APPEND tags go to add_or_update_classifications, REMOVE to remove_classifications.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("RemoveTag"), semantic=SaveSemantic.REMOVE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # atlan_tags should be cleared (no REPLACE or None semantic tags to keep) + assert table.atlan_tags is None + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + # REMOVE tag should be in remove_classifications + assert table.remove_classifications is not None + assert len(table.remove_classifications) == 1 + + +def test_process_asset_keeps_replace_tags(): + """Verify REPLACE tags remain in atlan_tags after processing.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("ReplaceTag"), semantic=SaveSemantic.REPLACE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # REPLACE tag should remain in atlan_tags + assert table.atlan_tags is not None + assert len(table.atlan_tags) == 1 + assert table.atlan_tags[0].semantic == SaveSemantic.REPLACE + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + + +def test_process_asset_keeps_none_semantic_tags(): + """Verify None semantic tags remain in atlan_tags after processing.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("NoneTag")), # None semantic + ] + + Save.process_asset_for_append_remove_semantic(table) + + # None semantic tag should remain in atlan_tags + assert table.atlan_tags is not None + assert len(table.atlan_tags) == 1 + assert table.atlan_tags[0].semantic is None + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + + +def test_process_asset_only_replace_tags_unchanged(): + """Verify entity with only REPLACE tags keeps all tags in atlan_tags.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("ReplaceTag"), semantic=SaveSemantic.REPLACE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # REPLACE tag should remain in atlan_tags + assert table.atlan_tags is not None + assert len(table.atlan_tags) == 1 + # No classification fields should be set + assert table.add_or_update_classifications is None + assert table.remove_classifications is None + + +def test_merge_responses(): + """Verify merge_responses correctly combines multiple AssetMutationResponse objects.""" + from pyatlan.model.response import AssetMutationResponse, MutatedEntities + + table1 = Table() + table1.guid = "guid1" + table2 = Table() + table2.guid = "guid2" + + response1 = AssetMutationResponse( + guid_assignments={"temp1": "real1"}, + mutated_entities=MutatedEntities(CREATE=[table1]), + ) + response2 = AssetMutationResponse( + guid_assignments={"temp2": "real2"}, + mutated_entities=MutatedEntities(UPDATE=[table2]), + ) + + result = Save.merge_responses([response1, response2]) + + assert result is not None + assert result.guid_assignments == {"temp1": "real1", "temp2": "real2"} + assert result.mutated_entities is not None + assert result.mutated_entities.CREATE is not None + assert result.mutated_entities.UPDATE is not None + assert len(result.mutated_entities.CREATE) == 1 + assert len(result.mutated_entities.UPDATE) == 1 + + +def _create_mock_response(): + """Helper to create a mock API response JSON.""" + return { + "guidAssignments": {"temp": "real"}, + "mutatedEntities": { + "CREATE": [], + "UPDATE": [{"typeName": "Table", "guid": "test-guid"}], + }, + } + + +@pytest.fixture +def mock_async_asset_client(): + """Create a mock AsyncAssetClient for testing API call counts.""" + mock_api_caller = MagicMock(spec=AsyncApiCaller) + mock_api_caller._call_api = AsyncMock(return_value=_create_mock_response()) + + # Create AsyncAssetClient with mocked api caller + client = AsyncAssetClient.__new__(AsyncAssetClient) + object.__setattr__(client, "_client", mock_api_caller) + + return client, mock_api_caller._call_api + + +async def test_api_call_count_single_append(mock_async_asset_client): + """Verify single APPEND semantic results in 1 API call.""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + await client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_single_remove(mock_async_asset_client): + """Verify single REMOVE semantic results in 1 API call.""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REMOVE) + ] + + await client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_single_replace(mock_async_asset_client): + """Verify single REPLACE semantic results in 1 API call.""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE) + ] + + await client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_append_remove_combined(mock_async_asset_client): + """Verify APPEND and REMOVE on same entity results in 1 API call.""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REMOVE), + ] + + await client.save(entity=table) + + # APPEND and REMOVE go in same call (appendTags=True) + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_append_replace_different_entities( + mock_async_asset_client, +): + """Verify APPEND and REPLACE on different entities results in 1 API call (both flags set).""" + client, mock_call_api = mock_async_asset_client + + table1 = Table() + table1.qualified_name = "test/table1" + table1.name = "test_table1" + table1.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table2 = Table() + table2.qualified_name = "test/table2" + table2.name = "test_table2" + table2.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE) + ] + + await client.save(entity=[table1, table2]) + + # SDK makes single API call with both flags set (backend may return error) + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_no_semantic(mock_async_asset_client): + """Verify no semantic (None) results in 1 API call via existing path.""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1")) # No semantic + ] + + await client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_all_three_semantics(mock_async_asset_client): + """Verify APPEND, REPLACE, and None semantics results in 1 API call.""" + client, mock_call_api = mock_async_asset_client + + table_append = Table() + table_append.qualified_name = "test/table_append" + table_append.name = "table_append" + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table_replace = Table() + table_replace.qualified_name = "test/table_replace" + table_replace.name = "table_replace" + table_replace.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE) + ] + + table_none = Table() + table_none.qualified_name = "test/table_none" + table_none.name = "table_none" + table_none.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag3")) # No semantic + ] + + await client.save(entity=[table_append, table_replace, table_none]) + + # SDK makes single API call (backend may return error if conflicting flags) + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_multiple_append_batched(mock_async_asset_client): + """Verify multiple APPEND entities are batched into 1 API call.""" + client, mock_call_api = mock_async_asset_client + + tables = [] + for i in range(5): + table = Table() + table.qualified_name = f"test/table_{i}" + table.name = f"table_{i}" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName(f"Tag{i}"), semantic=SaveSemantic.APPEND) + ] + tables.append(table) + + await client.save(entity=tables) + + # All APPEND entities batched into single API call + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_mixed_on_same_entity(mock_async_asset_client): + """Verify entity with APPEND+REPLACE+REMOVE results in 1 API call (both flags set).""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE), + AtlanTag(type_name=AtlanTagName("Tag3"), semantic=SaveSemantic.REMOVE), + ] + + await client.save(entity=table) + + # SDK makes single API call with both appendTags and replaceTags set + assert mock_call_api.call_count == 1 + + +async def test_api_call_count_asset_without_tags(mock_async_asset_client): + """Verify asset without tags results in 1 API call (standard save).""" + client, mock_call_api = mock_async_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = None # No tags at all + + await client.save(entity=table) + + # Standard save path = 1 API call + assert mock_call_api.call_count == 1 diff --git a/tests/unit/test_atlan_tag_semantic.py b/tests/unit/test_atlan_tag_semantic.py new file mode 100644 index 000000000..89096ba40 --- /dev/null +++ b/tests/unit/test_atlan_tag_semantic.py @@ -0,0 +1,423 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. +from unittest.mock import MagicMock + +import pytest + +from pyatlan.client.common.asset import Save +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic + + +def test_atlan_tag_semantic_defaults_to_none(): + """Verify AtlanTag.semantic defaults to None for backward compatibility.""" + tag = AtlanTag(type_name=AtlanTagName("TestTag")) + assert tag.semantic is None + + +def test_atlan_tag_of_with_semantic(): + """Verify AtlanTag.of() factory method accepts semantic parameter.""" + tag_append = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag1"), + semantic=SaveSemantic.APPEND, + ) + tag_remove = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag2"), + semantic=SaveSemantic.REMOVE, + ) + tag_replace = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag3"), + semantic=SaveSemantic.REPLACE, + ) + + assert tag_append.semantic == SaveSemantic.APPEND + assert tag_remove.semantic == SaveSemantic.REMOVE + assert tag_replace.semantic == SaveSemantic.REPLACE + + +def test_atlan_tag_semantic_excluded_from_json(): + """Verify semantic field is excluded from JSON (not sent to API).""" + tag = AtlanTag(type_name=AtlanTagName("TestTag"), semantic=SaveSemantic.APPEND) + json_dict = tag.dict(by_alias=True, exclude_none=True) + assert "semantic" not in json_dict + + +def test_has_tags_with_semantic_detection(): + """Verify has_tags_with_semantic correctly detects semantic vs non-semantic tags.""" + # Entity with APPEND semantic - should return True + table_append = Table() + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + assert Save.has_tags_with_semantic([table_append]) is True + + # Entity with no semantic (None) - should return False + table_none = Table() + table_none.atlan_tags = [AtlanTag(type_name=AtlanTagName("Tag2"))] + assert Save.has_tags_with_semantic([table_none]) is False + + # Entity with no tags - should return False + table_empty = Table() + table_empty.atlan_tags = None + assert Save.has_tags_with_semantic([table_empty]) is False + + +def test_get_semantic_flags(): + """Verify get_semantic_flags correctly identifies APPEND/REMOVE and REPLACE semantics.""" + # APPEND only + table_append = Table() + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + has_append_remove, has_replace = Save.get_semantic_flags([table_append]) + assert has_append_remove is True + assert has_replace is False + + # REPLACE only + table_replace = Table() + table_replace.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE) + ] + has_append_remove, has_replace = Save.get_semantic_flags([table_replace]) + assert has_append_remove is False + assert has_replace is True + + # Both APPEND and REPLACE + table_mixed = Table() + table_mixed.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE), + ] + has_append_remove, has_replace = Save.get_semantic_flags([table_mixed]) + assert has_append_remove is True + assert has_replace is True + + # No semantic + table_none = Table() + table_none.atlan_tags = [AtlanTag(type_name=AtlanTagName("Tag1"))] + has_append_remove, has_replace = Save.get_semantic_flags([table_none]) + assert has_append_remove is False + assert has_replace is False + + +def test_process_asset_append_remove_semantic(): + """Verify APPEND tags go to add_or_update_classifications, REMOVE to remove_classifications.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("RemoveTag"), semantic=SaveSemantic.REMOVE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # atlan_tags should be cleared (no REPLACE or None semantic tags to keep) + assert table.atlan_tags is None + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + # REMOVE tag should be in remove_classifications + assert table.remove_classifications is not None + assert len(table.remove_classifications) == 1 + + +def test_process_asset_keeps_replace_tags(): + """Verify REPLACE tags remain in atlan_tags after processing.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("ReplaceTag"), semantic=SaveSemantic.REPLACE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # REPLACE tag should remain in atlan_tags + assert table.atlan_tags is not None + assert len(table.atlan_tags) == 1 + assert table.atlan_tags[0].semantic == SaveSemantic.REPLACE + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + + +def test_process_asset_keeps_none_semantic_tags(): + """Verify None semantic tags remain in atlan_tags after processing.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("NoneTag")), # None semantic + ] + + Save.process_asset_for_append_remove_semantic(table) + + # None semantic tag should remain in atlan_tags + assert table.atlan_tags is not None + assert len(table.atlan_tags) == 1 + assert table.atlan_tags[0].semantic is None + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + + +def test_process_asset_only_replace_tags_unchanged(): + """Verify entity with only REPLACE tags keeps all tags in atlan_tags.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("ReplaceTag"), semantic=SaveSemantic.REPLACE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # REPLACE tag should remain in atlan_tags + assert table.atlan_tags is not None + assert len(table.atlan_tags) == 1 + # No classification fields should be set + assert table.add_or_update_classifications is None + assert table.remove_classifications is None + + +def test_merge_responses(): + """Verify merge_responses correctly combines multiple AssetMutationResponse objects.""" + from pyatlan.model.response import AssetMutationResponse, MutatedEntities + + table1 = Table() + table1.guid = "guid1" + table2 = Table() + table2.guid = "guid2" + + response1 = AssetMutationResponse( + guid_assignments={"temp1": "real1"}, + mutated_entities=MutatedEntities(CREATE=[table1]), + ) + response2 = AssetMutationResponse( + guid_assignments={"temp2": "real2"}, + mutated_entities=MutatedEntities(UPDATE=[table2]), + ) + + result = Save.merge_responses([response1, response2]) + + assert result is not None + assert result.guid_assignments == {"temp1": "real1", "temp2": "real2"} + assert result.mutated_entities is not None + assert result.mutated_entities.CREATE is not None + assert result.mutated_entities.UPDATE is not None + assert len(result.mutated_entities.CREATE) == 1 + assert len(result.mutated_entities.UPDATE) == 1 + + +def _create_mock_response(): + """Helper to create a mock API response JSON.""" + return { + "guidAssignments": {"temp": "real"}, + "mutatedEntities": { + "CREATE": [], + "UPDATE": [{"typeName": "Table", "guid": "test-guid"}], + }, + } + + +@pytest.fixture +def mock_asset_client(): + """Create a mock AssetClient for testing API call counts.""" + from pyatlan.client.asset import AssetClient + from pyatlan.client.common import ApiCaller + + mock_api_caller = MagicMock(spec=ApiCaller) + mock_api_caller._call_api = MagicMock(return_value=_create_mock_response()) + + # Create AssetClient with mocked api caller + client = AssetClient.__new__(AssetClient) + object.__setattr__(client, "_client", mock_api_caller) + + return client, mock_api_caller._call_api + + +def test_api_call_count_single_append(mock_asset_client): + """Verify single APPEND semantic results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +def test_api_call_count_single_remove(mock_asset_client): + """Verify single REMOVE semantic results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REMOVE) + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +def test_api_call_count_single_replace(mock_asset_client): + """Verify single REPLACE semantic results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE) + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +def test_api_call_count_append_remove_combined(mock_asset_client): + """Verify APPEND and REMOVE on same entity results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REMOVE), + ] + + client.save(entity=table) + + # APPEND and REMOVE go in same call (appendTags=True) + assert mock_call_api.call_count == 1 + + +def test_api_call_count_append_replace_different_entities(mock_asset_client): + """Verify APPEND and REPLACE on different entities results in 1 API call (both flags set).""" + client, mock_call_api = mock_asset_client + + table1 = Table() + table1.qualified_name = "test/table1" + table1.name = "test_table1" + table1.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table2 = Table() + table2.qualified_name = "test/table2" + table2.name = "test_table2" + table2.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE) + ] + + client.save(entity=[table1, table2]) + + # SDK makes single API call with both flags set (backend may return error) + assert mock_call_api.call_count == 1 + + +def test_api_call_count_no_semantic(mock_asset_client): + """Verify no semantic (None) results in 1 API call via existing path.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1")) # No semantic + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +def test_api_call_count_all_three_semantics(mock_asset_client): + """Verify APPEND, REPLACE, and None semantics results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table_append = Table() + table_append.qualified_name = "test/table_append" + table_append.name = "table_append" + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table_replace = Table() + table_replace.qualified_name = "test/table_replace" + table_replace.name = "table_replace" + table_replace.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE) + ] + + table_none = Table() + table_none.qualified_name = "test/table_none" + table_none.name = "table_none" + table_none.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag3")) # No semantic + ] + + client.save(entity=[table_append, table_replace, table_none]) + + # SDK makes single API call (backend may return error if conflicting flags) + assert mock_call_api.call_count == 1 + + +def test_api_call_count_multiple_append_batched(mock_asset_client): + """Verify multiple APPEND entities are batched into 1 API call.""" + client, mock_call_api = mock_asset_client + + tables = [] + for i in range(5): + table = Table() + table.qualified_name = f"test/table_{i}" + table.name = f"table_{i}" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName(f"Tag{i}"), semantic=SaveSemantic.APPEND) + ] + tables.append(table) + + client.save(entity=tables) + + # All APPEND entities batched into single API call + assert mock_call_api.call_count == 1 + + +def test_api_call_count_mixed_on_same_entity(mock_asset_client): + """Verify entity with APPEND+REPLACE+REMOVE results in 1 API call (both flags set).""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE), + AtlanTag(type_name=AtlanTagName("Tag3"), semantic=SaveSemantic.REMOVE), + ] + + client.save(entity=table) + + # SDK makes single API call with both appendTags and replaceTags set + assert mock_call_api.call_count == 1 + + +def test_api_call_count_asset_without_tags(mock_asset_client): + """Verify asset without tags results in 1 API call (standard save).""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = None # No tags at all + + client.save(entity=table) + + # Standard save path = 1 API call + assert mock_call_api.call_count == 1