diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java new file mode 100644 index 000000000..bdb9a8dca --- /dev/null +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/WaitAckCache.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.opensearch; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.Ticker; +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.metrics.ScopedCounter; +import org.jetbrains.annotations.Nullable; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.core.rest.RestStatus; +import org.slf4j.Logger; + +/** + * Thread-safe cache that tracks in-flight tuples awaiting bulk acknowledgment from OpenSearch. + * Provides shared logic for processing bulk responses and failing tuples on error, used by + * IndexerBolt, DeletionBolt, and StatusUpdaterBolt. + */ +public class WaitAckCache { + + /** Callback invoked for each tuple when processing a successful bulk response. */ + @FunctionalInterface + public interface TupleAction { + void handle(String id, Tuple tuple, BulkItemResponseToFailedFlag selected); + } + + private final Cache> cache; + private final java.util.concurrent.locks.ReentrantLock lock = + new java.util.concurrent.locks.ReentrantLock(true); + private final Logger log; + private final Consumer onEviction; + + /** Creates a cache with a fixed 60-second expiry. */ + public WaitAckCache(Logger log, Consumer onEviction) { + this(Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS), log, onEviction); + } + + /** Creates a cache from a Caffeine spec string (e.g. "expireAfterWrite=300s"). 
*/ + public WaitAckCache(String cacheSpec, Logger log, Consumer onEviction) { + this(Caffeine.from(cacheSpec), log, onEviction); + } + + /** Creates a cache with a custom ticker for deterministic time control in tests. */ + WaitAckCache(String cacheSpec, Logger log, Consumer onEviction, Ticker ticker) { + this(Caffeine.from(cacheSpec).ticker(ticker).executor(Runnable::run), log, onEviction); + } + + private WaitAckCache(Caffeine builder, Logger log, Consumer onEviction) { + this.log = log; + this.onEviction = onEviction; + this.cache = + builder.>removalListener( + (String key, List value, RemovalCause cause) -> { + if (!cause.wasEvicted()) { + return; + } + if (value != null) { + log.error( + "Purged from waitAck {} with {} values", + key, + value.size()); + for (Tuple t : value) { + onEviction.accept(t); + } + } else { + log.error("Purged from waitAck {} with no values", key); + } + }) + .build(); + } + + public long estimatedSize() { + return cache.estimatedSize(); + } + + /** Adds a tuple to the cache under the given document ID, creating the list if needed. */ + public void addTuple(String docID, Tuple tuple) { + lock.lock(); + try { + List tt = cache.get(docID, k -> new LinkedList<>()); + tt.add(tuple); + if (log.isDebugEnabled()) { + String url = (String) tuple.getValueByField("url"); + log.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size()); + } + } finally { + lock.unlock(); + } + } + + /** Returns true if the cache contains an entry for the given document ID. */ + public boolean contains(String docID) { + lock.lock(); + try { + return cache.getIfPresent(docID) != null; + } finally { + lock.unlock(); + } + } + + /** Forces pending cache maintenance, triggering eviction listeners for expired entries. */ + public void cleanUp() { + cache.cleanUp(); + } + + /** Fails all remaining tuples in the cache and invalidates all entries. 
*/ + public void shutdown() { + lock.lock(); + try { + Map> remaining = cache.asMap(); + for (var entry : remaining.entrySet()) { + log.warn( + "Shutdown: failing {} tuple(s) for ID {}", + entry.getValue().size(), + entry.getKey()); + for (Tuple t : entry.getValue()) { + onEviction.accept(t); + } + } + cache.invalidateAll(); + } finally { + lock.unlock(); + } + } + + /** Invalidates a single cache entry. */ + public void invalidate(String docID) { + lock.lock(); + try { + cache.invalidate(docID); + } finally { + lock.unlock(); + } + } + + /** + * Processes a successful bulk response: classifies each item (conflict vs failure), retrieves + * cached tuples, selects the best response per document ID, and invokes the action for each + * tuple. + * + * @param conflictCounter optional metric counter; if non-null, increments "doc_conflicts" scope + * for each conflict + */ + public void processBulkResponse( + BulkResponse response, + long executionId, + @Nullable ScopedCounter conflictCounter, + TupleAction action) { + + var idsToBulkItems = + Arrays.stream(response.getItems()) + .map( + bir -> { + BulkItemResponse.Failure f = bir.getFailure(); + boolean failed = false; + if (f != null) { + if (f.getStatus().equals(RestStatus.CONFLICT)) { + if (conflictCounter != null) { + conflictCounter.scope("doc_conflicts").incrBy(1); + } + log.debug("Doc conflict ID {}", bir.getId()); + } else { + log.error( + "Bulk item failure ID {}: {}", bir.getId(), f); + failed = true; + } + } + return new BulkItemResponseToFailedFlag(bir, failed); + }) + .collect( + // https://github.com/apache/stormcrawler/issues/832 + Collectors.groupingBy(b -> b.id, Collectors.toUnmodifiableList())); + + Map> presentTuples; + long estimatedSize; + Set debugInfo = null; + lock.lock(); + try { + presentTuples = cache.getAllPresent(idsToBulkItems.keySet()); + if (!presentTuples.isEmpty()) { + cache.invalidateAll(presentTuples.keySet()); + } + estimatedSize = cache.estimatedSize(); + if (log.isDebugEnabled() && 
estimatedSize > 0L) { + debugInfo = new HashSet<>(cache.asMap().keySet()); + } + } finally { + lock.unlock(); + } + + int ackCount = 0; + int failureCount = 0; + + for (var entry : presentTuples.entrySet()) { + final var id = entry.getKey(); + final var tuples = entry.getValue(); + final var bulkItems = idsToBulkItems.get(id); + + BulkItemResponseToFailedFlag selected = selectBest(bulkItems, id); + + if (tuples != null) { + log.debug("Found {} tuple(s) for ID {}", tuples.size(), id); + for (Tuple t : tuples) { + if (selected.failed) { + failureCount++; + } else { + ackCount++; + } + action.handle(id, t, selected); + } + } else { + log.warn("Could not find unacked tuples for {}", id); + } + } + + log.info( + "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", + executionId, + idsToBulkItems.size(), + estimatedSize, + ackCount, + failureCount); + + if (debugInfo != null) { + for (String k : debugInfo) { + log.debug("Still in wait ack after bulk response [{}] => {}", executionId, k); + } + } + } + + /** Processes a failed bulk request by failing all associated tuples. 
*/ + public void processFailedBulk( + BulkRequest request, long executionId, Throwable failure, Consumer failAction) { + + log.error("Exception with bulk {} - failing the whole lot ", executionId, failure); + + final var failedIds = + request.requests().stream() + .map(DocWriteRequest::id) + .collect(Collectors.toUnmodifiableSet()); + + Map> failedTupleLists; + lock.lock(); + try { + failedTupleLists = cache.getAllPresent(failedIds); + if (!failedTupleLists.isEmpty()) { + cache.invalidateAll(failedTupleLists.keySet()); + } + } finally { + lock.unlock(); + } + + for (var id : failedIds) { + var tuples = failedTupleLists.get(id); + if (tuples != null) { + log.debug("Failed {} tuple(s) for ID {}", tuples.size(), id); + for (Tuple t : tuples) { + failAction.accept(t); + } + } else { + log.warn("Could not find unacked tuple for {}", id); + } + } + } + + /** + * Selects the best response when there are multiple bulk items for the same document ID. + * Prefers non-failed responses; warns when there is a mix of success and failure. If all items + * are failed, returns the first one (no warning logged since there is no ambiguity). + */ + private BulkItemResponseToFailedFlag selectBest( + List items, String id) { + if (items.size() == 1) { + return items.get(0); + } + + BulkItemResponseToFailedFlag best = items.get(0); + int failedCount = 0; + for (var item : items) { + if (item.failed) { + failedCount++; + } else { + best = item; + } + } + if (failedCount > 0 && failedCount < items.size()) { + log.warn( + "The id {} would result in an ack and a failure." 
+ + " Using only the ack for processing.", + id); + } + return best; + } +} diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java index f0fdec9ea..9295aa15f 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/DeletionBolt.java @@ -17,19 +17,8 @@ package org.apache.stormcrawler.opensearch.bolt; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.lang.invoke.MethodHandles; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -37,18 +26,13 @@ import org.apache.storm.tuple.Tuple; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.metrics.CrawlerMetrics; -import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.opensearch.WaitAckCache; import org.apache.stormcrawler.util.ConfUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkProcessor.Listener; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import 
org.opensearch.action.delete.DeleteRequest; -import org.opensearch.core.rest.RestStatus; import org.slf4j.LoggerFactory; /** @@ -57,8 +41,7 @@ * will also try to delete documents even though they were never indexed and it currently won't * delete documents which were indexed under the canonical URL. */ -public class DeletionBolt extends BaseRichBolt - implements RemovalListener>, Listener { +public class DeletionBolt extends BaseRichBolt implements Listener { static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -71,10 +54,7 @@ public class DeletionBolt extends BaseRichBolt private OpenSearchConnection connection; - private Cache> waitAck; - - // Be fair due to cache timeout - private final ReentrantLock waitAckLock = new ReentrantLock(true); + private WaitAckCache waitAck; public DeletionBolt() {} @@ -98,33 +78,13 @@ public void prepare( throw new RuntimeException(e1); } - waitAck = - Caffeine.newBuilder() - .expireAfterWrite(60, TimeUnit.SECONDS) - .removalListener(this) - .build(); - + waitAck = new WaitAckCache(LOG, _collector::fail); CrawlerMetrics.registerGauge(context, conf, "waitAck", waitAck::estimatedSize, 10); } - public void onRemoval( - @Nullable String key, @Nullable List value, @NotNull RemovalCause cause) { - if (!cause.wasEvicted()) { - return; - } - if (value != null) { - LOG.error("Purged from waitAck {} with {} values", key, value.size()); - for (Tuple t : value) { - _collector.fail(t); - } - } else { - // This should never happen, but log it anyway. 
- LOG.error("Purged from waitAck {} with no values", key); - } - } - @Override public void cleanup() { + waitAck.shutdown(); if (connection != null) { connection.close(); } @@ -142,18 +102,7 @@ public void execute(Tuple tuple) { DeleteRequest dr = new DeleteRequest(getIndexName(metadata), docID); connection.addToProcessor(dr); - waitAckLock.lock(); - try { - List tt = waitAck.getIfPresent(docID); - if (tt == null) { - tt = new LinkedList<>(); - waitAck.put(docID, tt); - } - tt.add(tuple); - LOG.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size()); - } finally { - waitAckLock.unlock(); - } + waitAck.addTuple(docID, tuple); } @Override @@ -185,135 +134,27 @@ public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - var idsToBulkItemsWithFailedFlag = - Arrays.stream(response.getItems()) - .map( - bir -> { - String id = bir.getId(); - BulkItemResponse.Failure f = bir.getFailure(); - boolean failed = false; - if (f != null) { - if (f.getStatus().equals(RestStatus.CONFLICT)) { - LOG.debug("Doc conflict ID {}", id); - } else { - failed = true; - } - } - return new BulkItemResponseToFailedFlag(bir, failed); - }) - .collect( - // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - idWithFailedFlagTuple -> idWithFailedFlagTuple.id, - Collectors.toUnmodifiableList())); - Map> presentTuples; - long estimatedSize; - waitAckLock.lock(); - try { - presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet()); - if (!presentTuples.isEmpty()) { - waitAck.invalidateAll(presentTuples.keySet()); - } - estimatedSize = waitAck.estimatedSize(); - } finally { - waitAckLock.unlock(); - } - - int ackCount = 0; - int failureCount = 0; - - for (var entry : presentTuples.entrySet()) { - final var id = entry.getKey(); - final var associatedTuple = entry.getValue(); - final var bulkItemsWithFailedFlag = 
idsToBulkItemsWithFailedFlag.get(id); - - BulkItemResponseToFailedFlag selected; - - if (bulkItemsWithFailedFlag.size() == 1) { - selected = bulkItemsWithFailedFlag.get(0); - } else { - // Fallback if there are multiple responses for the same id - BulkItemResponseToFailedFlag tmp = null; - var ctFailed = 0; - for (var buwff : bulkItemsWithFailedFlag) { - if (tmp == null) { - tmp = buwff; - } - if (buwff.failed) { - ctFailed++; - } else { - tmp = buwff; - } - } - if (ctFailed != bulkItemsWithFailedFlag.size()) { - LOG.warn( - "The id {} would result in an ack and a failure. Using only the ack for processing.", - id); - } - selected = Objects.requireNonNull(tmp); - } - - if (associatedTuple != null) { - LOG.debug("Found {} tuple(s) for ID {}", associatedTuple.size(), id); - for (Tuple t : associatedTuple) { - String url = (String) t.getValueByField("url"); - - Metadata metadata = (Metadata) t.getValueByField("metadata"); - + waitAck.processBulkResponse( + response, + executionId, + null, // no conflict counter — deletion conflicts are expected and only logged + (id, t, selected) -> { if (!selected.failed) { - ackCount++; _collector.ack(t); } else { - failureCount++; - var failure = selected.getFailure(); - LOG.error("update ID {}, URL {}, failure: {}", id, url, failure); + String url = (String) t.getValueByField("url"); + LOG.error( + "update ID {}, URL {}, failure: {}", + id, + url, + selected.getFailure()); _collector.fail(t); } - } - } else { - LOG.warn("Could not find unacked tuples for {}", entry.getKey()); - } - } - - LOG.info( - "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", - executionId, - idsToBulkItemsWithFailedFlag.size(), - estimatedSize, - ackCount, - failureCount); + }); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - LOG.error("Exception with bulk {} - failing the whole lot ", executionId, failure); - - final var failedIds = - request.requests().stream() - .map(DocWriteRequest::id) - 
.collect(Collectors.toUnmodifiableSet()); - Map> failedTupleLists; - waitAckLock.lock(); - try { - failedTupleLists = waitAck.getAllPresent(failedIds); - if (!failedTupleLists.isEmpty()) { - waitAck.invalidateAll(failedTupleLists.keySet()); - } - } finally { - waitAckLock.unlock(); - } - - for (var id : failedIds) { - var failedTuples = failedTupleLists.get(id); - if (failedTuples != null) { - LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id); - for (Tuple x : failedTuples) { - // fail it - _collector.fail(x); - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } + waitAck.processFailedBulk(request, executionId, failure, _collector::fail); } } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java index 39b8bf053..6191f2c3a 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/bolt/IndexerBolt.java @@ -20,22 +20,9 @@ import static org.apache.stormcrawler.Constants.StatusStreamName; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ 
-47,15 +34,11 @@ import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.metrics.ScopedReducedMetric; -import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.IndexCreation; import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.opensearch.WaitAckCache; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkProcessor; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; @@ -69,8 +52,7 @@ * Sends documents to opensearch. Indexes all the fields from the tuples or a Map * <String,Object> from a named field. 
*/ -public class IndexerBolt extends AbstractIndexerBolt - implements RemovalListener>, BulkProcessor.Listener { +public class IndexerBolt extends AbstractIndexerBolt implements BulkProcessor.Listener { private static final Logger LOG = LoggerFactory.getLogger(IndexerBolt.class); @@ -99,10 +81,7 @@ public class IndexerBolt extends AbstractIndexerBolt private ScopedReducedMetric perSecMetrics; - private Cache> waitAck; - - // Be fair due to cache timeout - private final ReentrantLock waitAckLock = new ReentrantLock(true); + private WaitAckCache waitAck; public IndexerBolt() {} @@ -135,12 +114,7 @@ public void prepare( this.perSecMetrics = CrawlerMetrics.registerPerSecMetric(context, conf, "Indexer_average_persec", 10); - waitAck = - Caffeine.newBuilder() - .expireAfterWrite(60, TimeUnit.SECONDS) - .removalListener(this) - .build(); - + waitAck = new WaitAckCache(LOG, _collector::fail); CrawlerMetrics.registerGauge(context, conf, "waitAck", waitAck::estimatedSize, 10); // use the default status schema if none has been specified @@ -151,24 +125,9 @@ public void prepare( } } - public void onRemoval( - @Nullable String key, @Nullable List value, @NotNull RemovalCause cause) { - if (!cause.wasEvicted()) { - return; - } - if (value != null) { - LOG.error("Purged from waitAck {} with {} values", key, value.size()); - for (Tuple t : value) { - _collector.fail(t); - } - } else { - // This should never happen, but log it anyway. 
- LOG.error("Purged from waitAck {} with no values", key); - } - } - @Override public void cleanup() { + waitAck.shutdown(); if (connection != null) { connection.close(); } @@ -246,29 +205,12 @@ public void execute(Tuple tuple) { eventCounter.scope("Indexed").incrBy(1); perSecMetrics.scope("Indexed").update(1); - waitAckLock.lock(); - try { - List tt = waitAck.getIfPresent(docID); - if (tt == null) { - tt = new LinkedList<>(); - waitAck.put(docID, tt); - } - tt.add(tuple); - LOG.debug("Added to waitAck {} with ID {} total {}", url, docID, tt.size()); - } finally { - waitAckLock.unlock(); - } + waitAck.addTuple(docID, tuple); } catch (IOException e) { LOG.error("Error building document for OpenSearch", e); // do not send to status stream so that it gets replayed _collector.fail(tuple); - - waitAckLock.lock(); - try { - waitAck.invalidate(docID); - } finally { - waitAckLock.unlock(); - } + waitAck.invalidate(docID); } } @@ -290,181 +232,47 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon eventCounter.scope("bulks_received").incrBy(1); eventCounter.scope("bulk_msec").incrBy(response.getTook().getMillis()); - var idsToBulkItemsWithFailedFlag = - Arrays.stream(response.getItems()) - .map( - bir -> { - String id = bir.getId(); - BulkItemResponse.Failure f = bir.getFailure(); - boolean failed = false; - if (f != null) { - if (f.getStatus().equals(RestStatus.CONFLICT)) { - eventCounter.scope("doc_conflicts").incrBy(1); - LOG.debug("Doc conflict ID {}", id); - } else { - failed = true; - } - } - return new BulkItemResponseToFailedFlag(bir, failed); - }) - .collect( - // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - idWithFailedFlagTuple -> idWithFailedFlagTuple.id, - Collectors.toUnmodifiableList())); - - Map> presentTuples; - long estimatedSize; - Set debugInfo = null; - waitAckLock.lock(); - try { - presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet()); - if (!presentTuples.isEmpty()) 
{ - waitAck.invalidateAll(presentTuples.keySet()); - } - estimatedSize = waitAck.estimatedSize(); - // Only if we have to. - if (LOG.isDebugEnabled() && estimatedSize > 0L) { - debugInfo = new HashSet<>(waitAck.asMap().keySet()); - } - } finally { - waitAckLock.unlock(); - } - - int ackCount = 0; - int failureCount = 0; - - for (var entry : presentTuples.entrySet()) { - final var id = entry.getKey(); - final var associatedTuple = entry.getValue(); - final var bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id); - - BulkItemResponseToFailedFlag selected; - - if (bulkItemsWithFailedFlag.size() == 1) { - selected = bulkItemsWithFailedFlag.get(0); - } else { - // Fallback if there are multiple responses for the same id - BulkItemResponseToFailedFlag tmp = null; - var ctFailed = 0; - for (var buwff : bulkItemsWithFailedFlag) { - if (tmp == null) { - tmp = buwff; - } - if (buwff.failed) { - ctFailed++; - } else { - tmp = buwff; - } - } - if (ctFailed != bulkItemsWithFailedFlag.size()) { - LOG.warn( - "The id {} would result in an ack and a failure. 
Using only the ack for processing.", - id); - } - selected = Objects.requireNonNull(tmp); - } - - if (associatedTuple != null) { - LOG.debug("Found {} tuple(s) for ID {}", associatedTuple.size(), id); - for (Tuple t : associatedTuple) { + waitAck.processBulkResponse( + response, + executionId, + eventCounter, + (id, t, selected) -> { String url = (String) t.getValueByField("url"); - Metadata metadata = (Metadata) t.getValueByField("metadata"); if (!selected.failed) { - ackCount++; _collector.emit( StatusStreamName, t, new Values(url, metadata, Status.FETCHED)); _collector.ack(t); } else { - failureCount++; var failure = selected.getFailure(); LOG.error("update ID {}, URL {}, failure: {}", id, url, failure); // there is something wrong with the content we should - // treat - // it as an ERROR - if (selected.getFailure().getStatus().equals(RestStatus.BAD_REQUEST)) { + // treat it as an ERROR + if (failure.getStatus().equals(RestStatus.BAD_REQUEST)) { metadata.setValue(Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); metadata.setValue(Constants.STATUS_ERROR_MESSAGE, "invalid content"); _collector.emit( StatusStreamName, t, new Values(url, metadata, Status.ERROR)); _collector.ack(t); - LOG.debug("Acked {} with ID {}", url, id); } else { - LOG.error("update ID {}, URL {}, failure: {}", id, url, failure); - // there is something wrong with the content we - // should - // treat - // it as an ERROR - if (failure.getStatus().equals(RestStatus.BAD_REQUEST)) { - metadata.setValue( - Constants.STATUS_ERROR_SOURCE, "OpenSearch indexing"); - metadata.setValue( - Constants.STATUS_ERROR_MESSAGE, "invalid content"); - _collector.emit( - StatusStreamName, - t, - new Values(url, metadata, Status.ERROR)); - _collector.ack(t); - } else { - // otherwise just fail it - _collector.fail(t); - } + _collector.fail(t); } } - } - } else { - LOG.warn("Could not find unacked tuples for {}", entry.getKey()); - } - } - - LOG.info( - "Bulk response [{}] : items {}, waitAck {}, acked {}, 
failed {}", - executionId, - idsToBulkItemsWithFailedFlag.size(), - estimatedSize, - ackCount, - failureCount); - if (debugInfo != null) { - for (String kinaw : debugInfo) { - LOG.debug("Still in wait ack after bulk response [{}] => {}", executionId, kinaw); - } - } + }); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { eventCounter.scope("bulks_received").incrBy(1); - LOG.error("Exception with bulk {} - failing the whole lot ", executionId, failure); - - final var failedIds = - request.requests().stream() - .map(DocWriteRequest::id) - .collect(Collectors.toUnmodifiableSet()); - Map> failedTupleLists; - waitAckLock.lock(); - try { - failedTupleLists = waitAck.getAllPresent(failedIds); - if (!failedTupleLists.isEmpty()) { - waitAck.invalidateAll(failedTupleLists.keySet()); - } - } finally { - waitAckLock.unlock(); - } - for (var id : failedIds) { - var failedTuples = failedTupleLists.get(id); - if (failedTuples != null) { - LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id); - for (Tuple x : failedTuples) { - // fail it + waitAck.processFailedBulk( + request, + executionId, + failure, + t -> { eventCounter.scope("failed").incrBy(1); - _collector.fail(x); - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } + _collector.fail(t); + }); } } diff --git a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java index df1598f07..22512975d 100644 --- a/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java +++ b/external/opensearch/src/main/java/org/apache/stormcrawler/opensearch/persistence/StatusUpdaterBolt.java @@ -17,23 +17,11 @@ package org.apache.stormcrawler.opensearch.persistence; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; 
-import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; import java.io.IOException; -import java.util.Arrays; import java.util.Date; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Objects; import java.util.Optional; -import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -42,24 +30,19 @@ import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.metrics.ScopedCounter; import org.apache.stormcrawler.metrics.ScopedReducedMetric; -import org.apache.stormcrawler.opensearch.BulkItemResponseToFailedFlag; import org.apache.stormcrawler.opensearch.Constants; import org.apache.stormcrawler.opensearch.IndexCreation; import org.apache.stormcrawler.opensearch.OpenSearchConnection; +import org.apache.stormcrawler.opensearch.WaitAckCache; import org.apache.stormcrawler.persistence.AbstractStatusUpdaterBolt; import org.apache.stormcrawler.persistence.Status; import org.apache.stormcrawler.util.ConfUtils; import org.apache.stormcrawler.util.URLPartitioner; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.opensearch.action.DocWriteRequest; -import org.opensearch.action.bulk.BulkItemResponse; import org.opensearch.action.bulk.BulkProcessor; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.XContentBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,8 +51,7 @@ * Simple bolt which stores the status of URLs into 
OpenSearch. Takes the tuples coming from the * 'status' stream. To be used in combination with a Spout to read from the index. */ -public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt - implements RemovalListener>, BulkProcessor.Listener { +public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt implements BulkProcessor.Listener { private static final Logger LOG = LoggerFactory.getLogger(StatusUpdaterBolt.class); @@ -95,10 +77,7 @@ public class StatusUpdaterBolt extends AbstractStatusUpdaterBolt private OpenSearchConnection connection; - private Cache> waitAck; - - // Be fair due to cache timeout - private final ReentrantLock waitAckLock = new ReentrantLock(true); + private WaitAckCache waitAck; private ScopedCounter eventCounter; @@ -160,6 +139,8 @@ public void prepare( fieldNameForRoutingKey = fieldNameForRoutingKey.replaceAll("\\.", "%2E"); } + int metrics_time_bucket_secs = 30; + String defaultSpec = String.format( Locale.ROOT, @@ -169,9 +150,14 @@ public void prepare( String waitAckSpec = ConfUtils.getString(stormConf, "opensearch.status.waitack.cache.spec", defaultSpec); - waitAck = Caffeine.from(waitAckSpec).removalListener(this).build(); - - int metrics_time_bucket_secs = 30; + waitAck = + new WaitAckCache( + waitAckSpec, + LOG, + t -> { + eventCounter.scope("purged").incrBy(1); + collector.fail(t); + }); // create gauge for waitAck CrawlerMetrics.registerGauge( @@ -203,6 +189,7 @@ public void prepare( @Override public void cleanup() { + waitAck.shutdown(); if (connection == null) { return; } @@ -217,19 +204,8 @@ public void store( String documentID = getDocumentID(metadata, url); - boolean isAlreadySentAndDiscovered; - // need to synchronize: otherwise it might get added to the cache - // without having been sent to OpenSearch - waitAckLock.lock(); - try { - // check that the same URL is not being sent to OpenSearch - final var alreadySent = waitAck.getIfPresent(documentID); - isAlreadySentAndDiscovered = 
status.equals(Status.DISCOVERED) && alreadySent != null; - } finally { - waitAckLock.unlock(); - } - - if (isAlreadySentAndDiscovered) { + // check that the same URL is not being sent to OpenSearch + if (status.equals(Status.DISCOVERED) && waitAck.contains(documentID)) { // if this object is discovered - adding another version of it // won't make any difference LOG.debug( @@ -290,33 +266,13 @@ public void store( request.routing(partitionKey); } - waitAckLock.lock(); - try { - final List<Tuple> tt = waitAck.get(documentID, k -> new LinkedList<>()); - tt.add(tuple); - LOG.debug("Added to waitAck {} with ID {} total {}", url, documentID, tt.size()); - } finally { - waitAckLock.unlock(); - } + waitAck.addTuple(documentID, tuple); LOG.debug("Sending to OpenSearch buffer {} with ID {}", url, documentID); connection.addToProcessor(request); } - @Override - public void onRemoval( - @Nullable String key, @Nullable List<Tuple> value, @NotNull RemovalCause cause) { - if (!cause.wasEvicted()) { - return; - } - LOG.error("Purged from waitAck {} with {} values", key, value.size()); - for (Tuple t : value) { - eventCounter.scope("purged").incrBy(1); - collector.fail(t); - } - } - @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { LOG.debug("afterBulk [{}] with {} responses", executionId, request.numberOfActions()); @@ -325,115 +281,21 @@ public void afterBulk(long executionId, BulkRequest request, BulkResponse respon eventCounter.scope("received").incrBy(request.numberOfActions()); receivedPerSecMetrics.scope("received").update(request.numberOfActions()); - var idsToBulkItemsWithFailedFlag = - Arrays.stream(response.getItems()) - .map( - bir -> { - String id = bir.getId(); - BulkItemResponse.Failure f = bir.getFailure(); - boolean failed = false; - if (f != null) { - // already discovered - if (f.getStatus().equals(RestStatus.CONFLICT)) { - eventCounter.scope("doc_conflicts").incrBy(1); - LOG.debug("Doc conflict ID {}", id); - } else { -
LOG.error("Update ID {}, failure: {}", id, f); - failed = true; - } - } - return new BulkItemResponseToFailedFlag(bir, failed); - }) - .collect( - // https://github.com/apache/stormcrawler/issues/832 - Collectors.groupingBy( - idWithFailedFlagTuple -> idWithFailedFlagTuple.id, - Collectors.toUnmodifiableList())); - - Map<String, List<Tuple>> presentTuples; - long estimatedSize; - Set<String> debugInfo = null; - waitAckLock.lock(); - try { - presentTuples = waitAck.getAllPresent(idsToBulkItemsWithFailedFlag.keySet()); - if (!presentTuples.isEmpty()) { - waitAck.invalidateAll(presentTuples.keySet()); - } - estimatedSize = waitAck.estimatedSize(); - // Only if we have to. - if (LOG.isDebugEnabled() && estimatedSize > 0L) { - debugInfo = new HashSet<>(waitAck.asMap().keySet()); - } - } finally { - waitAckLock.unlock(); - } - - int ackCount = 0; - int failureCount = 0; - - for (var entry : presentTuples.entrySet()) { - final var id = entry.getKey(); - final var associatedTuple = entry.getValue(); - final var bulkItemsWithFailedFlag = idsToBulkItemsWithFailedFlag.get(id); - - BulkItemResponseToFailedFlag selected; - if (bulkItemsWithFailedFlag.size() == 1) { - selected = bulkItemsWithFailedFlag.get(0); - } else { - // Fallback if there are multiple responses for the same id - BulkItemResponseToFailedFlag tmp = null; - var ctFailed = 0; - for (var buwff : bulkItemsWithFailedFlag) { - if (tmp == null) { - tmp = buwff; - } - if (buwff.failed) { - ctFailed++; - } else { - tmp = buwff; - } - } - if (ctFailed != bulkItemsWithFailedFlag.size()) { - LOG.warn( - "The id {} would result in an ack and a failure.
Using only the ack for processing.", - id); - } - selected = Objects.requireNonNull(tmp); - } - - if (associatedTuple != null) { - LOG.debug("Acked {} tuple(s) for ID {}", associatedTuple.size(), id); - for (Tuple tuple : associatedTuple) { + waitAck.processBulkResponse( + response, + executionId, + eventCounter, + (id, t, selected) -> { if (!selected.failed) { - String url = tuple.getStringByField("url"); - ackCount++; - // ack and put in cache + String url = t.getStringByField("url"); LOG.debug("Acked {} with ID {}", url, id); eventCounter.scope("acked").incrBy(1); - super.ack(tuple, url); + ack(t, url); } else { - failureCount++; eventCounter.scope("failed").incrBy(1); - collector.fail(tuple); + collector.fail(t); } - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } - - LOG.info( - "Bulk response [{}] : items {}, waitAck {}, acked {}, failed {}", - executionId, - idsToBulkItemsWithFailedFlag.size(), - estimatedSize, - ackCount, - failureCount); - if (debugInfo != null) { - for (String kinaw : debugInfo) { - LOG.debug("Still in wait ack after bulk response [{}] => {}", executionId, kinaw); - } - } + }); } @Override @@ -441,36 +303,15 @@ public void afterBulk(long executionId, BulkRequest request, Throwable throwable eventCounter.scope("bulks_received").incrBy(1); eventCounter.scope("received").incrBy(request.numberOfActions()); receivedPerSecMetrics.scope("received").update(request.numberOfActions()); - LOG.error("Exception with bulk {} - failing the whole lot ", executionId, throwable); - - final var failedIds = - request.requests().stream() - .map(DocWriteRequest::id) - .collect(Collectors.toUnmodifiableSet()); - Map<String, List<Tuple>> failedTupleLists; - waitAckLock.lock(); - try { - failedTupleLists = waitAck.getAllPresent(failedIds); - if (!failedTupleLists.isEmpty()) { - waitAck.invalidateAll(failedTupleLists.keySet()); - } - } finally { - waitAckLock.unlock(); - } - for (var id : failedIds) { - var failedTuples = failedTupleLists.get(id); - if
(failedTuples != null) { - LOG.debug("Failed {} tuple(s) for ID {}", failedTuples.size(), id); - for (Tuple x : failedTuples) { - // fail it + waitAck.processFailedBulk( + request, + executionId, + throwable, + t -> { eventCounter.scope("failed").incrBy(1); - collector.fail(x); - } - } else { - LOG.warn("Could not find unacked tuple for {}", id); - } - } + collector.fail(t); + }); } @Override diff --git a/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java new file mode 100644 index 000000000..0fd22d181 --- /dev/null +++ b/external/opensearch/src/test/java/org/apache/stormcrawler/opensearch/WaitAckCacheTest.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.opensearch; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.github.benmanes.caffeine.cache.Ticker; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.metrics.ScopedCounter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; +import org.slf4j.LoggerFactory; + +class WaitAckCacheTest { + + private WaitAckCache cache; + private List<Tuple> evicted; + private List<Tuple> acked; + private List<Tuple> failed; + + @BeforeEach + void setUp() { + evicted = new CopyOnWriteArrayList<>(); + acked = new ArrayList<>(); + failed = new ArrayList<>(); + cache = new WaitAckCache(LoggerFactory.getLogger(WaitAckCacheTest.class), evicted::add); + } + + private Tuple mockTuple(String url) { + Tuple t = mock(Tuple.class); + when(t.getValueByField("url")).thenReturn(url); + when(t.getStringByField("url")).thenReturn(url); + return t; + } + + private static ShardId shardId() { + return new ShardId("index", "_na_", 0); + } + + private static BulkItemResponse successItem(int itemId, String docId) { + IndexResponse indexResponse = new IndexResponse(shardId(), docId,
1, 1, 1, true); + return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, indexResponse); + } + + private static BulkItemResponse failedItem(int itemId, String docId, RestStatus status) { + BulkItemResponse.Failure failure = + new BulkItemResponse.Failure("index", docId, new Exception("test failure"), status); + return new BulkItemResponse(itemId, DocWriteRequest.OpType.INDEX, failure); + } + + private static BulkResponse bulkResponse(BulkItemResponse... items) { + return new BulkResponse(items, 10L); + } + + @Test + void addAndContains() { + Tuple t = mockTuple("http://example.com"); + assertFalse(cache.contains("doc1")); + + cache.addTuple("doc1", t); + assertTrue(cache.contains("doc1")); + assertEquals(1, cache.estimatedSize()); + } + + @Test + void invalidateRemovesEntry() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + assertTrue(cache.contains("doc1")); + + cache.invalidate("doc1"); + assertFalse(cache.contains("doc1")); + } + + @Test + void processBulkResponse_successfulItem_ackedViaTupleAction() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkResponse response = bulkResponse(successItem(0, "doc1")); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertEquals(0, failed.size()); + assertSame(t, acked.get(0)); + assertFalse(cache.contains("doc1")); + } + + @Test + void processBulkResponse_failedItem_failedViaTupleAction() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkResponse response = + bulkResponse(failedItem(0, "doc1", RestStatus.INTERNAL_SERVER_ERROR)); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(0, acked.size()); + assertEquals(1, 
failed.size()); + assertSame(t, failed.get(0)); + } + + @Test + void processBulkResponse_conflictIsNotAFailure() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + ScopedCounter counter = scopeName -> incrementBy -> {}; + BulkResponse response = bulkResponse(failedItem(0, "doc1", RestStatus.CONFLICT)); + + cache.processBulkResponse( + response, + 1L, + counter, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertEquals(0, failed.size()); + } + + @Test + void processBulkResponse_multipleTuplesForSameDocId() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc1", t2); + + BulkResponse response = bulkResponse(successItem(0, "doc1")); + + cache.processBulkResponse(response, 1L, null, (id, tuple, selected) -> acked.add(tuple)); + + assertEquals(2, acked.size()); + assertTrue(acked.contains(t1)); + assertTrue(acked.contains(t2)); + } + + @Test + void processBulkResponse_duplicateDocIdInBulk_prefersSuccess() { + // https://github.com/apache/stormcrawler/issues/832 + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkResponse response = + bulkResponse( + failedItem(0, "doc1", RestStatus.INTERNAL_SERVER_ERROR), + successItem(1, "doc1")); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertEquals(0, failed.size()); + } + + @Test + void processFailedBulk_failsAllMatchingTuples() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc2", t2); + + BulkRequest request = new BulkRequest(); + request.add(new DeleteRequest("index", "doc1")); + request.add(new 
DeleteRequest("index", "doc2")); + + cache.processFailedBulk(request, 1L, new Exception("connection lost"), failed::add); + + assertEquals(2, failed.size()); + assertTrue(failed.contains(t1)); + assertTrue(failed.contains(t2)); + assertFalse(cache.contains("doc1")); + assertFalse(cache.contains("doc2")); + } + + @Test + void processFailedBulk_ignoresMissingIds() { + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + BulkRequest request = new BulkRequest(); + request.add(new DeleteRequest("index", "doc_unknown")); + + cache.processFailedBulk(request, 1L, new Exception("test"), failed::add); + + assertEquals(0, failed.size()); + // doc1 should still be in cache since it wasn't in the failed request + assertTrue(cache.contains("doc1")); + } + + @Test + void eviction_failsTuplesOnExpiry() { + AtomicLong fakeTime = new AtomicLong(0); + Ticker fakeTicker = fakeTime::get; + cache = + new WaitAckCache( + "expireAfterWrite=1s", + LoggerFactory.getLogger(WaitAckCacheTest.class), + evicted::add, + fakeTicker); + Tuple t = mockTuple("http://example.com"); + cache.addTuple("doc1", t); + + // Advance past the 1s expiry + fakeTime.set(TimeUnit.SECONDS.toNanos(2)); + + // Access triggers expiration; cleanUp forces listener execution + cache.contains("doc1"); + cache.cleanUp(); + + assertFalse(evicted.isEmpty(), "Eviction callback should have fired"); + assertTrue(evicted.contains(t)); + } + + @Test + void processBulkResponse_multipleDocIds() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc2", t2); + + BulkResponse response = + bulkResponse( + successItem(0, "doc1"), + failedItem(1, "doc2", RestStatus.INTERNAL_SERVER_ERROR)); + + cache.processBulkResponse( + response, + 1L, + null, + (id, tuple, selected) -> { + if (!selected.failed) { + acked.add(tuple); + } else { + failed.add(tuple); + } + }); + + assertEquals(1, acked.size()); + assertSame(t1, 
acked.get(0)); + assertEquals(1, failed.size()); + assertSame(t2, failed.get(0)); + } + + @Test + void shutdown_failsAllRemainingTuples() { + Tuple t1 = mockTuple("http://example.com/1"); + Tuple t2 = mockTuple("http://example.com/2"); + cache.addTuple("doc1", t1); + cache.addTuple("doc2", t2); + + cache.shutdown(); + + assertEquals(2, evicted.size()); + assertTrue(evicted.contains(t1)); + assertTrue(evicted.contains(t2)); + assertFalse(cache.contains("doc1")); + assertFalse(cache.contains("doc2")); + } +}