2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

### Fixed

- Fixed bulk_sync_prep_create_item to properly detect duplicates across indexes. [#575](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/575)

### Removed

### Updated
48 changes: 41 additions & 7 deletions stac_fastapi/core/stac_fastapi/core/core.py
@@ -1006,14 +1006,31 @@ async def create_item(
database=self.database, settings=self.settings
)
features = item_dict["features"]
processed_items = [
all_prepped = [
bulk_client.preprocess_item(
feature, base_url, BulkTransactionMethod.INSERT
)
for feature in features
]
# Filter out None values (skipped duplicates from DB check)
processed_items = [item for item in all_prepped if item is not None]
skipped_db_duplicates = len(all_prepped) - len(processed_items)

# Deduplicate items within the batch by ID (keep last occurrence)
# This matches ES behavior where later items overwrite earlier ones
seen_ids: dict = {}
for item in processed_items:
seen_ids[item["id"]] = item
unique_items = list(seen_ids.values())
skipped_batch_duplicates = len(processed_items) - len(unique_items)
processed_items = unique_items

skipped = skipped_db_duplicates + skipped_batch_duplicates
attempted = len(processed_items)

if not processed_items:
return f"No items to insert. {skipped} items were skipped (duplicates)."

success, errors = await self.database.bulk_async(
collection_id=collection_id,
processed_items=processed_items,
@@ -1027,7 +1044,7 @@ async def create_item(
logger.info(
f"Bulk async operation succeeded with {success} actions for collection {collection_id}."
)
return f"Successfully added {success} Items. {attempted - success} errors occurred."
return f"Successfully added {success} Items. {skipped} skipped (duplicates). {attempted - success} errors occurred."

# Handle single item
await self.database.create_item(
@@ -1340,18 +1357,35 @@ def bulk_item_insert(
base_url = ""

processed_items = []
skipped_db_duplicates = 0
for item in items.items.values():
try:
validated = Item(**item) if not isinstance(item, Item) else item
processed_items.append(
self.preprocess_item(
validated.model_dump(mode="json"), base_url, items.method
)
prepped = self.preprocess_item(
validated.model_dump(mode="json"), base_url, items.method
)
if prepped is not None:
processed_items.append(prepped)
else:
skipped_db_duplicates += 1
except ValidationError:
# Immediately raise on the first invalid item (strict mode)
raise

# Deduplicate items within the batch by ID (keep last occurrence)
# This matches ES behavior where later items overwrite earlier ones
seen_ids: dict = {}
for item in processed_items:
seen_ids[item["id"]] = item
unique_items = list(seen_ids.values())
skipped_batch_duplicates = len(processed_items) - len(unique_items)
processed_items = unique_items

skipped = skipped_db_duplicates + skipped_batch_duplicates

if not processed_items:
return f"No items to insert. {skipped} items were skipped (duplicates)."

collection_id = processed_items[0]["collection"]
attempted = len(processed_items)
success, errors = self.database.bulk_sync(
@@ -1364,4 +1398,4 @@ def bulk_item_insert(
else:
logger.info(f"Bulk sync operation succeeded with {success} actions.")

return f"Successfully added/updated {success} Items. {attempted - success} errors occurred."
return f"Successfully added/updated {success} Items. {skipped} skipped (duplicates). {attempted - success} errors occurred."
@@ -33,11 +33,14 @@
PatchOperation,
)
from stac_fastapi.sfeos_helpers.database import (
ItemAlreadyExistsError,
add_bbox_shape_to_collection,
apply_collections_bbox_filter_shared,
apply_collections_datetime_filter_shared,
apply_free_text_filter_shared,
apply_intersects_filter_shared,
check_item_exists_in_alias,
check_item_exists_in_alias_sync,
create_index_templates_shared,
delete_item_index_shared,
get_queryables_mapping_shared,
@@ -996,6 +999,44 @@ async def check_collection_exists(self, collection_id: str):
if not await self.client.exists(index=COLLECTIONS_INDEX, id=collection_id):
raise NotFoundError(f"Collection {collection_id} does not exist")

async def _check_item_exists_in_collection(
self, collection_id: str, item_id: str
) -> bool:
"""Check if an item exists across all indexes for a collection.

Args:
collection_id (str): The collection identifier.
item_id (str): The item identifier.

Returns:
bool: True if the item exists in any index, False otherwise.
"""
alias = index_alias_by_collection_id(collection_id)
doc_id = mk_item_id(item_id, collection_id)
try:
return await check_item_exists_in_alias(self.client, alias, doc_id)
YuriZmytrakov (Collaborator) commented on Jan 14, 2026:

Optional, but it would be great: I can see that this raises ConflictError. Please consider adding a custom error with a message stating that the item already exists, to improve the error quality.

Author replied:

Done
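The resulting ItemAlreadyExistsError is defined in stac_fastapi.sfeos_helpers.database and its body is not shown in this diff. As a rough illustration of what the reviewer asked for, here is a minimal sketch assuming the error subclasses the existing ConflictError and takes the item and collection IDs, matching the call sites in this file; the actual class body may differ:

```python
from stac_fastapi.types.errors import ConflictError


class ItemAlreadyExistsError(ConflictError):
    """Raised when an item with the same ID already exists in a collection."""

    def __init__(self, item_id: str, collection_id: str):
        self.item_id = item_id
        self.collection_id = collection_id
        super().__init__(
            f"Item {item_id} in collection {collection_id} already exists."
        )
```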

except Exception:
return False

def _check_item_exists_in_collection_sync(
self, collection_id: str, item_id: str
) -> bool:
"""Check if an item exists across all indexes for a collection (sync version).

Args:
collection_id (str): The collection identifier.
item_id (str): The item identifier.

Returns:
bool: True if the item exists in any index, False otherwise.
"""
alias = index_alias_by_collection_id(collection_id)
doc_id = mk_item_id(item_id, collection_id)
try:
return check_item_exists_in_alias_sync(self.sync_client, alias, doc_id)
except Exception:
return False
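check_item_exists_in_alias and check_item_exists_in_alias_sync are imported at the top of this file, but their implementation is not part of this diff. A minimal sketch of one way the async variant could work, assuming an ids query against the collection alias so that every backing datetime index is covered (the function name and argument order follow the call sites above; the query itself is illustrative):

```python
from elasticsearch import AsyncElasticsearch


async def check_item_exists_in_alias(
    client: AsyncElasticsearch, alias: str, doc_id: str
) -> bool:
    """Return True if a document with doc_id exists in any index behind the alias."""
    # A plain exists() call cannot resolve an alias that spans multiple indexes,
    # so search the alias instead: the ids query matches the document in
    # whichever backing index holds it, and size=0 skips fetching the source.
    response = await client.search(
        index=alias,
        body={"query": {"ids": {"values": [doc_id]}}, "size": 0},
    )
    return response["hits"]["total"]["value"] > 0
```

The sync variant would mirror this with the synchronous client.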

async def async_prep_create_item(
self, item: Item, base_url: str, exist_ok: bool = False
) -> Item:
@@ -1011,31 +1052,21 @@ async def async_prep_create_item(
Item: The prepped item.

Raises:
ConflictError: If the item already exists in the database.
ItemAlreadyExistsError: If the item already exists in the database.

"""
await self.check_collection_exists(collection_id=item["collection"])
alias = index_alias_by_collection_id(item["collection"])
doc_id = mk_item_id(item["id"], item["collection"])

if not exist_ok:
alias_exists = await self.client.indices.exists_alias(name=alias)

if alias_exists:
alias_info = await self.client.indices.get_alias(name=alias)
indices = list(alias_info.keys())

for index in indices:
if await self.client.exists(index=index, id=doc_id):
raise ConflictError(
f"Item {item['id']} in collection {item['collection']} already exists"
)
if not exist_ok and await self._check_item_exists_in_collection(
item["collection"], item["id"]
):
raise ItemAlreadyExistsError(item["id"], item["collection"])

return self.item_serializer.stac_to_db(item, base_url)

async def bulk_async_prep_create_item(
self, item: Item, base_url: str, exist_ok: bool = False
) -> Item:
) -> Optional[Item]:
"""
Prepare an item for insertion into the database.

@@ -1063,20 +1094,18 @@ async def bulk_async_prep_create_item(
# Check if the collection exists
await self.check_collection_exists(collection_id=item["collection"])

# Check if the item already exists in the database
if not exist_ok and await self.client.exists(
index=index_alias_by_collection_id(item["collection"]),
id=mk_item_id(item["id"], item["collection"]),
# Check if the item already exists in the database (across all datetime indexes)
if not exist_ok and await self._check_item_exists_in_collection(
item["collection"], item["id"]
):
error_message = (
f"Item {item['id']} in collection {item['collection']} already exists."
)
if self.async_settings.raise_on_bulk_error:
raise ConflictError(error_message)
raise ItemAlreadyExistsError(item["id"], item["collection"])
else:
logger.warning(
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
f"Item {item['id']} in collection {item['collection']} already exists. "
"Skipping as `RAISE_ON_BULK_ERROR` is set to false."
)
return None

# Serialize the item into a database-compatible format
prepped_item = self.item_serializer.stac_to_db(item, base_url)
@@ -1085,7 +1114,7 @@ def bulk_sync_prep_create_item(

def bulk_sync_prep_create_item(
self, item: Item, base_url: str, exist_ok: bool = False
) -> Item:
) -> Optional[Item]:
"""
Prepare an item for insertion into the database.

@@ -1114,26 +1143,18 @@ def bulk_sync_prep_create_item(
if not self.sync_client.exists(index=COLLECTIONS_INDEX, id=item["collection"]):
raise NotFoundError(f"Collection {item['collection']} does not exist")

# Check if the item already exists in the database
alias = index_alias_by_collection_id(item["collection"])
doc_id = mk_item_id(item["id"], item["collection"])

if not exist_ok:
alias_exists = self.sync_client.indices.exists_alias(name=alias)

if alias_exists:
alias_info = self.sync_client.indices.get_alias(name=alias)
indices = list(alias_info.keys())

for index in indices:
if self.sync_client.exists(index=index, id=doc_id):
error_message = f"Item {item['id']} in collection {item['collection']} already exists."
if self.sync_settings.raise_on_bulk_error:
raise ConflictError(error_message)
else:
logger.warning(
f"{error_message} Continuing as `RAISE_ON_BULK_ERROR` is set to false."
)
# Check if the item already exists in the database (across all datetime indexes)
if not exist_ok and self._check_item_exists_in_collection_sync(
item["collection"], item["id"]
):
if self.sync_settings.raise_on_bulk_error:
raise ItemAlreadyExistsError(item["id"], item["collection"])
else:
logger.warning(
f"Item {item['id']} in collection {item['collection']} already exists. "
"Skipping as `RAISE_ON_BULK_ERROR` is set to false."
)
return None

# Serialize the item into a database-compatible format
prepped_item = self.item_serializer.stac_to_db(item, base_url)