diff --git a/CHANGELOG.md b/CHANGELOG.md
index f9f11b924..136b8cf46 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Fixed
 
+- Fixed `bulk_sync_prep_create_item` and `bulk_async_prep_create_item` to properly detect duplicates across all datetime-partitioned indexes, and to skip repeated item IDs within a single bulk batch. [#575](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/575)
+
 ### Removed
 
 ### Updated
diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py
index bfece8bda..3d6eec30c 100644
--- a/stac_fastapi/core/stac_fastapi/core/core.py
+++ b/stac_fastapi/core/stac_fastapi/core/core.py
@@ -1006,14 +1006,31 @@ async def create_item(
             database=self.database, settings=self.settings
         )
         features = item_dict["features"]
-        processed_items = [
+        all_prepped = [
             bulk_client.preprocess_item(
                 feature, base_url, BulkTransactionMethod.INSERT
             )
             for feature in features
         ]
+        # Filter out None values (duplicates skipped by the database check)
+        processed_items = [item for item in all_prepped if item is not None]
+        skipped_db_duplicates = len(all_prepped) - len(processed_items)
+
+        # Deduplicate items within the batch by ID (keep the last occurrence).
+        # This matches Elasticsearch/OpenSearch bulk behavior: later actions with the same ID overwrite earlier ones.
+        seen_ids: dict = {}
+        for item in processed_items:
+            seen_ids[item["id"]] = item
+        unique_items = list(seen_ids.values())
+        skipped_batch_duplicates = len(processed_items) - len(unique_items)
+        processed_items = unique_items
+
+        skipped = skipped_db_duplicates + skipped_batch_duplicates
         attempted = len(processed_items)
+        if not processed_items:
+            return f"No items to insert. {skipped} items were skipped (duplicates)."
+
         success, errors = await self.database.bulk_async(
             collection_id=collection_id,
             processed_items=processed_items,
@@ -1027,7 +1044,7 @@ async def create_item(
             logger.info(
                 f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
             )
-            return f"Successfully added {success} Items. {attempted - success} errors occurred."
+            return f"Successfully added {success} Items. {skipped} skipped (duplicates). {attempted - success} errors occurred."
 
         # Handle single item
         await self.database.create_item(
@@ -1340,18 +1357,35 @@ def bulk_item_insert(
             base_url = ""
 
         processed_items = []
+        skipped_db_duplicates = 0
         for item in items.items.values():
             try:
                 validated = Item(**item) if not isinstance(item, Item) else item
-                processed_items.append(
-                    self.preprocess_item(
-                        validated.model_dump(mode="json"), base_url, items.method
-                    )
+                prepped = self.preprocess_item(
+                    validated.model_dump(mode="json"), base_url, items.method
                 )
+                if prepped is not None:
+                    processed_items.append(prepped)
+                else:
+                    skipped_db_duplicates += 1
             except ValidationError:
                 # Immediately raise on the first invalid item (strict mode)
                 raise
 
+        # Deduplicate items within the batch by ID (keep the last occurrence).
+        # This matches Elasticsearch/OpenSearch bulk behavior: later actions with the same ID overwrite earlier ones.
+        seen_ids: dict = {}
+        for item in processed_items:
+            seen_ids[item["id"]] = item
+        unique_items = list(seen_ids.values())
+        skipped_batch_duplicates = len(processed_items) - len(unique_items)
+        processed_items = unique_items
+
+        skipped = skipped_db_duplicates + skipped_batch_duplicates
+
+        if not processed_items:
+            return f"No items to insert. {skipped} items were skipped (duplicates)."
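+
+        # processed_items is guaranteed non-empty here: the guard above returns
+        # early when every item in the batch was skipped as a duplicate.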
         collection_id = processed_items[0]["collection"]
         attempted = len(processed_items)
         success, errors = self.database.bulk_sync(
@@ -1364,4 +1398,4 @@
         else:
             logger.info(f"Bulk sync operation succeeded with {success} actions.")
 
-        return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."
+        return f"Successfully added/updated {success} Items. {skipped} skipped (duplicates). {attempted - success} errors occurred."
diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
index d58a3002b..de72217a6 100644
--- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
+++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/database_logic.py
@@ -33,11 +33,14 @@
     PatchOperation,
 )
 from stac_fastapi.sfeos_helpers.database import (
+    ItemAlreadyExistsError,
     add_bbox_shape_to_collection,
     apply_collections_bbox_filter_shared,
     apply_collections_datetime_filter_shared,
     apply_free_text_filter_shared,
     apply_intersects_filter_shared,
+    check_item_exists_in_alias,
+    check_item_exists_in_alias_sync,
     create_index_templates_shared,
     delete_item_index_shared,
     get_queryables_mapping_shared,
@@ -996,6 +999,44 @@ async def check_collection_exists(self, collection_id: str):
         if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
             raise NotFoundError(f"Collection {collection_id} does not exist")
 
+    async def _check_item_exists_in_collection(
+        self, collection_id: str, item_id: str
+    ) -> bool:
+        """Check if an item exists across all indexes for a collection.
+
+        Args:
+            collection_id (str): The collection identifier.
+            item_id (str): The item identifier.
+
+        Returns:
+            bool: True if the item exists in any index, False otherwise.
+        """
+        alias = index_alias_by_collection_id(collection_id)
+        doc_id = mk_item_id(item_id, collection_id)
+        try:
+            return await check_item_exists_in_alias(self.client, alias, doc_id)
+        except Exception:
+            return False
+
+    def _check_item_exists_in_collection_sync(
+        self, collection_id: str, item_id: str
+    ) -> bool:
+        """Check if an item exists across all indexes for a collection (sync version).
+
+        Args:
+            collection_id (str): The collection identifier.
+            item_id (str): The item identifier.
+
+        Returns:
+            bool: True if the item exists in any index, False otherwise.
+        """
+        alias = index_alias_by_collection_id(collection_id)
+        doc_id = mk_item_id(item_id, collection_id)
+        try:
+            return check_item_exists_in_alias_sync(self.sync_client, alias, doc_id)
+        except Exception:
+            return False
+
     async def async_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
     ) -> Item:
@@ -1011,31 +1052,21 @@
             Item: The prepped item.
 
         Raises:
-            ConflictError: If the item already exists in the database.
+            ItemAlreadyExistsError: If the item already exists in the database.
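+
+        Note:
+            The duplicate check searches the collection's alias, so an existing
+            item is detected no matter which datetime-partitioned index holds it.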
""" await self.check_collection_exists(collection_id=item["collection"]) - alias = index_alias_by_collection_id(item["collection"]) - doc_id = mk_item_id(item["id"], item["collection"]) - - if not exist_ok: - alias_exists = await self.client.indices.exists_alias(name=alias) - - if alias_exists: - alias_info = await self.client.indices.get_alias(name=alias) - indices = list(alias_info.keys()) - for index in indices: - if await self.client.exists(index=index, id=doc_id): - raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" - ) + if not exist_ok and await self._check_item_exists_in_collection( + item["collection"], item["id"] + ): + raise ItemAlreadyExistsError(item["id"], item["collection"]) return self.item_serializer.stac_to_db(item, base_url) async def bulk_async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False - ) -> Item: + ) -> Optional[Item]: """ Prepare an item for insertion into the database. @@ -1063,20 +1094,18 @@ async def bulk_async_prep_create_item( # Check if the collection exists await self.check_collection_exists(collection_id=item["collection"]) - # Check if the item already exists in the database - if not exist_ok and await self.client.exists( - index=index_alias_by_collection_id(item["collection"]), - id=mk_item_id(item["id"], item["collection"]), + # Check if the item already exists in the database (across all datetime indexes) + if not exist_ok and await self._check_item_exists_in_collection( + item["collection"], item["id"] ): - error_message = ( - f"Item {item['id']} in collection {item['collection']} already exists." - ) if self.async_settings.raise_on_bulk_error: - raise ConflictError(error_message) + raise ItemAlreadyExistsError(item["id"], item["collection"]) else: logger.warning( - f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." + f"Item {item['id']} in collection {item['collection']} already exists. " + "Skipping as `RAISE_ON_BULK_ERROR` is set to false." ) + return None # Serialize the item into a database-compatible format prepped_item = self.item_serializer.stac_to_db(item, base_url) @@ -1085,7 +1114,7 @@ async def bulk_async_prep_create_item( def bulk_sync_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False - ) -> Item: + ) -> Optional[Item]: """ Prepare an item for insertion into the database. @@ -1114,26 +1143,18 @@ def bulk_sync_prep_create_item( if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): raise NotFoundError(f"Collection {item['collection']} does not exist") - # Check if the item already exists in the database - alias = index_alias_by_collection_id(item["collection"]) - doc_id = mk_item_id(item["id"], item["collection"]) - - if not exist_ok: - alias_exists = self.sync_client.indices.exists_alias(name=alias) - - if alias_exists: - alias_info = self.sync_client.indices.get_alias(name=alias) - indices = list(alias_info.keys()) - - for index in indices: - if self.sync_client.exists(index=index, id=doc_id): - error_message = f"Item {item['id']} in collection {item['collection']} already exists." - if self.sync_settings.raise_on_bulk_error: - raise ConflictError(error_message) - else: - logger.warning( - f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." 
-                            )
+        # Check if the item already exists in the database (across all datetime indexes)
+        if not exist_ok and self._check_item_exists_in_collection_sync(
+            item["collection"], item["id"]
+        ):
+            if self.sync_settings.raise_on_bulk_error:
+                raise ItemAlreadyExistsError(item["id"], item["collection"])
+            else:
+                logger.warning(
+                    f"Item {item['id']} in collection {item['collection']} already exists. "
+                    "Skipping as `RAISE_ON_BULK_ERROR` is set to false."
+                )
+                return None
 
         # Serialize the item into a database-compatible format
         prepped_item = self.item_serializer.stac_to_db(item, base_url)
diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
index 3a2dcbf78..ff1bccd3f 100644
--- a/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
+++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/database_logic.py
@@ -29,11 +29,14 @@
 )
 from stac_fastapi.opensearch.config import OpensearchSettings as SyncSearchSettings
 from stac_fastapi.sfeos_helpers.database import (
+    ItemAlreadyExistsError,
     add_bbox_shape_to_collection,
     apply_collections_bbox_filter_shared,
     apply_collections_datetime_filter_shared,
     apply_free_text_filter_shared,
     apply_intersects_filter_shared,
+    check_item_exists_in_alias,
+    check_item_exists_in_alias_sync,
     create_index_templates_shared,
     delete_item_index_shared,
     get_queryables_mapping_shared,
@@ -1000,6 +1003,44 @@ async def check_collection_exists(self, collection_id: str):
         if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
             raise NotFoundError(f"Collection {collection_id} does not exist")
 
+    async def _check_item_exists_in_collection(
+        self, collection_id: str, item_id: str
+    ) -> bool:
+        """Check if an item exists across all indexes for a collection.
+
+        Args:
+            collection_id (str): The collection identifier.
+            item_id (str): The item identifier.
+
+        Returns:
+            bool: True if the item exists in any index, False otherwise.
+        """
+        alias = index_alias_by_collection_id(collection_id)
+        doc_id = mk_item_id(item_id, collection_id)
+        try:
+            return await check_item_exists_in_alias(self.client, alias, doc_id)
+        except Exception:
+            return False
+
+    def _check_item_exists_in_collection_sync(
+        self, collection_id: str, item_id: str
+    ) -> bool:
+        """Check if an item exists across all indexes for a collection (sync version).
+
+        Args:
+            collection_id (str): The collection identifier.
+            item_id (str): The item identifier.
+
+        Returns:
+            bool: True if the item exists in any index, False otherwise.
+        """
+        alias = index_alias_by_collection_id(collection_id)
+        doc_id = mk_item_id(item_id, collection_id)
+        try:
+            return check_item_exists_in_alias_sync(self.sync_client, alias, doc_id)
+        except Exception:
+            return False
+
     async def async_prep_create_item(
         self, item: Item, base_url: str, exist_ok: bool = False
     ) -> Item:
@@ -1015,31 +1056,21 @@
             Item: The prepped item.
 
         Raises:
-            ConflictError: If the item already exists in the database.
+            ItemAlreadyExistsError: If the item already exists in the database.
""" await self.check_collection_exists(collection_id=item["collection"]) - alias = index_alias_by_collection_id(item["collection"]) - doc_id = mk_item_id(item["id"], item["collection"]) - - if not exist_ok: - alias_exists = await self.client.indices.exists_alias(name=alias) - if alias_exists: - alias_info = await self.client.indices.get_alias(name=alias) - indices = list(alias_info.keys()) - - for index in indices: - if await self.client.exists(index=index, id=doc_id): - raise ConflictError( - f"Item {item['id']} in collection {item['collection']} already exists" - ) + if not exist_ok and await self._check_item_exists_in_collection( + item["collection"], item["id"] + ): + raise ItemAlreadyExistsError(item["id"], item["collection"]) return self.item_serializer.stac_to_db(item, base_url) async def bulk_async_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False - ) -> Item: + ) -> Optional[Item]: """ Prepare an item for insertion into the database. @@ -1067,20 +1098,19 @@ async def bulk_async_prep_create_item( # Check if the collection exists await self.check_collection_exists(collection_id=item["collection"]) - # Check if the item already exists in the database - if not exist_ok and await self.client.exists( - index=index_alias_by_collection_id(item["collection"]), - id=mk_item_id(item["id"], item["collection"]), + # Check if the item already exists in the database (across all datetime indexes) + if not exist_ok and await self._check_item_exists_in_collection( + item["collection"], item["id"] ): - error_message = ( - f"Item {item['id']} in collection {item['collection']} already exists." - ) if self.async_settings.raise_on_bulk_error: - raise ConflictError(error_message) + raise ItemAlreadyExistsError(item["id"], item["collection"]) else: logger.warning( - f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." + f"Item {item['id']} in collection {item['collection']} already exists. " + "Skipping as `RAISE_ON_BULK_ERROR` is set to false." ) + return None + # Serialize the item into a database-compatible format prepped_item = self.item_serializer.stac_to_db(item, base_url) logger.debug(f"Item {item['id']} prepared successfully.") @@ -1088,7 +1118,7 @@ async def bulk_async_prep_create_item( def bulk_sync_prep_create_item( self, item: Item, base_url: str, exist_ok: bool = False - ) -> Item: + ) -> Optional[Item]: """ Prepare an item for insertion into the database. @@ -1117,26 +1147,18 @@ def bulk_sync_prep_create_item( if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]): raise NotFoundError(f"Collection {item['collection']} does not exist") - # Check if the item already exists in the database - alias = index_alias_by_collection_id(item["collection"]) - doc_id = mk_item_id(item["id"], item["collection"]) - - if not exist_ok: - alias_exists = self.sync_client.indices.exists_alias(name=alias) - - if alias_exists: - alias_info = self.sync_client.indices.get_alias(name=alias) - indices = list(alias_info.keys()) - - for index in indices: - if self.sync_client.exists(index=index, id=doc_id): - error_message = f"Item {item['id']} in collection {item['collection']} already exists." - if self.sync_settings.raise_on_bulk_error: - raise ConflictError(error_message) - else: - logger.warning( - f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false." 
-                            )
+        # Check if the item already exists in the database (across all datetime indexes)
+        if not exist_ok and self._check_item_exists_in_collection_sync(
+            item["collection"], item["id"]
+        ):
+            if self.sync_settings.raise_on_bulk_error:
+                raise ItemAlreadyExistsError(item["id"], item["collection"])
+            else:
+                logger.warning(
+                    f"Item {item['id']} in collection {item['collection']} already exists. "
+                    "Skipping as `RAISE_ON_BULK_ERROR` is set to false."
+                )
+                return None
 
         # Serialize the item into a database-compatible format
         prepped_item = self.item_serializer.stac_to_db(item, base_url)
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py
index 207e6f4df..005d62c67 100644
--- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/__init__.py
@@ -60,7 +60,14 @@
     apply_intersects_filter_shared,
     populate_sort_shared,
 )
-from .utils import add_bbox_shape_to_collection, get_bool_env, validate_refresh
+from .utils import (
+    ItemAlreadyExistsError,
+    add_bbox_shape_to_collection,
+    check_item_exists_in_alias,
+    check_item_exists_in_alias_sync,
+    get_bool_env,
+    validate_refresh,
+)
 
 __all__ = [
     # Catalog operations
@@ -90,6 +97,10 @@
     "validate_refresh",
     "get_bool_env",
     "add_bbox_shape_to_collection",
+    "check_item_exists_in_alias",
+    "check_item_exists_in_alias_sync",
+    # Errors
+    "ItemAlreadyExistsError",
     # Datetime utilities
     "return_date",
     "extract_date",
diff --git a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py
index d1c838181..da137607e 100644
--- a/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py
+++ b/stac_fastapi/sfeos_helpers/stac_fastapi/sfeos_helpers/database/utils.py
@@ -14,10 +14,73 @@
     PatchRemove,
 )
 from stac_fastapi.sfeos_helpers.models.patch import ElasticPath, ESCommandSet
+from stac_fastapi.types.errors import ConflictError
 
 logger = logging.getLogger(__name__)
 
 
+class ItemAlreadyExistsError(ConflictError):
+    """Error raised when attempting to create an item that already exists.
+
+    Attributes:
+        item_id: The ID of the item that already exists.
+        collection_id: The ID of the collection containing the item.
+    """
+
+    def __init__(self, item_id: str, collection_id: str):
+        """Initialize the error with item and collection IDs."""
+        self.item_id = item_id
+        self.collection_id = collection_id
+        message = f"Item {item_id} in collection {collection_id} already exists"
+        super().__init__(message)
+
+
+async def check_item_exists_in_alias(client: Any, alias: str, doc_id: str) -> bool:
+    """Check if an item exists across all indexes for an alias.
+
+    Args:
+        client: The async Elasticsearch/OpenSearch client.
+        alias: The index alias to search against.
+        doc_id: The document ID to check for existence.
+
+    Returns:
+        bool: True if the item exists in any index under the alias, False otherwise.
+    """
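+    # Searching the alias probes every backing datetime-partitioned index in a
+    # single request; size=0 skips hit payloads, and terminate_after=1 lets
+    # each shard stop at the first match, keeping the existence check cheap.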
+    resp = await client.search(
+        index=alias,
+        body={
+            "query": {"ids": {"values": [doc_id]}},
+            "_source": False,
+        },
+        size=0,
+        terminate_after=1,
+    )
+    return bool(resp["hits"]["total"]["value"])
+
+
+def check_item_exists_in_alias_sync(client: Any, alias: str, doc_id: str) -> bool:
+    """Check if an item exists across all indexes for an alias (sync).
+
+    Args:
+        client: The sync Elasticsearch/OpenSearch client.
+        alias: The index alias to search against.
+        doc_id: The document ID to check for existence.
+
+    Returns:
+        bool: True if the item exists in any index under the alias, False otherwise.
+    """
+    resp = client.search(
+        index=alias,
+        body={
+            "query": {"ids": {"values": [doc_id]}},
+            "_source": False,
+        },
+        size=0,
+        terminate_after=1,
+    )
+    return bool(resp["hits"]["total"]["value"])
+
+
 def add_bbox_shape_to_collection(collection: Dict[str, Any]) -> bool:
     """Add bbox_shape field to a collection document for spatial queries.
diff --git a/stac_fastapi/tests/database/test_utils.py b/stac_fastapi/tests/database/test_utils.py
new file mode 100644
index 000000000..4eeb485cb
--- /dev/null
+++ b/stac_fastapi/tests/database/test_utils.py
@@ -0,0 +1,151 @@
+"""Tests for database utility functions."""
+
+import uuid
+from copy import deepcopy
+
+import pytest
+
+from stac_fastapi.sfeos_helpers.database import (
+    check_item_exists_in_alias,
+    check_item_exists_in_alias_sync,
+    index_alias_by_collection_id,
+    mk_item_id,
+)
+
+from ..conftest import create_item, database
+
+
+@pytest.mark.asyncio
+async def test_check_item_exists_in_alias_returns_true_when_exists(ctx, txn_client):
+    """Test that check_item_exists_in_alias returns True when the item exists."""
+    collection_id = ctx.collection["id"]
+    item_id = ctx.item["id"]
+
+    alias = index_alias_by_collection_id(collection_id)
+    doc_id = mk_item_id(item_id, collection_id)
+    assert doc_id == f"{item_id}|{collection_id}"
+
+    result = await check_item_exists_in_alias(database.client, alias, doc_id)
+
+    assert result is True
+
+
+@pytest.mark.asyncio
+async def test_check_item_exists_in_alias_returns_false_when_not_exists(ctx):
+    """Test that check_item_exists_in_alias returns False when the item doesn't exist."""
+    collection_id = ctx.collection["id"]
+    non_existent_item_id = str(uuid.uuid4())
+
+    alias = index_alias_by_collection_id(collection_id)
+    doc_id = mk_item_id(non_existent_item_id, collection_id)
+    assert doc_id == f"{non_existent_item_id}|{collection_id}"
+
+    result = await check_item_exists_in_alias(database.client, alias, doc_id)
+
+    assert result is False
+
+
+@pytest.mark.asyncio
+async def test_check_item_exists_in_alias_sync_returns_true_when_exists(ctx):
+    """Test that check_item_exists_in_alias_sync returns True when the item exists."""
+    collection_id = ctx.collection["id"]
+    item_id = ctx.item["id"]
+
+    alias = index_alias_by_collection_id(collection_id)
+    doc_id = mk_item_id(item_id, collection_id)
+    assert doc_id == f"{item_id}|{collection_id}"
+
+    result = check_item_exists_in_alias_sync(database.sync_client, alias, doc_id)
+
+    assert result is True
+
+
+@pytest.mark.asyncio
+async def test_check_item_exists_in_alias_sync_returns_false_when_not_exists(ctx):
+    """Test that check_item_exists_in_alias_sync returns False when the item doesn't exist."""
+    collection_id = ctx.collection["id"]
+    non_existent_item_id = str(uuid.uuid4())
+
+    alias = index_alias_by_collection_id(collection_id)
+    doc_id = mk_item_id(non_existent_item_id, collection_id)
+    assert doc_id == f"{non_existent_item_id}|{collection_id}"
+
+    result = check_item_exists_in_alias_sync(database.sync_client, alias, doc_id)
+
+    assert result is False
+
+
+@pytest.mark.asyncio
+async def test_check_item_exists_in_alias_with_multiple_items(ctx, txn_client):
+    """Test that check_item_exists_in_alias works correctly with multiple items in a collection."""
+    collection_id = ctx.collection["id"]
+    alias = index_alias_by_collection_id(collection_id)
+
+    # Create additional items
+    additional_item_ids = []
+    for i in range(3):
+        new_item = deepcopy(ctx.item)
+        new_item["id"] = str(uuid.uuid4())
+        await create_item(txn_client, new_item)
+        additional_item_ids.append(new_item["id"])
+
+    original_doc_id = mk_item_id(ctx.item["id"], collection_id)
+    assert original_doc_id == f"{ctx.item['id']}|{collection_id}"
+    assert (
+        await check_item_exists_in_alias(database.client, alias, original_doc_id)
+        is True
+    )
+
+    for item_id in additional_item_ids:
+        doc_id = mk_item_id(item_id, collection_id)
+        assert doc_id == f"{item_id}|{collection_id}"
+        assert await check_item_exists_in_alias(database.client, alias, doc_id) is True
+
+    non_existent_item_id = str(uuid.uuid4())
+    non_existent_doc_id = mk_item_id(non_existent_item_id, collection_id)
+    assert non_existent_doc_id == f"{non_existent_item_id}|{collection_id}"
+    assert (
+        await check_item_exists_in_alias(database.client, alias, non_existent_doc_id)
+        is False
+    )
+
+
+@pytest.mark.asyncio
+async def test_check_item_exists_with_different_datetime(ctx, txn_client):
+    """
+    Test that check_item_exists_in_alias finds items regardless of datetime value.
+
+    This test verifies the core functionality: the optimized search query
+    correctly finds items across different datetime partitions (when datetime
+    index filtering is enabled).
+    """
+    collection_id = ctx.collection["id"]
+    alias = index_alias_by_collection_id(collection_id)
+
+    # Create an item with a significantly different datetime
+    new_item = deepcopy(ctx.item)
+    new_item["id"] = str(uuid.uuid4())
+    new_item["properties"]["datetime"] = "2030-12-31T23:59:59Z"
+    await create_item(txn_client, new_item)
+
+    # Create another item with a different datetime
+    another_item = deepcopy(ctx.item)
+    another_item["id"] = str(uuid.uuid4())
+    another_item["properties"]["datetime"] = "2020-01-01T00:00:00Z"
+    await create_item(txn_client, another_item)
+
+    doc_id_1 = mk_item_id(new_item["id"], collection_id)
+    doc_id_2 = mk_item_id(another_item["id"], collection_id)
+    assert doc_id_1 == f"{new_item['id']}|{collection_id}"
+    assert doc_id_2 == f"{another_item['id']}|{collection_id}"
+
+    assert await check_item_exists_in_alias(database.client, alias, doc_id_1) is True
+    assert await check_item_exists_in_alias(database.client, alias, doc_id_2) is True
+
+    # Sync versions should also work
+    assert (
+        check_item_exists_in_alias_sync(database.sync_client, alias, doc_id_1) is True
+    )
+    assert (
+        check_item_exists_in_alias_sync(database.sync_client, alias, doc_id_2) is True
+    )
diff --git a/stac_fastapi/tests/extensions/test_bulk_transactions.py b/stac_fastapi/tests/extensions/test_bulk_transactions.py
index a74059382..5beb05c79 100644
--- a/stac_fastapi/tests/extensions/test_bulk_transactions.py
+++ b/stac_fastapi/tests/extensions/test_bulk_transactions.py
@@ -6,7 +6,7 @@
 from pydantic import ValidationError
 
 from stac_fastapi.extensions.third_party.bulk_transactions import Items
-from stac_fastapi.types.errors import ConflictError
+from stac_fastapi.sfeos_helpers.database import ItemAlreadyExistsError
 
 from ..conftest import MockRequest, create_item
 
@@ -42,7 +42,7 @@ async def test_bulk_item_insert_with_raise_on_error(
     """
     Test bulk_item_insert behavior with RAISE_ON_BULK_ERROR set to true and false.
 
-    This test verifies that when RAISE_ON_BULK_ERROR is set to true, a ConflictError
+    This test verifies that when RAISE_ON_BULK_ERROR is set to true, an ItemAlreadyExistsError
     is raised for conflicting items. When set to false, the operation logs errors
     and continues gracefully.
""" @@ -63,7 +63,7 @@ async def test_bulk_item_insert_with_raise_on_error( os.environ["RAISE_ON_BULK_ERROR"] = "true" bulk_txn_client.database.sync_settings = SearchSettings() - with pytest.raises(ConflictError): + with pytest.raises(ItemAlreadyExistsError): bulk_txn_client.bulk_item_insert(Items(items=conflicting_items), refresh=True) # Test with RAISE_ON_BULK_ERROR set to false @@ -73,8 +73,8 @@ async def test_bulk_item_insert_with_raise_on_error( Items(items=conflicting_items), refresh=True ) - # Validate the results - assert "Successfully added/updated 1 Items" in result + # Validate the results - duplicate should be skipped, not inserted + assert "1 items were skipped (duplicates)" in result # Clean up the inserted item await txn_client.delete_item(initial_item["id"], ctx.item["collection"]) @@ -148,3 +148,222 @@ async def test_feature_collection_insert_validation_error( # Assert that a ValidationError is raised due to the invalid item with pytest.raises(ValidationError): await create_item(txn_client, feature_collection) + + +@pytest.mark.asyncio +async def test_feature_collection_insert_duplicate_detection( + ctx, core_client, txn_client +): + """ + Test that duplicate items are detected when inserting via FeatureCollection. + + This test verifies that when an item already exists in the collection, + attempting to insert the same item ID via FeatureCollection will raise + a ItemAlreadyExistsError (when RAISE_ON_BULK_ERROR is true). + """ + # ctx.item is already created in the fixture + existing_item_id = ctx.item["id"] + + # Create a FeatureCollection with a duplicate item (same ID as existing) + duplicate_item = deepcopy(ctx.item) + # Change datetime to simulate the scenario where same item ID is sent with different datetime + duplicate_item["properties"]["datetime"] = "2025-06-15T00:00:00Z" + + feature_collection = {"type": "FeatureCollection", "features": [duplicate_item]} + + # Set RAISE_ON_BULK_ERROR to true to get ItemAlreadyExistsError + os.environ["RAISE_ON_BULK_ERROR"] = "true" + txn_client.database.sync_settings = SearchSettings() + + # Should raise ItemAlreadyExistsError because item already exists + with pytest.raises(ItemAlreadyExistsError) as exc_info: + await create_item(txn_client, feature_collection) + + assert existing_item_id in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_feature_collection_insert_duplicate_with_different_datetime( + ctx, core_client, txn_client +): + """ + Test that duplicate detection works when item has different datetime. + + This test specifically verifies the fix for the datetime index filtering issue + where items with different datetime values could potentially bypass duplicate + detection when stored in different datetime-based indexes. 
+ """ + existing_item_id = ctx.item["id"] + + # Create a duplicate item with significantly different datetime + duplicate_item = deepcopy(ctx.item) + duplicate_item["properties"]["datetime"] = "2030-12-31T23:59:59Z" + + feature_collection = {"type": "FeatureCollection", "features": [duplicate_item]} + + os.environ["RAISE_ON_BULK_ERROR"] = "true" + txn_client.database.sync_settings = SearchSettings() + + # Should still detect the duplicate even with different datetime + with pytest.raises(ItemAlreadyExistsError) as exc_info: + await create_item(txn_client, feature_collection) + + assert existing_item_id in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_bulk_sync_duplicate_detection( + ctx, core_client, txn_client, bulk_txn_client +): + """ + Test that bulk_sync_prep_create_item properly detects duplicates across indexes. + + This test verifies that the synchronous bulk insert also correctly detects + duplicates when an item with the same ID already exists in the collection. + """ + existing_item_id = ctx.item["id"] + + # Create a duplicate item with different datetime + duplicate_item = deepcopy(ctx.item) + duplicate_item["properties"]["datetime"] = "2028-07-20T12:00:00Z" + + conflicting_item = {existing_item_id: duplicate_item} + + # Test with RAISE_ON_BULK_ERROR set to true + os.environ["RAISE_ON_BULK_ERROR"] = "true" + bulk_txn_client.database.sync_settings = SearchSettings() + + with pytest.raises(ItemAlreadyExistsError) as exc_info: + bulk_txn_client.bulk_item_insert(Items(items=conflicting_item), refresh=True) + + assert existing_item_id in str(exc_info.value) + assert exc_info.value.item_id == existing_item_id + assert exc_info.value.collection_id == ctx.item["collection"] + + +@pytest.mark.asyncio +async def test_bulk_insert_multiple_items_with_one_duplicate( + ctx, core_client, txn_client, bulk_txn_client +): + """ + Test bulk insert behavior when one item out of many is a duplicate. + + When RAISE_ON_BULK_ERROR is true, the entire batch should fail if any + item is a duplicate. + """ + existing_item_id = ctx.item["id"] + + # Create items: 2 new + 1 duplicate + items = {} + + # Add 2 new valid items + for i in range(2): + new_item = deepcopy(ctx.item) + new_item["id"] = str(uuid.uuid4()) + items[new_item["id"]] = new_item + + # Add 1 duplicate item (with different datetime) + duplicate_item = deepcopy(ctx.item) + duplicate_item["properties"]["datetime"] = "2027-03-15T08:30:00Z" + items[existing_item_id] = duplicate_item + + os.environ["RAISE_ON_BULK_ERROR"] = "true" + bulk_txn_client.database.sync_settings = SearchSettings() + + # Should fail on the duplicate + with pytest.raises(ItemAlreadyExistsError) as exc_info: + bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True) + + assert existing_item_id in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_bulk_insert_with_in_batch_duplicates(ctx, core_client, bulk_txn_client): + """ + Test bulk insert behavior when the same item ID appears multiple times in the batch. 
+
+    When the same ID is submitted multiple times in a single batch:
+    - Duplicates within the batch should be detected and counted as skipped
+    - Only the last occurrence of each ID should be inserted
+    - The response message should accurately reflect the number of items added
+    """
+    # Create a unique item for this test
+    test_item = deepcopy(ctx.item)
+    unique_id = str(uuid.uuid4())
+    test_item["id"] = unique_id
+
+    # Create 3 copies of the same item with the same ID
+    items = {}
+    for i in range(3):
+        item_copy = deepcopy(test_item)
+        item_copy["properties"]["description"] = f"Version {i}"
+        items[f"{unique_id}_{i}"] = item_copy  # Different keys but same item ID
+
+    result = bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True)
+
+    # Should report 1 item added and 2 skipped (in-batch duplicates)
+    # bulk_item_insert returns: "Successfully added/updated {n} Items. {m} skipped (duplicates). {k} errors occurred."
+    assert "Successfully added/updated 1 Items" in result
+    assert "2 skipped (duplicates)" in result
+
+    # Verify only 1 item exists in the collection with this ID
+    fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
+    matching_items = [f for f in fc["features"] if f["id"] == unique_id]
+    assert len(matching_items) == 1
+
+    # Clean up
+    from stac_fastapi.core.core import TransactionsClient
+
+    txn = TransactionsClient(database=bulk_txn_client.database, settings=None)
+    await txn.delete_item(unique_id, ctx.item["collection"])
+
+
+@pytest.mark.asyncio
+async def test_feature_collection_insert_with_in_batch_duplicates(
+    ctx, core_client, txn_client
+):
+    """
+    Test FeatureCollection insert behavior when the same item ID appears multiple times.
+
+    When the same ID is submitted multiple times in a FeatureCollection:
+    - Duplicates within the batch should be detected and counted as skipped
+    - Only the last occurrence of each ID should be inserted
+    - The response message should accurately reflect the number of items added
+    """
+    from stac_pydantic import ItemCollection as api_ItemCollection
+
+    unique_id = str(uuid.uuid4())
+
+    # Create 3 features with the same ID but different properties
+    features = []
+    for i in range(3):
+        item = deepcopy(ctx.item)
+        item["id"] = unique_id
+        item["properties"]["description"] = f"Version {i}"
+        features.append(item)
+
+    feature_collection = {"type": "FeatureCollection", "features": features}
+
+    # Call create_item directly to get the result string
+    result = await txn_client.create_item(
+        collection_id=ctx.collection["id"],
+        item=api_ItemCollection(**feature_collection),
+        request=MockRequest(),
+        refresh=True,
+    )
+
+    # Should report 1 item added and 2 skipped (in-batch duplicates)
+    # create_item (FeatureCollection) returns: "Successfully added {n} Items. {m} skipped (duplicates). {k} errors occurred."
+    assert "Successfully added 1 Items" in result
+    assert "2 skipped (duplicates)" in result
+
+    # Verify only 1 item exists in the collection with this ID
+    fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest())
+    matching_items = [f for f in fc["features"] if f["id"] == unique_id]
+    assert len(matching_items) == 1
+
+    # The last version should be the one that was inserted
+    assert matching_items[0]["properties"].get("description") == "Version 2"
+
+    # Clean up
+    await txn_client.delete_item(unique_id, ctx.item["collection"])
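
A note on the keep-last deduplication exercised by the two in-batch tests above: building a dict keyed by item ID keeps each key at the position of its first insertion while later assignments replace the stored value, which mirrors Elasticsearch/OpenSearch bulk semantics where later actions with the same ID overwrite earlier ones. A minimal standalone sketch of that behavior, using hypothetical data and no client or fixtures:

    batch = [
        {"id": "item-1", "properties": {"description": "Version 0"}},
        {"id": "item-2", "properties": {"description": "Version 0"}},
        {"id": "item-1", "properties": {"description": "Version 1"}},
    ]

    # Keyed assignment keeps only the last occurrence of each ID; dict order
    # is stable, so item-1 stays first even though its value was replaced.
    seen_ids: dict = {}
    for item in batch:
        seen_ids[item["id"]] = item

    unique_items = list(seen_ids.values())
    skipped = len(batch) - len(unique_items)

    assert [i["properties"]["description"] for i in unique_items] == [
        "Version 1",
        "Version 0",
    ]
    assert skipped == 1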