From 33892ebb41093e8aa23132a8f8e4ad2af20a7ee1 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Sat, 4 Apr 2026 14:59:22 +0100 Subject: [PATCH 1/7] Extract WaitAckCache to deduplicate bulk response handling in OpenSearch bolts The waitAck cache logic and bulk response processing were duplicated across DeletionBolt, IndexerBolt, and StatusUpdaterBolt. This extracts the shared logic into a new WaitAckCache class and adds unit tests covering the core scenarios (success, failure, conflicts, eviction, duplicate doc IDs). Co-Authored-By: Claude Opus 4.6 --- .../stormcrawler/opensearch/WaitAckCache.java | 304 ++++++++++++++++ .../opensearch/bolt/DeletionBolt.java | 198 +---------- .../opensearch/bolt/IndexerBolt.java | 253 ++------------ .../persistence/StatusUpdaterBolt.java | 233 ++----------- .../opensearch/WaitAckCacheTest.java | 330 ++++++++++++++++++ 5 files changed, 723 insertions(+), 595 deletions(-) create mode 100644 external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java create mode 100644 external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java new file mode 100644 index 000000000..768a4e788 --- /dev/null +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.opensearch; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.storm.metric.api.MultiCountMetric; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.jetbrains.annotations.Nullable; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.core.rest.RestStatus; +import org.slf4j.Logger; + +/** + * Thread-safe cache that tracks in-flight tuples awaiting bulk acknowledgment from OpenSearch. + * Provides shared logic for processing bulk responses and failing tuples on error, used by + * IndexerBolt, DeletionBolt, and StatusUpdaterBolt. + */ +public class WaitAckCache { + + /** Callback invoked for each tuple when processing a successful bulk response. 
 */
+    @FunctionalInterface
+    public interface TupleAction {
+        void handle(String id, Tuple tuple, BulkItemResponseToFailedFlag selected);
+    }
+
+    private final Cache<String, List<Tuple>> cache;
+    private final java.util.concurrent.locks.ReentrantLock lock =
+            new java.util.concurrent.locks.ReentrantLock(true);
+    private final Logger log;
+
+    /** Creates a cache with a fixed 60-second expiry. */
+    public WaitAckCache(Logger log, Consumer<Tuple> onEviction) {
+        this(Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS), log, onEviction);
+    }
+
+    /** Creates a cache from a Caffeine spec string (e.g. "expireAfterWrite=300s"). */
+    public WaitAckCache(String cacheSpec, Logger log, Consumer<Tuple> onEviction) {
+        this(Caffeine.from(cacheSpec), log, onEviction);
+    }
+
+    private WaitAckCache(
+            Caffeine<Object, Object> builder, Logger log, Consumer<Tuple> onEviction) {
+        this.log = log;
+        this.cache =
+                builder.<String, List<Tuple>>removalListener(
+                                (String key, List<Tuple> value, RemovalCause cause) -> {
+                                    if (!cause.wasEvicted()) {
+                                        return;
+                                    }
+                                    if (value != null) {
+                                        log.error(
+                                                "Purged from waitAck {} with {} values",
+                                                key,
+                                                value.size());
+                                        for (Tuple t : value) {
+                                            onEviction.accept(t);
+                                        }
+                                    } else {
+                                        log.error("Purged from waitAck {} with no values", key);
+                                    }
+                                })
+                        .build();
+    }
+
+    /** Registers a gauge metric that reports the estimated cache size. */
+    public void registerMetric(TopologyContext context, String name, int timeBucketSecs) {
+        context.registerMetric(name, () -> cache.estimatedSize(), timeBucketSecs);
+    }
+
+    public long estimatedSize() {
+        return cache.estimatedSize();
+    }
+
+    /** Adds a tuple to the cache under the given document ID, creating the list if needed. */
+    public void addTuple(String docID, Tuple tuple) {
+        lock.lock();
+        try {
+            List<Tuple> tt = cache.get(docID, k -> new LinkedList<>());
+            tt.add(tuple);
+            if (log.isDebugEnabled()) {
+                String url = (String) tuple.getValueByField("url");
+                log.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size());
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Returns true if the cache contains an entry for the given document ID. */
+    public boolean contains(String docID) {
+        lock.lock();
+        try {
+            return cache.getIfPresent(docID) != null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Invalidates a single cache entry. */
+    public void invalidate(String docID) {
+        lock.lock();
+        try {
+            cache.invalidate(docID);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Processes a successful bulk response: classifies each item (conflict vs failure), retrieves
+     * cached tuples, selects the best response per document ID, and invokes the action for each
+     * tuple.
+     *
+     * @param conflictCounter optional metric counter; if non-null, increments "doc_conflicts" scope
+     *     for each conflict
+     */
+    public void processBulkResponse(
+            BulkResponse response,
+            long executionId,
+            @Nullable MultiCountMetric conflictCounter,
+            TupleAction action) {
+
+        var idsToBulkItems =
+                Arrays.stream(response.getItems())
+                        .map(
+                                bir -> {
+                                    BulkItemResponse.Failure f = bir.getFailure();
+                                    boolean failed = false;
+                                    if (f != null) {
+                                        if (f.getStatus().equals(RestStatus.CONFLICT)) {
+                                            if (conflictCounter != null) {
+                                                conflictCounter
+                                                        .scope("doc_conflicts")
+                                                        .incrBy(1);
+                                            }
+                                            log.debug("Doc conflict ID {}", bir.getId());
+                                        } else {
+                                            log.error(
+                                                    "Bulk item failure ID {}: {}",
+                                                    bir.getId(),
+                                                    f);
+                                            failed = true;
+                                        }
+                                    }
+                                    return new BulkItemResponseToFailedFlag(bir, failed);
+                                })
+                        .collect(
+                                // https://github.com/apache/stormcrawler/issues/832
+                                Collectors.groupingBy(
+                                        b -> b.id, Collectors.toUnmodifiableList()));
+
+        Map<String, List<Tuple>> presentTuples;
+        long estimatedSize;
+        Set<String> debugInfo = null;
+
+        lock.lock();
+        try {
+            presentTuples = cache.getAllPresent(idsToBulkItems.keySet());
+            if (!presentTuples.isEmpty()) {
+                cache.invalidateAll(presentTuples.keySet());
+            }
+            estimatedSize = cache.estimatedSize();
+            if (log.isDebugEnabled() && estimatedSize > 0L) {
+                debugInfo = new HashSet<>(cache.asMap().keySet());
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        int ackCount = 0;
+        int failureCount = 0;
+
+        for (var entry : presentTuples.entrySet()) {
+            final var id = entry.getKey();
+            final var tuples = entry.getValue();
+            final var bulkItems = idsToBulkItems.get(id);
+
+            BulkItemResponseToFailedFlag selected = selectBest(bulkItems, id);
+
+            if (tuples != null) {
+                log.debug("Found {} tuple(s) for ID {}", tuples.size(), id);
+                for (Tuple t : tuples) {
+                    if (selected.failed) {
+                        failureCount++;
+                    } else {
+                        ackCount++;
+                    }
+                    action.handle(id, t, selected);
+                }
+            } else {
+                log.warn("Could not find unacked tuples for {}", id);
+            }
+        }
+
+        log.info(
+                "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}",
+                executionId,
+                idsToBulkItems.size(),
+                estimatedSize,
+                ackCount,
+                failureCount);
+
+        if (debugInfo != null) {
+            for (String k : debugInfo) {
+                log.debug("Still in wait ack after bulk response [{}] => {}", executionId, k);
+            }
+        }
+    }
+
+    /** Processes a failed bulk request by failing all associated tuples. */
+    public void processFailedBulk(
+            BulkRequest request,
+            long executionId,
+            Throwable failure,
+            Consumer<Tuple> failAction) {
+
+        log.error("Exception with bulk {} - failing the whole lot ", executionId, failure);
+
+        final var failedIds =
+                request.requests().stream()
+                        .map(DocWriteRequest::id)
+                        .collect(Collectors.toUnmodifiableSet());
+
+        Map<String, List<Tuple>> failedTupleLists;
+        lock.lock();
+        try {
+            failedTupleLists = cache.getAllPresent(failedIds);
+            if (!failedTupleLists.isEmpty()) {
+                cache.invalidateAll(failedTupleLists.keySet());
+            }
+        } finally {
+            lock.unlock();
+        }
+
+        for (var id : failedIds) {
+            var tuples = failedTupleLists.get(id);
+            if (tuples != null) {
+                log.debug("Failed {} tuple(s) for ID {}", tuples.size(), id);
+                for (Tuple t : tuples) {
+                    failAction.accept(t);
+                }
+            } else {
+                log.warn("Could not find unacked tuple for {}", id);
+            }
+        }
+    }
+
+    /**
+     * Selects the best response when there are multiple bulk items for the same document ID.
+     * Prefers non-failed responses; warns when there is a mix of success and failure.
+     */
+    private BulkItemResponseToFailedFlag selectBest(
+            List<BulkItemResponseToFailedFlag> items, String id) {
+        if (items.size() == 1) {
+            return items.get(0);
+        }
+
+        BulkItemResponseToFailedFlag best = items.get(0);
+        int failedCount = 0;
+        for (var item : items) {
+            if (item.failed) {
+                failedCount++;
+            } else {
+                best = item;
+            }
+        }
+        if (failedCount > 0 && failedCount < items.size()) {
+            log.warn(
+                    "The id {} would result in an ack and a failure."
+ + " Using only the ack for processing.", + id); + } + return best; + } +} diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java index c67b90951..f09b375a8 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java @@ -17,37 +17,21 @@ package org.apache.stormcrawler.opensearch.bolt; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.lang.invoke.MethodHandles; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; -import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.opensearch.WaitAckCache; import org.apache.stormcrawler.util.ConfUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkProcessor.Listener; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.delete.DeleteRequest; 
-import org.opensearch.core.rest.RestStatus; import org.slf4j.LoggerFactory; /** @@ -56,8 +40,7 @@ * will also try to delete documents even though they were never indexed and it currently won't * delete documents which were indexed under the canonical URL. */ -public class DeletionBolt extends BaseRichBolt - implements RemovalListener>, Listener { +public class DeletionBolt extends BaseRichBolt implements Listener { static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -70,10 +53,7 @@ public class DeletionBolt extends BaseRichBolt private OpenSearchConnection connection; - private Cache> waitAck; - - // Be fair due to cache timeout - private final ReentrantLock waitAckLock = new ReentrantLock(true); + private WaitAckCache waitAck; public DeletionBolt() {} @@ -97,29 +77,8 @@ public void prepare( throw new RuntimeException(e1); } - waitAck = - Caffeine.newBuilder() - .expireAfterWrite(60, TimeUnit.SECONDS) - .removalListener(this) - .build(); - - context.registerMetric("waitAck", () -> waitAck.estimatedSize(), 10); - } - - public void onRemoval( - @Nullable String key, @Nullable List value, @NotNull RemovalCause cause) { - if (!cause.wasEvicted()) { - return; - } - if (value != null) { - LOG.error("Purged from waitAck {} with {} values", key, value.size()); - for (Tuple t : value) { - _collector.fail(t); - } - } else { - // This should never happen, but log it anyway. 
- LOG.error("Purged from waitAck {} with no values", key); - } + waitAck = new WaitAckCache(LOG, _collector::fail); + waitAck.registerMetric(context, "waitAck", 10); } @Override @@ -141,18 +100,7 @@ public void execute(Tuple tuple) { DeleteRequest dr = new DeleteRequest(getIndexName(metadata), docID); connection.addToProcessor(dr); - waitAckLock.lock(); - try { - List tt = waitAck.getIfPresent(docID); - if (tt == null) { - tt = new LinkedList<>(); - waitAck.put(docID, tt); - } - tt.add(tuple); - LOG.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size()); - } finally { - waitAckLock.unlock(); - } + waitAck.addTuple(docID, tuple); } @Override @@ -184,135 +132,27 @@ public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - var idsToBulkItemsWithFailedFlag = - Arrays.stream(response.getItems()) - .map( - bir -> { - String id = bir.getId(); - BulkItemResponse.Failure f = bir.getFailure(); - boolean failed = false; - if (f != null) { - if (f.getStatus().equals(RestStatus.CONFLICT)) { - LOG.debug("Doc conflict ID {}", id); - } else { - failed = true; - } - } - return new BulkItemResponseToFailedFlag(bir, failed); - }) - .collect( - // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - idWithFailedFlagTuple -> idWithFailedFlagTuple.id, - Collectors.toUnmodifiableList())); - Map> presentTuples; - long estimatedSize; - waitAckLock.lock(); - try { - presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet()); - if (!presentTuples.isEmpty()) { - waitAck.invalidateAll(presentTuples.keySet()); - } - estimatedSize = waitAck.estimatedSize(); - } finally { - waitAckLock.unlock(); - } - - int ackCount = 0; - int failureCount = 0; - - for (var entry : presentTuples.entrySet()) { - final var id = entry.getKey(); - final var associatedTuple = entry.getValue(); - final var bulkItemsWithFailedFlag = 
idsToBulkItemsWithFailedFlag.get(id); - - BulkItemResponseToFailedFlag selected; - - if (bulkItemsWithFailedFlag.size() == 1) { - selected = bulkItemsWithFailedFlag.get(0); - } else { - // Fallback if there are multiple responses for the same id - BulkItemResponseToFailedFlag tmp = null; - var ctFailed = 0; - for (var buwff : bulkItemsWithFailedFlag) { - if (tmp == null) { - tmp = buwff; - } - if (buwff.failed) { - ctFailed++; - } else { - tmp = buwff; - } - } - if (ctFailed != bulkItemsWithFailedFlag.size()) { - LOG.warn( - "The id {} would result in an ack and a failure. Using only the ack for processing.", - id); - } - selected = Objects.requireNonNull(tmp); - } - - if (associatedTuple != null) { - LOG.debug("Found {} tuple(s) for ID {}", associatedTuple.size(), id); - for (Tuple t : associatedTuple) { - String url = (String) t.getValueByField("url"); - - Metadata metadata = (Metadata) t.getValueByField("metadata"); - + waitAck.processBulkResponse( + response, + executionId, + null, + (id, t, selected) -> { if (!selected.failed) { - ackCount++; _collector.ack(t); } else { - failureCount++; - var failure = selected.getFailure(); - LOG.error("update ID {}, URL {}, failure: {}", id, url, failure); + String url = (String) t.getValueByField("url"); + LOG.error( + "update ID {}, URL {}, failure: {}", + id, + url, + selected.getFailure()); _collector.fail(t); } - } - } else { - LOG.warn("Could not find unacked tuples for {}", entry.getKey()); - } - } - - LOG.info( - "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", - executionId, - idsToBulkItemsWithFailedFlag.size(), - estimatedSize, - ackCount, - failureCount); + }); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error("Exception with bulk {} - failing the whole lot ", executionId, failure); - - final var failedIds = - request.requests().stream() - .map(DocWriteRequest::id) - .collect(Collectors.toUnmodifiableSet()); - Map> failedTupleLists; - 
waitAckLock.lock(); - try { - failedTupleLists = waitAck.getAllPresent(failedIds); - if (!failedTupleLists.isEmpty()) { - waitAck.invalidateAll(failedTupleLists.keySet()); - } - } finally { - waitAckLock.unlock(); - } - - for (var id : failedIds) { - var failedTuples = failedTupleLists.get(id); - if (failedTuples != null) { - LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id); - for (Tuple x : failedTuples) { - // fail it - _collector.fail(x); - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } + waitAck.processFailedBulk(request, executionId, failure, _collector::fail); } } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java index 04de31cae..29dbcc62a 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java @@ -20,22 +20,9 @@ import static org.apache.stormcrawler.Constants.StatusStreamName; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.metric.api.MultiReducedMetric; @@ -46,16 +33,12 @@ import 
org.apache.stormcrawler.Constants; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.indexing.AbstractIndexerBolt; -import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.IndexCreation; import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.opensearch.WaitAckCache; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.PerSecondReducer; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkProcessor; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -69,8 +52,7 @@ * Sends documents to opensearch. Indexes all the fields from the tuples or a Map * <String,Object> from a named field. 
*/ -public class IndexerBolt extends AbstractIndexerBolt - implements RemovalListener>, BulkProcessor.Listener { +public class IndexerBolt extends AbstractIndexerBolt implements BulkProcessor.Listener { private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class); @@ -99,10 +81,7 @@ public class IndexerBolt extends AbstractIndexerBolt private MultiReducedMetric perSecMetrics; - private Cache> waitAck; - - // Be fair due to cache timeout - private final ReentrantLock waitAckLock = new ReentrantLock(true); + private WaitAckCache waitAck; public IndexerBolt() {} @@ -138,13 +117,8 @@ public void prepare( new MultiReducedMetric(new PerSecondReducer()), 10); - waitAck = - Caffeine.newBuilder() - .expireAfterWrite(60, TimeUnit.SECONDS) - .removalListener(this) - .build(); - - context.registerMetric("waitAck", () -> waitAck.estimatedSize(), 10); + waitAck = new WaitAckCache(LOG, _collector::fail); + waitAck.registerMetric(context, "waitAck", 10); // use the default status schema if none has been specified try { @@ -154,22 +128,6 @@ public void prepare( } } - public void onRemoval( - @Nullable String key, @Nullable List value, @NotNull RemovalCause cause) { - if (!cause.wasEvicted()) { - return; - } - if (value != null) { - LOG.error("Purged from waitAck {} with {} values", key, value.size()); - for (Tuple t : value) { - _collector.fail(t); - } - } else { - // This should never happen, but log it anyway. 
- LOG.error("Purged from waitAck {} with no values", key); - } - } - @Override public void cleanup() { if (connection != null) { @@ -249,29 +207,12 @@ public void execute(Tuple tuple) { eventCounter.scope("Indexed").incrBy(1); perSecMetrics.scope("Indexed").update(1); - waitAckLock.lock(); - try { - List tt = waitAck.getIfPresent(docID); - if (tt == null) { - tt = new LinkedList<>(); - waitAck.put(docID, tt); - } - tt.add(tuple); - LOG.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size()); - } finally { - waitAckLock.unlock(); - } + waitAck.addTuple(docID, tuple); } catch (IOException e) { LOG.error("Error building document for OpenSearch", e); // do not send to status stream so that it gets replayed _collector.fail(tuple); - - waitAckLock.lock(); - try { - waitAck.invalidate(docID); - } finally { - waitAckLock.unlock(); - } + waitAck.invalidate(docID); } } @@ -293,181 +234,53 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon eventCounter.scope("bulks_received").incrBy(1); eventCounter.scope("bulk_msec").incrBy(response.getTook().getMillis()); - var idsToBulkItemsWithFailedFlag = - Arrays.stream(response.getItems()) - .map( - bir -> { - String id = bir.getId(); - BulkItemResponse.Failure f = bir.getFailure(); - boolean failed = false; - if (f != null) { - if (f.getStatus().equals(RestStatus.CONFLICT)) { - eventCounter.scope("doc_conflicts").incrBy(1); - LOG.debug("Doc conflict ID {}", id); - } else { - failed = true; - } - } - return new BulkItemResponseToFailedFlag(bir, failed); - }) - .collect( - // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - idWithFailedFlagTuple -> idWithFailedFlagTuple.id, - Collectors.toUnmodifiableList())); - - Map> presentTuples; - long estimatedSize; - Set debugInfo = null; - waitAckLock.lock(); - try { - presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet()); - if (!presentTuples.isEmpty()) { - 
waitAck.invalidateAll(presentTuples.keySet()); - } - estimatedSize = waitAck.estimatedSize(); - // Only if we have to. - if (LOG.isDebugEnabled() && estimatedSize > 0L) { - debugInfo = new HashSet<>(waitAck.asMap().keySet()); - } - } finally { - waitAckLock.unlock(); - } - - int ackCount = 0; - int failureCount = 0; - - for (var entry : presentTuples.entrySet()) { - final var id = entry.getKey(); - final var associatedTuple = entry.getValue(); - final var bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id); - - BulkItemResponseToFailedFlag selected; - - if (bulkItemsWithFailedFlag.size() == 1) { - selected = bulkItemsWithFailedFlag.get(0); - } else { - // Fallback if there are multiple responses for the same id - BulkItemResponseToFailedFlag tmp = null; - var ctFailed = 0; - for (var buwff : bulkItemsWithFailedFlag) { - if (tmp == null) { - tmp = buwff; - } - if (buwff.failed) { - ctFailed++; - } else { - tmp = buwff; - } - } - if (ctFailed != bulkItemsWithFailedFlag.size()) { - LOG.warn( - "The id {} would result in an ack and a failure. 
Using only the ack for processing.", - id); - } - selected = Objects.requireNonNull(tmp); - } - - if (associatedTuple != null) { - LOG.debug("Found {} tuple(s) for ID {}", associatedTuple.size(), id); - for (Tuple t : associatedTuple) { + waitAck.processBulkResponse( + response, + executionId, + eventCounter, + (id, t, selected) -> { String url = (String) t.getValueByField("url"); - Metadata metadata = (Metadata) t.getValueByField("metadata"); if (!selected.failed) { - ackCount++; _collector.emit( - StatusStreamName, t, new Values(url, metadata, Status.FETCHED)); + StatusStreamName, + t, + new Values(url, metadata, Status.FETCHED)); _collector.ack(t); } else { - failureCount++; var failure = selected.getFailure(); LOG.error("update ID {}, URL {}, failure: {}", id, url, failure); // there is something wrong with the content we should - // treat - // it as an ERROR - if (selected.getFailure().getStatus().equals(RestStatus.BAD_REQUEST)) { - metadata.setValue(Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); - metadata.setValue(Constants.STATUS_ERROR_MESSAGE, "invalid content"); + // treat it as an ERROR + if (failure.getStatus().equals(RestStatus.BAD_REQUEST)) { + metadata.setValue( + Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); + metadata.setValue( + Constants.STATUS_ERROR_MESSAGE, "invalid content"); _collector.emit( - StatusStreamName, t, new Values(url, metadata, Status.ERROR)); + StatusStreamName, + t, + new Values(url, metadata, Status.ERROR)); _collector.ack(t); - LOG.debug("Acked {} with ID {}", url, id); } else { - LOG.error("update ID {}, URL {}, failure: {}", id, url, failure); - // there is something wrong with the content we - // should - // treat - // it as an ERROR - if (failure.getStatus().equals(RestStatus.BAD_REQUEST)) { - metadata.setValue( - Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); - metadata.setValue( - Constants.STATUS_ERROR_MESSAGE, "invalid content"); - _collector.emit( - StatusStreamName, - t, - new Values(url, 
metadata, Status.ERROR)); - _collector.ack(t); - } else { - // otherwise just fail it - _collector.fail(t); - } + _collector.fail(t); } } - } - } else { - LOG.warn("Could not find unacked tuples for {}", entry.getKey()); - } - } - - LOG.info( - "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", - executionId, - idsToBulkItemsWithFailedFlag.size(), - estimatedSize, - ackCount, - failureCount); - if (debugInfo != null) { - for (String kinaw : debugInfo) { - LOG.debug("Still in wait ack after bulk response [{}] => {}", executionId, kinaw); - } - } + }); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { eventCounter.scope("bulks_received").incrBy(1); - LOG.error("Exception with bulk {} - failing the whole lot ", executionId, failure); - - final var failedIds = - request.requests().stream() - .map(DocWriteRequest::id) - .collect(Collectors.toUnmodifiableSet()); - Map> failedTupleLists; - waitAckLock.lock(); - try { - failedTupleLists = waitAck.getAllPresent(failedIds); - if (!failedTupleLists.isEmpty()) { - waitAck.invalidateAll(failedTupleLists.keySet()); - } - } finally { - waitAckLock.unlock(); - } - for (var id : failedIds) { - var failedTuples = failedTupleLists.get(id); - if (failedTuples != null) { - LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id); - for (Tuple x : failedTuples) { - // fail it + waitAck.processFailedBulk( + request, + executionId, + failure, + t -> { eventCounter.scope("failed").incrBy(1); - _collector.fail(x); - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } + _collector.fail(t); + }); } } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index bd178f7db..491641151 100644 --- 
a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -17,23 +17,11 @@ package org.apache.stormcrawler.opensearch.persistence; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; -import java.util.Arrays; import java.util.Date; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.metric.api.MultiReducedMetric; @@ -41,25 +29,20 @@ import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; -import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.Constants; import org.apache.stormcrawler.opensearch.IndexCreation; import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.opensearch.WaitAckCache; import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.PerSecondReducer; import org.apache.stormcrawler.util.URLPartitioner; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkProcessor; import 
org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +52,7 @@ * 'status' stream. To be used in combination with a Spout to read from the index. */ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt - implements RemovalListener>, BulkProcessor.Listener { + implements BulkProcessor.Listener { private static final Logger LOG = LoggerFactory.getLogger(StatusUpdaterBolt.class); @@ -95,10 +78,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt private OpenSearchConnection connection; - private Cache> waitAck; - - // Be fair due to cache timeout - private final ReentrantLock waitAckLock = new ReentrantLock(true); + private WaitAckCache waitAck; private MultiCountMetric eventCounter; @@ -160,6 +140,12 @@ public void prepare( fieldNameForRoutingKey = fieldNameForRoutingKey.replaceAll("\\.", "%2E"); } + int metrics_time_bucket_secs = 30; + + this.eventCounter = + context.registerMetric( + "counters", new MultiCountMetric(), metrics_time_bucket_secs); + String defaultSpec = String.format( Locale.ROOT, @@ -169,12 +155,17 @@ public void prepare( String waitAckSpec = ConfUtils.getString(stormConf, "opensearch.status.waitack.cache.spec", defaultSpec); - waitAck = Caffeine.from(waitAckSpec).removalListener(this).build(); - - int metrics_time_bucket_secs = 30; + waitAck = + new WaitAckCache( + waitAckSpec, + LOG, + t -> { + eventCounter.scope("purged").incrBy(1); + collector.fail(t); + }); // create gauge for waitAck - context.registerMetric("waitAck", () -> waitAck.estimatedSize(), metrics_time_bucket_secs); + waitAck.registerMetric(context, "waitAck", metrics_time_bucket_secs); // benchmarking - average number of items received back by Elastic per 
second this.receivedPerSecMetrics = @@ -183,10 +174,6 @@ public void prepare( new MultiReducedMetric(new PerSecondReducer()), metrics_time_bucket_secs); - this.eventCounter = - context.registerMetric( - "counters", new MultiCountMetric(), metrics_time_bucket_secs); - try { connection = OpenSearchConnection.getConnection(stormConf, OSBoltType, this); } catch (Exception e1) { @@ -218,19 +205,8 @@ public void store( String documentID = getDocumentID(metadata, url); - boolean isAlreadySentAndDiscovered; - // need to synchronize: otherwise it might get added to the cache - // without having been sent to OpenSearch - waitAckLock.lock(); - try { - // check that the same URL is not being sent to OpenSearch - final var alreadySent = waitAck.getIfPresent(documentID); - isAlreadySentAndDiscovered = status.equals(Status.DISCOVERED) && alreadySent != null; - } finally { - waitAckLock.unlock(); - } - - if (isAlreadySentAndDiscovered) { + // check that the same URL is not being sent to OpenSearch + if (status.equals(Status.DISCOVERED) && waitAck.contains(documentID)) { // if this object is discovered - adding another version of it // won't make any difference LOG.debug( @@ -291,33 +267,13 @@ public void store( request.routing(partitionKey); } - waitAckLock.lock(); - try { - final List tt = waitAck.get(documentID, k -> new LinkedList<>()); - tt.add(tuple); - LOG.debug("Added to waitAck {} with ID {} total {}", url, documentID, tt.size()); - } finally { - waitAckLock.unlock(); - } + waitAck.addTuple(documentID, tuple); LOG.debug("Sending to OpenSearch buffer {} with ID {}", url, documentID); connection.addToProcessor(request); } - @Override - public void onRemoval( - @Nullable String key, @Nullable List value, @NotNull RemovalCause cause) { - if (!cause.wasEvicted()) { - return; - } - LOG.error("Purged from waitAck {} with {} values", key, value.size()); - for (Tuple t : value) { - eventCounter.scope("purged").incrBy(1); - collector.fail(t); - } - } - @Override public void 
afterBulk(long executionId, BulkRequest request, BulkResponse response) { LOG.debug("afterBulk [{}] with {} responses", executionId, request.numberOfActions()); @@ -326,115 +282,21 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon eventCounter.scope("received").incrBy(request.numberOfActions()); receivedPerSecMetrics.scope("received").update(request.numberOfActions()); - var idsToBulkItemsWithFailedFlag = - Arrays.stream(response.getItems()) - .map( - bir -> { - String id = bir.getId(); - BulkItemResponse.Failure f = bir.getFailure(); - boolean failed = false; - if (f != null) { - // already discovered - if (f.getStatus().equals(RestStatus.CONFLICT)) { - eventCounter.scope("doc_conflicts").incrBy(1); - LOG.debug("Doc conflict ID {}", id); - } else { - LOG.error("Update ID {}, failure: {}", id, f); - failed = true; - } - } - return new BulkItemResponseToFailedFlag(bir, failed); - }) - .collect( - // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - idWithFailedFlagTuple -> idWithFailedFlagTuple.id, - Collectors.toUnmodifiableList())); - - Map> presentTuples; - long estimatedSize; - Set debugInfo = null; - waitAckLock.lock(); - try { - presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet()); - if (!presentTuples.isEmpty()) { - waitAck.invalidateAll(presentTuples.keySet()); - } - estimatedSize = waitAck.estimatedSize(); - // Only if we have to. 
- if (LOG.isDebugEnabled() && estimatedSize > 0L) { - debugInfo = new HashSet<>(waitAck.asMap().keySet()); - } - } finally { - waitAckLock.unlock(); - } - - int ackCount = 0; - int failureCount = 0; - - for (var entry : presentTuples.entrySet()) { - final var id = entry.getKey(); - final var associatedTuple = entry.getValue(); - final var bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id); - - BulkItemResponseToFailedFlag selected; - if (bulkItemsWithFailedFlag.size() == 1) { - selected = bulkItemsWithFailedFlag.get(0); - } else { - // Fallback if there are multiple responses for the same id - BulkItemResponseToFailedFlag tmp = null; - var ctFailed = 0; - for (var buwff : bulkItemsWithFailedFlag) { - if (tmp == null) { - tmp = buwff; - } - if (buwff.failed) { - ctFailed++; - } else { - tmp = buwff; - } - } - if (ctFailed != bulkItemsWithFailedFlag.size()) { - LOG.warn( - "The id {} would result in an ack and a failure. Using only the ack for processing.", - id); - } - selected = Objects.requireNonNull(tmp); - } - - if (associatedTuple != null) { - LOG.debug("Acked {} tuple(s) for ID {}", associatedTuple.size(), id); - for (Tuple tuple : associatedTuple) { + waitAck.processBulkResponse( + response, + executionId, + eventCounter, + (id, t, selected) -> { if (!selected.failed) { - String url = tuple.getStringByField("url"); - ackCount++; - // ack and put in cache + String url = t.getStringByField("url"); LOG.debug("Acked {} with ID {}", url, id); eventCounter.scope("acked").incrBy(1); - super.ack(tuple, url); + ack(t, url); } else { - failureCount++; eventCounter.scope("failed").incrBy(1); - collector.fail(tuple); + collector.fail(t); } - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } - - LOG.info( - "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", - executionId, - idsToBulkItemsWithFailedFlag.size(), - estimatedSize, - ackCount, - failureCount); - if (debugInfo != null) { - for (String kinaw : debugInfo) { - 
LOG.debug("Still in wait ack after bulk response [{}] => {}", executionId, kinaw); - } - } + }); } @Override @@ -442,36 +304,15 @@ public void afterBulk(long executionId, BulkRequest request, Throwable throwable eventCounter.scope("bulks_received").incrBy(1); eventCounter.scope("received").incrBy(request.numberOfActions()); receivedPerSecMetrics.scope("received").update(request.numberOfActions()); - LOG.error("Exception with bulk {} - failing the whole lot ", executionId, throwable); - - final var failedIds = - request.requests().stream() - .map(DocWriteRequest::id) - .collect(Collectors.toUnmodifiableSet()); - Map> failedTupleLists; - waitAckLock.lock(); - try { - failedTupleLists = waitAck.getAllPresent(failedIds); - if (!failedTupleLists.isEmpty()) { - waitAck.invalidateAll(failedTupleLists.keySet()); - } - } finally { - waitAckLock.unlock(); - } - for (var id : failedIds) { - var failedTuples = failedTupleLists.get(id); - if (failedTuples != null) { - LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id); - for (Tuple x : failedTuples) { - // fail it + waitAck.processFailedBulk( + request, + executionId, + throwable, + t -> { eventCounter.scope("failed").incrBy(1); - collector.fail(x); - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } + collector.fail(t); + }); } @Override diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java new file mode 100644 index 000000000..23945c6a0 --- /dev/null +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.opensearch; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import org.apache.storm.metric.api.MultiCountMetric; +import org.apache.storm.tuple.Tuple; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.LoggerFactory; + +class WaitAckCacheTest { + + private WaitAckCache cache; + private List evicted; + private List acked; + private List failed; + + @BeforeEach + void setUp() { + evicted = new CopyOnWriteArrayList<>(); + acked = new ArrayList<>(); + failed = new ArrayList<>(); + cache = + new WaitAckCache( + LoggerFactory.getLogger(WaitAckCacheTest.class), evicted::add); + } + + private Tuple 
mockTuple(String url) { + Tuple t = mock(Tuple.class); + when(t.getValueByField("url")).thenReturn(url); + when(t.getStringByField("url")).thenReturn(url); + return t; + } + + private static ShardId shardId() { + return new ShardId("index", "_na_", 0); + } + + private static BulkItemResponse successItem(int itemId, String docId) { + IndexResponse indexResponse = + new IndexResponse(shardId(), docId, 1, 1, 1, true); + return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, indexResponse); + } + + private static BulkItemResponse failedItem(int itemId, String docId, RestStatus status) { + BulkItemResponse.Failure failure = + new BulkItemResponse.Failure( + "index", docId, new Exception("test failure"), status); + return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, failure); + } + + private static BulkResponse bulkResponse(BulkItemResponse... items) { + return new BulkResponse(items, 10L); + } + + @Test + void addAndContains() { + Tuple t = mockTuple("http://example.com"); + assertFalse(cache.contains("doc1")); + + cache.addTuple("doc1", t); + assertTrue(cache.contains("doc1")); + assertEquals(1, cache.estimatedSize()); + } + + @Test + void invalidateRemovesEntry() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + assertTrue(cache.contains("doc1")); + + cache.invalidate("doc1"); + assertFalse(cache.contains("doc1")); + } + + @Test + void processBulkResponse_successfulItem_ackedViaTupleAction() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkResponse response = bulkResponse(successItem(0, "doc1")); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertEquals(0, failed.size()); + assertSame(t, acked.get(0)); + assertFalse(cache.contains("doc1")); + } + + @Test + void processBulkResponse_failedItem_failedViaTupleAction() { + 
Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkResponse response = + bulkResponse(failedItem(0, "doc1", RestStatus.INTERNAL_SERVER_ERROR)); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(0, acked.size()); + assertEquals(1, failed.size()); + assertSame(t, failed.get(0)); + } + + @Test + void processBulkResponse_conflictIsNotAFailure() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + MultiCountMetric counter = new MultiCountMetric(); + BulkResponse response = bulkResponse(failedItem(0, "doc1", RestStatus.CONFLICT)); + + cache.processBulkResponse( + response, + 1L, + counter, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertEquals(0, failed.size()); + } + + @Test + void processBulkResponse_multipleTuplesForSameDocId() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc1", t2); + + BulkResponse response = bulkResponse(successItem(0, "doc1")); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> acked.add(tuple)); + + assertEquals(2, acked.size()); + assertTrue(acked.contains(t1)); + assertTrue(acked.contains(t2)); + } + + @Test + void processBulkResponse_duplicateDocIdInBulk_prefersSuccess() { + // https://github.com/apache/stormcrawler/issues/832 + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkResponse response = + bulkResponse( + failedItem(0, "doc1", RestStatus.INTERNAL_SERVER_ERROR), + successItem(1, "doc1")); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + 
assertEquals(1, acked.size()); + assertEquals(0, failed.size()); + } + + @Test + void processFailedBulk_failsAllMatchingTuples() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc2", t2); + + BulkRequest request = new BulkRequest(); + request.add(new DeleteRequest("index", "doc1")); + request.add(new DeleteRequest("index", "doc2")); + + cache.processFailedBulk( + request, + 1L, + new Exception("connection lost"), + failed::add); + + assertEquals(2, failed.size()); + assertTrue(failed.contains(t1)); + assertTrue(failed.contains(t2)); + assertFalse(cache.contains("doc1")); + assertFalse(cache.contains("doc2")); + } + + @Test + void processFailedBulk_ignoresMissingIds() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkRequest request = new BulkRequest(); + request.add(new DeleteRequest("index", "doc_unknown")); + + cache.processFailedBulk( + request, + 1L, + new Exception("test"), + failed::add); + + assertEquals(0, failed.size()); + // doc1 should still be in cache since it wasn't in the failed request + assertTrue(cache.contains("doc1")); + } + + @Test + void eviction_failsTuplesOnExpiry() { + cache = + new WaitAckCache( + "expireAfterWrite=1s", + LoggerFactory.getLogger(WaitAckCacheTest.class), + evicted::add); + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + // Force cache maintenance after expiry by doing a contains() check + // which accesses the cache and triggers Caffeine's cleanup + await().atMost(5, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + // contains() accesses the cache which triggers cleanup + cache.contains("doc1"); + // also try adding and invalidating a dummy entry to force maintenance + Tuple dummy = mockTuple("http://dummy"); + cache.addTuple("_probe_", dummy); + cache.invalidate("_probe_"); + assertFalse(evicted.isEmpty(), "Eviction 
callback should have fired"); + }); + + assertTrue(evicted.contains(t)); + } + + @Test + void processBulkResponse_multipleDocIds() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc2", t2); + + BulkResponse response = + bulkResponse( + successItem(0, "doc1"), + failedItem(1, "doc2", RestStatus.INTERNAL_SERVER_ERROR)); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertSame(t1, acked.get(0)); + assertEquals(1, failed.size()); + assertSame(t2, failed.get(0)); + } +} From 98a370ff2f6745e333b50f9659b61d3da5314557 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Sat, 4 Apr 2026 15:00:56 +0100 Subject: [PATCH 2/7] Fix formatting Signed-off-by: Julien Nioche --- .../stormcrawler/opensearch/WaitAckCache.java | 19 +++-------- .../opensearch/bolt/IndexerBolt.java | 14 +++----- .../persistence/StatusUpdaterBolt.java | 3 +- .../opensearch/WaitAckCacheTest.java | 33 +++++-------------- 4 files changed, 18 insertions(+), 51 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java index 768a4e788..1ef781028 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java @@ -68,8 +68,7 @@ public WaitAckCache(String cacheSpec, Logger log, Consumer onEviction) { this(Caffeine.from(cacheSpec), log, onEviction); } - private WaitAckCache( - Caffeine builder, Logger log, Consumer onEviction) { + private WaitAckCache(Caffeine builder, Logger log, Consumer onEviction) { this.log = log; this.cache = builder.>removalListener( @@ -159,16 +158,12 @@ public void 
processBulkResponse( if (f != null) { if (f.getStatus().equals(RestStatus.CONFLICT)) { if (conflictCounter != null) { - conflictCounter - .scope("doc_conflicts") - .incrBy(1); + conflictCounter.scope("doc_conflicts").incrBy(1); } log.debug("Doc conflict ID {}", bir.getId()); } else { log.error( - "Bulk item failure ID {}: {}", - bir.getId(), - f); + "Bulk item failure ID {}: {}", bir.getId(), f); failed = true; } } @@ -176,8 +171,7 @@ public void processBulkResponse( }) .collect( // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - b -> b.id, Collectors.toUnmodifiableList())); + Collectors.groupingBy(b -> b.id, Collectors.toUnmodifiableList())); Map> presentTuples; long estimatedSize; @@ -238,10 +232,7 @@ public void processBulkResponse( /** Processes a failed bulk request by failing all associated tuples. */ public void processFailedBulk( - BulkRequest request, - long executionId, - Throwable failure, - Consumer failAction) { + BulkRequest request, long executionId, Throwable failure, Consumer failAction) { log.error("Exception with bulk {} - failing the whole lot ", executionId, failure); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java index 29dbcc62a..870720dc5 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java @@ -244,9 +244,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon if (!selected.failed) { _collector.emit( - StatusStreamName, - t, - new Values(url, metadata, Status.FETCHED)); + StatusStreamName, t, new Values(url, metadata, Status.FETCHED)); _collector.ack(t); } else { var failure = selected.getFailure(); @@ -254,14 +252,10 @@ public void afterBulk(long executionId, BulkRequest request, 
BulkResponse respon // there is something wrong with the content we should // treat it as an ERROR if (failure.getStatus().equals(RestStatus.BAD_REQUEST)) { - metadata.setValue( - Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); - metadata.setValue( - Constants.STATUS_ERROR_MESSAGE, "invalid content"); + metadata.setValue(Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); + metadata.setValue(Constants.STATUS_ERROR_MESSAGE, "invalid content"); _collector.emit( - StatusStreamName, - t, - new Values(url, metadata, Status.ERROR)); + StatusStreamName, t, new Values(url, metadata, Status.ERROR)); _collector.ack(t); } else { _collector.fail(t); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index 491641151..af178ea51 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -51,8 +51,7 @@ * Simple bolt which stores the status of URLs into OpenSearch. Takes the tuples coming from the * 'status' stream. To be used in combination with a Spout to read from the index. 
*/ -public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt - implements BulkProcessor.Listener { +public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt implements BulkProcessor.Listener { private static final Logger LOG = LoggerFactory.getLogger(StatusUpdaterBolt.class); diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java index 23945c6a0..fcb3d91d8 100644 --- a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -28,8 +28,9 @@ import java.util.concurrent.TimeUnit; import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.tuple.Tuple; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -37,8 +38,6 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.slf4j.LoggerFactory; class WaitAckCacheTest { @@ -53,9 +52,7 @@ void setUp() { evicted = new CopyOnWriteArrayList<>(); acked = new ArrayList<>(); failed = new ArrayList<>(); - cache = - new WaitAckCache( - LoggerFactory.getLogger(WaitAckCacheTest.class), evicted::add); + cache = new WaitAckCache(LoggerFactory.getLogger(WaitAckCacheTest.class), evicted::add); } private Tuple mockTuple(String url) { @@ -70,15 +67,13 @@ private static ShardId shardId() { } private static BulkItemResponse successItem(int itemId, String docId) { - IndexResponse indexResponse = - new 
IndexResponse(shardId(), docId, 1, 1, 1, true); + IndexResponse indexResponse = new IndexResponse(shardId(), docId, 1, 1, 1, true); return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, indexResponse); } private static BulkItemResponse failedItem(int itemId, String docId, RestStatus status) { BulkItemResponse.Failure failure = - new BulkItemResponse.Failure( - "index", docId, new Exception("test failure"), status); + new BulkItemResponse.Failure("index", docId, new Exception("test failure"), status); return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, failure); } @@ -189,11 +184,7 @@ void processBulkResponse_multipleTuplesForSameDocId() { BulkResponse response = bulkResponse(successItem(0, "doc1")); - cache.processBulkResponse( - response, - 1L, - null, - (id, tuple, selected) -> acked.add(tuple)); + cache.processBulkResponse(response, 1L, null, (id, tuple, selected) -> acked.add(tuple)); assertEquals(2, acked.size()); assertTrue(acked.contains(t1)); @@ -238,11 +229,7 @@ void processFailedBulk_failsAllMatchingTuples() { request.add(new DeleteRequest("index", "doc1")); request.add(new DeleteRequest("index", "doc2")); - cache.processFailedBulk( - request, - 1L, - new Exception("connection lost"), - failed::add); + cache.processFailedBulk(request, 1L, new Exception("connection lost"), failed::add); assertEquals(2, failed.size()); assertTrue(failed.contains(t1)); @@ -259,11 +246,7 @@ void processFailedBulk_ignoresMissingIds() { BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("index", "doc_unknown")); - cache.processFailedBulk( - request, - 1L, - new Exception("test"), - failed::add); + cache.processFailedBulk(request, 1L, new Exception("test"), failed::add); assertEquals(0, failed.size()); // doc1 should still be in cache since it wasn't in the failed request From 7788d48c9645df8948b9ca50ab9d00929f4b09ff Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Sat, 4 Apr 2026 15:21:01 +0100 Subject: [PATCH 3/7] checkstyle fix 
Signed-off-by: Julien Nioche --- .../org/apache/stormcrawler/opensearch/WaitAckCacheTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java index fcb3d91d8..899a5dc63 100644 --- a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -18,7 +18,9 @@ package org.apache.stormcrawler.opensearch; import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; From 55ff901244520588fae4b157905ba0f0937e35bf Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Sat, 4 Apr 2026 15:24:57 +0100 Subject: [PATCH 4/7] checkstyle fix (again - should go back to eating chocolate instead of working in my spare time) Signed-off-by: Julien Nioche --- .../org/apache/stormcrawler/opensearch/WaitAckCacheTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java index 899a5dc63..78dcf3edb 100644 --- a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -20,6 +20,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static 
org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; From 6521e7f1514d4a7225c3d0d27c389e7063fb1113 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Thu, 9 Apr 2026 08:10:42 +0100 Subject: [PATCH 5/7] Addressed comments by reviewer Signed-off-by: Julien Nioche --- .../stormcrawler/opensearch/WaitAckCache.java | 36 ++++++++++++++- .../opensearch/bolt/DeletionBolt.java | 1 + .../opensearch/bolt/IndexerBolt.java | 1 + .../persistence/StatusUpdaterBolt.java | 1 + .../opensearch/WaitAckCacheTest.java | 45 ++++++++++++------- 5 files changed, 67 insertions(+), 17 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java index 1ef781028..7de117d4d 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java @@ -20,6 +20,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.Ticker; import java.util.Arrays; import java.util.HashSet; import java.util.LinkedList; @@ -57,6 +58,7 @@ public interface TupleAction { private final java.util.concurrent.locks.ReentrantLock lock = new java.util.concurrent.locks.ReentrantLock(true); private final Logger log; + private final Consumer onEviction; /** Creates a cache with a fixed 60-second expiry. */ public WaitAckCache(Logger log, Consumer onEviction) { @@ -68,8 +70,14 @@ public WaitAckCache(String cacheSpec, Logger log, Consumer onEviction) { this(Caffeine.from(cacheSpec), log, onEviction); } + /** Creates a cache with a custom ticker for deterministic time control in tests. 
*/ + WaitAckCache(String cacheSpec, Logger log, Consumer onEviction, Ticker ticker) { + this(Caffeine.from(cacheSpec).ticker(ticker).executor(Runnable::run), log, onEviction); + } + private WaitAckCache(Caffeine builder, Logger log, Consumer onEviction) { this.log = log; + this.onEviction = onEviction; this.cache = builder.>removalListener( (String key, List value, RemovalCause cause) -> { @@ -125,6 +133,31 @@ public boolean contains(String docID) { } } + /** Forces pending cache maintenance, triggering eviction listeners for expired entries. */ + public void cleanUp() { + cache.cleanUp(); + } + + /** Fails all remaining tuples in the cache and invalidates all entries. */ + public void shutdown() { + lock.lock(); + try { + Map> remaining = cache.asMap(); + for (var entry : remaining.entrySet()) { + log.warn( + "Shutdown: failing {} tuple(s) for ID {}", + entry.getValue().size(), + entry.getKey()); + for (Tuple t : entry.getValue()) { + onEviction.accept(t); + } + } + cache.invalidateAll(); + } finally { + lock.unlock(); + } + } + /** Invalidates a single cache entry. */ public void invalidate(String docID) { lock.lock(); @@ -267,7 +300,8 @@ public void processFailedBulk( /** * Selects the best response when there are multiple bulk items for the same document ID. - * Prefers non-failed responses; warns when there is a mix of success and failure. + * Prefers non-failed responses; warns when there is a mix of success and failure. If all items + * are failed, returns the first one (no warning logged since there is no ambiguity). 
*/ private BulkItemResponseToFailedFlag selectBest( List items, String id) { diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java index f09b375a8..b0023506f 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java @@ -83,6 +83,7 @@ public void prepare( @Override public void cleanup() { + waitAck.shutdown(); if (connection != null) { connection.close(); } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java index 870720dc5..cbf898ac3 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java @@ -130,6 +130,7 @@ public void prepare( @Override public void cleanup() { + waitAck.shutdown(); if (connection != null) { connection.close(); } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index af178ea51..c717910ec 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -190,6 +190,7 @@ public void prepare( @Override public void cleanup() { + waitAck.shutdown(); if (connection == null) { return; } diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java 
b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java index 78dcf3edb..4d95d32f1 100644 --- a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -17,7 +17,6 @@ package org.apache.stormcrawler.opensearch; -import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertSame; @@ -25,10 +24,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.github.benmanes.caffeine.cache.Ticker; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.storm.metric.api.MultiCountMetric; import org.apache.storm.tuple.Tuple; import org.junit.jupiter.api.BeforeEach; @@ -258,29 +259,25 @@ void processFailedBulk_ignoresMissingIds() { @Test void eviction_failsTuplesOnExpiry() { + AtomicLong fakeTime = new AtomicLong(0); + Ticker fakeTicker = fakeTime::get; cache = new WaitAckCache( "expireAfterWrite=1s", LoggerFactory.getLogger(WaitAckCacheTest.class), - evicted::add); + evicted::add, + fakeTicker); Tuple t = mockTuple("http://example.com"); cache.addTuple("doc1", t); - // Force cache maintenance after expiry by doing a contains() check - // which accesses the cache and triggers Caffeine's cleanup - await().atMost(5, TimeUnit.SECONDS) - .pollInterval(200, TimeUnit.MILLISECONDS) - .untilAsserted( - () -> { - // contains() accesses the cache which triggers cleanup - cache.contains("doc1"); - // also try adding and invalidating a dummy entry to force maintenance - Tuple dummy = mockTuple("http://dummy"); - cache.addTuple("_probe_", dummy); - cache.invalidate("_probe_"); - 
assertFalse(evicted.isEmpty(), "Eviction callback should have fired"); - }); + // Advance past the 1s expiry + fakeTime.set(TimeUnit.SECONDS.toNanos(2)); + // Access triggers expiration; cleanUp forces listener execution + cache.contains("doc1"); + cache.cleanUp(); + + assertFalse(evicted.isEmpty(), "Eviction callback should have fired"); assertTrue(evicted.contains(t)); } @@ -313,4 +310,20 @@ void processBulkResponse_multipleDocIds() { assertEquals(1, failed.size()); assertSame(t2, failed.get(0)); } + + @Test + void shutdown_failsAllRemainingTuples() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc2", t2); + + cache.shutdown(); + + assertEquals(2, evicted.size()); + assertTrue(evicted.contains(t1)); + assertTrue(evicted.contains(t2)); + assertFalse(cache.contains("doc1")); + assertFalse(cache.contains("doc2")); + } } From fb31ad8d19b2307283f5ad6ed084ef662a2bf7e1 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Fri, 10 Apr 2026 10:31:59 +0100 Subject: [PATCH 6/7] Formatting Signed-off-by: Julien Nioche --- .../java/org/apache/stormcrawler/opensearch/WaitAckCache.java | 2 +- .../org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java | 3 ++- .../stormcrawler/opensearch/persistence/StatusUpdaterBolt.java | 1 - .../org/apache/stormcrawler/opensearch/WaitAckCacheTest.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java index 95c788f2d..bdb9a8dca 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java @@ -30,8 +30,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; -import 
org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.jetbrains.annotations.Nullable; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkItemResponse; diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java index cc5ceecd2..d5f37be4b 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java @@ -137,7 +137,8 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon waitAck.processBulkResponse( response, executionId, - null, // no conflict counter — deletion conflicts are expected and only logged at debug + null, // no conflict counter — deletion conflicts are expected and only logged at + // debug (id, t, selected) -> { if (!selected.failed) { _collector.ack(t); diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index 2bfd14bb5..22512975d 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -172,7 +172,6 @@ public void prepare( CrawlerMetrics.registerCounter( context, stormConf, "counters", metrics_time_bucket_secs); - try { connection = OpenSearchConnection.getConnection(stormConf, OSBoltType, this); } catch (Exception e1) { diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java 
b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java index 83747a50d..0fd22d181 100644 --- a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -30,8 +30,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.metrics.ScopedCounter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.opensearch.action.DocWriteRequest; From 28afa35d11dd8c2ce6ef0c152803eec2e6192345 Mon Sep 17 00:00:00 2001 From: Julien Nioche Date: Fri, 10 Apr 2026 10:39:59 +0100 Subject: [PATCH 7/7] Formatting Signed-off-by: Julien Nioche --- .../org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java index d5f37be4b..9295aa15f 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java @@ -137,8 +137,7 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon waitAck.processBulkResponse( response, executionId, - null, // no conflict counter — deletion conflicts are expected and only logged at - // debug + null, // no conflict counter — deletion conflicts are expected and only logged (id, t, selected) -> { if (!selected.failed) { _collector.ack(t);