From 098249abceae99b498c3993596d5744c7cbdf375 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Feb 2026 17:06:18 +0530 Subject: [PATCH 1/4] Change result type to accept multiple batches Signed-off-by: Arpit Bandejiya --- .../datafusion/DatafusionEngine.java | 23 +++++++++++++------ .../datafusion/search/DatafusionContext.java | 6 ++--- .../index/engine/SearchExecEngine.java | 3 ++- .../org/opensearch/search/SearchService.java | 4 ++-- .../aggregations/ShardResultConvertor.java | 6 ++--- .../bucket/composite/CompositeAggregator.java | 8 +++---- .../terms/AbstractStringTermsAggregator.java | 6 ++--- .../bucket/terms/MultiTermsAggregator.java | 6 ++--- .../bucket/terms/NumericTermsAggregator.java | 19 +++++++-------- .../aggregations/metrics/AvgAggregator.java | 11 +++++---- .../metrics/CardinalityAggregator.java | 9 ++++---- .../aggregations/metrics/MaxAggregator.java | 13 ++++++----- .../aggregations/metrics/MinAggregator.java | 13 ++++++----- .../aggregations/metrics/SumAggregator.java | 8 +++---- .../metrics/ValueCountAggregator.java | 9 ++++---- .../search/internal/SearchContext.java | 4 ++-- .../SearchEngineResultConversionUtils.java | 8 +++---- 17 files changed, 86 insertions(+), 70 deletions(-) diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index fde15b8fee9f1..043156e0ae9f8 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -213,7 +213,7 @@ public void close() { @Override public void executeQueryPhase(DatafusionContext context) { - Map finalRes = new HashMap<>(); + Map> finalRes = new HashMap<>(); List rowIdResult = new ArrayList<>(); RecordBatchStream stream = null; @@ -244,7 +244,11 @@ public void executeQueryPhase(DatafusionContext context) { fieldValues[i] = fieldVector.getObject(i); } } - finalRes.put(fieldName, fieldValues); + if(finalRes.containsKey(fieldName)) { + finalRes.get(fieldName).addAll(Arrays.asList(fieldValues)); + } else { + finalRes.put(fieldName, new ArrayList<>(Arrays.asList(fieldValues))); + } } } }; @@ -283,13 +287,13 @@ public void executeQueryPhase(DatafusionContext context) { } @Override - public void executeQueryPhaseAsync(DatafusionContext context, Executor executor, ActionListener> listener) { + public void executeQueryPhaseAsync(DatafusionContext context, Executor executor, ActionListener>> listener) { try { DatafusionSearcher datafusionSearcher = context.getEngineSearcher(); context.getDatafusionQuery().setQueryPlanExplainEnabled(context.evaluateSearchQueryExplainMode()); datafusionSearcher.searchAsync(context.getDatafusionQuery(), datafusionService.getRuntimePointer()).whenCompleteAsync((streamPointer, error)-> { - Map finalRes = new HashMap<>(); + Map> finalRes = new HashMap<>(); List rowIdResult = new ArrayList<>(); if(streamPointer == null) { throw new RuntimeException(error); @@ -316,7 +320,11 @@ public void collect(RecordBatchStream value) { fieldValues[i] = fieldVector.getObject(i); } } - finalRes.put(fieldName, fieldValues); + if(finalRes.containsKey(fieldName)) { + finalRes.get(fieldName).addAll(Arrays.asList(fieldValues)); + } else { + finalRes.put(fieldName, new ArrayList<>(Arrays.asList(fieldValues))); + } } } }; @@ -343,9 +351,9 @@ private void loadNextBatch( RecordBatchStream stream, Executor executor, SearchResultsCollector collector, - Map finalRes, + Map> finalRes, RootAllocator allocator, - ActionListener> listener, + ActionListener>> listener, DatafusionContext context, List rowIdResult ) { @@ -365,6 +373,7 @@ private void loadNextBatch( context.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(rowIdResult.size(), TotalHits.Relation.EQUAL_TO), rowIdResult.stream().map(d-> new ScoreDoc(d.intValue(), Float.NaN, context.indexShard().shardId().getId())).toList().toArray(ScoreDoc[]::new)) , Float.NaN), new DocValueFormat[0]); + // ArrayList<> --> Object[] listener.onResponse(finalRes); } }, error -> { diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java index 39c4b62e4508f..eaefeb70bc655 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java @@ -86,7 +86,7 @@ public class DatafusionContext extends SearchContext { private final IndexService indexService; private final QueryShardContext queryShardContext; private DatafusionQuery datafusionQuery; - private Map dfResults; + private Map> dfResults; private SearchContextAggregations aggregations; private final BigArrays bigArrays; private final Map, CollectorManager> queryCollectorManagers = new HashMap<>(); @@ -825,11 +825,11 @@ public ContextEngineSearcher contextEngineSe return new ContextEngineSearcher<>(this.engineSearcher, this); } - public void setDFResults(Map dfResults) { + public void setDFResults(Map> dfResults) { this.dfResults = dfResults; } - public Map getDFResults() { + public Map> getDFResults() { return dfResults; } diff --git a/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java b/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java index 66244d488ab8d..1c138a6cddf10 100644 --- a/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java @@ -20,6 +20,7 @@ import org.opensearch.search.internal.ShardSearchRequest; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -44,7 +45,7 @@ public abstract class SearchExecEngine> listener); + public abstract void executeQueryPhaseAsync(C context, Executor executor, ActionListener>> listener); /** * execute Fetch Phase diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 40140984a833a..e46432a6f72a5 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -958,9 +958,9 @@ private void executeNativeQueryPhaseAsync( SearchExecEngine searchExecEngine = indexer instanceof CompositeEngine ? ((CompositeEngine) indexer).getPrimaryReadEngine() : null; // Execute native query async - searchExecEngine.executeQueryPhaseAsync(finalContext, executor, new ActionListener>() { + searchExecEngine.executeQueryPhaseAsync(finalContext, executor, new ActionListener>>() { @Override - public void onResponse(Map result) { + public void onResponse(Map> result) { try { finalContext.setDFResults(result); // Continue with rest of query phase diff --git a/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java b/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java index bf9b1427e9567..61b9a7ec88977 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java @@ -16,8 +16,8 @@ public interface ShardResultConvertor { - default List convert(Map shardResult, SearchContext searchContext) { - int rows = shardResult.entrySet().stream().findFirst().get().getValue().length; + default List convert(Map> shardResult, SearchContext searchContext) { + int rows = shardResult.entrySet().stream().findFirst().get().getValue().size(); List internalAggregations = new ArrayList<>(); for (int i = 0; i < rows; i++) { internalAggregations.add(convertRow(shardResult, i, searchContext)); @@ -25,7 +25,7 @@ default List convert(Map shardResult, Sea return internalAggregations; } - default InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { + default InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { throw new UnsupportedOperationException("Row conversion not supported"); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index 1c6b0a7785dcf..e7a8ffd9d9ac2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -734,7 +734,7 @@ public void collect(int doc, long zeroBucket) throws IOException { } @Override - public List convert(Map shardResult, SearchContext searchContext) { + public List convert(Map> shardResult, SearchContext searchContext) { if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyAggregation()); } @@ -742,7 +742,7 @@ public List convert(Map shardResult, Sear List> currentCompositeKey = new ArrayList<>(sourceConfigs.length); List compositeKeys = new ArrayList<>(shardResult.size()); if (shardResult.isEmpty() == false) { - for (int i = 0; i < shardResult.get(shardResult.keySet().stream().findFirst().get()).length; i++) { + for (int i = 0; i < shardResult.get(shardResult.keySet().stream().findFirst().get()).size(); i++) { for (CompositeValuesSourceConfig sourceConfig : sourceConfigs) { // if (sourceConfig.fieldType() == null) { // throw new UnsupportedOperationException("Composite aggregation does not support script field types"); @@ -750,9 +750,9 @@ public List convert(Map shardResult, Sear // source=hits | eval m = extract(minute from EventTime) | stats count() by UserID, m, SearchPhrase | sort - \`count()\` | head 10 // for above query without this change it will fail above // We can get the name directly from sourceConfig - Object[] values = shardResult.get(sourceConfig.name()); + List values = shardResult.get(sourceConfig.name()); // TODO : Would require conversion for certain types, - currentCompositeKey.add(searchContext.convertToComparable(values[i])); + currentCompositeKey.add(searchContext.convertToComparable(values.get(i))); } compositeKeys.add(new CompositeKey(currentCompositeKey.toArray(new Comparable[0]))); currentCompositeKey.clear(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java index d7ae4dc8d4701..57d9538024a45 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java @@ -120,14 +120,14 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs } @Override - public List convert(Map shardResult, SearchContext searchContext) { + public List convert(Map> shardResult, SearchContext searchContext) { if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyTermsAggregation()); } - int rowCount = shardResult.get(shardResult.keySet().stream().findFirst().get()).length; + int rowCount = shardResult.get(shardResult.keySet().stream().findFirst().get()).size(); List buckets = new ArrayList<>(rowCount); for (int row = 0; row < rowCount; row++) { - String termKey = (String) searchContext.convertToComparable(shardResult.get(name)[row]); + String termKey = (String) searchContext.convertToComparable(shardResult.get(name).get(row)); Tuple, Long> subAggsAndDocCount = SearchEngineResultConversionUtils.extractSubAggsAndDocCount(subAggregators, searchContext, shardResult, row); buckets.add(new StringTerms.Bucket( new BytesRef(termKey), diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index 6970ff2680d1f..da7a9f8be5b2e 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -709,12 +709,12 @@ static InternalValuesSource doubleValueSource(ValuesSource.Numeric valuesSource, } @Override - public List convert(Map shardResult, SearchContext searchContext) { - int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(fields.getFirst()).length ; + public List convert(Map> shardResult, SearchContext searchContext) { + int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(fields.getFirst()).size() ; List buckets = new ArrayList<>(rowCount); for (int i = 0; i < rowCount; i++) { final int j = i; - List key = fields.stream().map(fieldName -> (Object) searchContext.convertToComparable(shardResult.get(fieldName)[j])).toList(); + List key = fields.stream().map(fieldName -> (Object) searchContext.convertToComparable(shardResult.get(fieldName).get(j))).toList(); Tuple, Long> subAggsAndDocCount = SearchEngineResultConversionUtils.extractSubAggsAndDocCount(subAggregators, searchContext, shardResult, i); buckets.add(new InternalMultiTerms.Bucket(key, subAggsAndDocCount.v2(), InternalAggregations.from(subAggsAndDocCount.v1()), showTermDocCountError, 0, formats)); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index e855a3b2306b5..2c9c1ad322ff2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -84,6 +84,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder; @@ -523,15 +524,15 @@ LongTerms buildEmptyResult() { } @Override - public List convert(Map shardResult, SearchContext searchContext) { - int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(name).length ; + public List convert(Map> shardResult, SearchContext searchContext) { + int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(name).size() ; List buckets = new ArrayList<>(rowCount); for (int i = 0; i < rowCount; i++) { final int j = i; buckets.add(new LongTerms.Bucket( - ((Number) searchContext.convertToComparable(shardResult.get(name)[i])).longValue(), + ((Number) searchContext.convertToComparable(shardResult.get(name).get(i))).longValue(), 1, - InternalAggregations.from(Arrays.stream(subAggregators).map(subAgg -> ((ShardResultConvertor)subAgg).convertRow(shardResult, j, searchContext)).toList()), + InternalAggregations.from(Arrays.stream(subAggregators).map(subAgg -> ((ShardResultConvertor)subAgg).convertRow(shardResult, j, searchContext)).collect(Collectors.toList())), true, 0, format @@ -640,18 +641,18 @@ DoubleTerms buildEmptyResult() { } @Override - public List convert(Map shardResult, SearchContext searchContext) { + public List convert(Map> shardResult, SearchContext searchContext) { if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyAggregation()); } - int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(name).length ; + int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(name).size() ; List buckets = new ArrayList<>(rowCount); for (int i = 0; i < rowCount; i++) { final int j = i; buckets.add(new DoubleTerms.Bucket( - ((Number) searchContext.convertToComparable(shardResult.get(name)[i])).doubleValue(), + ((Number) searchContext.convertToComparable(shardResult.get(name).get(i))).doubleValue(), 1, - InternalAggregations.from(Arrays.stream(subAggregators).map(subAgg -> ((ShardResultConvertor)subAgg).convertRow(shardResult, j, searchContext)).toList()), + InternalAggregations.from(Arrays.stream(subAggregators).map(subAgg -> ((ShardResultConvertor)subAgg).convertRow(shardResult, j, searchContext)).collect(Collectors.toList())), true, 0, format @@ -877,7 +878,7 @@ public void close() { } @Override - public List convert(Map shardResult, SearchContext searchContext) { + public List convert(Map> shardResult, SearchContext searchContext) { if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyAggregation()); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 93f0eccbc4cfa..7ec9e54eb9811 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -60,6 +60,7 @@ import org.opensearch.search.startree.StarTreeQueryHelper; import java.io.IOException; +import java.util.List; import java.util.Map; import static org.opensearch.search.startree.StarTreeQueryHelper.getStarTreeFilteredValues; @@ -278,12 +279,12 @@ public void collectStarTreeEntry(int starTreeEntryBit, long bucket) throws IOExc } @Override - public InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { - Object[] counts = shardResult.get(name + "_count"); - Object[] sums = shardResult.get(name + "_sum"); - if (counts == null || sums == null || counts[row] == null || sums[row] == null) { + public InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { + List counts = shardResult.get(name + "_count"); + List sums = shardResult.get(name + "_sum"); + if (counts == null || sums == null || counts.get(row) == null || sums.get(row) == null) { return buildEmptyAggregation(); } - return new InternalAvg(name, ((Number) sums[row]).doubleValue(), ((Number) counts[row]).longValue(), format, metadata()); + return new InternalAvg(name, ((Number) sums.get(row)).doubleValue(), ((Number) counts.get(row)).longValue(), format, metadata()); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java index 4cb20cc4612b4..8b2bcbfb043f8 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/CardinalityAggregator.java @@ -76,6 +76,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.List; import java.util.function.BiConsumer; import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD; @@ -764,15 +765,15 @@ public long nextValue() throws IOException { } @Override - public InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { - Object[] hlls = shardResult.get(name + "[hll_registers]"); + public InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { + List hlls = shardResult.get(name + "[hll_registers]"); if (hlls == null) { hlls = shardResult.get(name); } - if(hlls == null || hlls[row] == null) { + if(hlls == null || hlls.get(row) == null) { return buildEmptyAggregation(); } - HyperLogLogPlusPlus sketch = DataFusionHLLWrapper.getHyperLogLogPlusPlus((byte[]) hlls[row]); + HyperLogLogPlusPlus sketch = DataFusionHLLWrapper.getHyperLogLogPlusPlus((byte[]) hlls.get(row)); return new InternalCardinality(name, sketch, null); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index 83b4c1cd5a105..43a7015eca45a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -65,6 +65,7 @@ import java.time.LocalDateTime; import java.util.Arrays; import java.util.Map; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -291,15 +292,15 @@ public StreamingCostMetrics getStreamingCostMetrics() { } @Override - public InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { - Object[] values = shardResult.get(name); - if(values == null || values[row] == null) { + public InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { + List values = shardResult.get(name); + if(values == null || values.get(row) == null) { return buildEmptyAggregation(); } - if (values[row].getClass().equals(LocalDateTime.class)) { - LocalDateTime value = (LocalDateTime) values[row]; + if (values.get(row).getClass().equals(LocalDateTime.class)) { + LocalDateTime value = (LocalDateTime) values.get(row); return new InternalMax(name, convertLocalDateTimeToEpochMillis(value), formatter, metadata()); } - return new InternalMax(name, ((Number) values[row]).doubleValue(), formatter, metadata()); + return new InternalMax(name, ((Number) values.get(row)).doubleValue(), formatter, metadata()); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index e5caee6fbde7c..7465814685278 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -64,6 +64,7 @@ import java.io.IOException; import java.time.LocalDateTime; import java.util.Map; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -277,16 +278,16 @@ public StarTreeBucketCollector getStarTreeBucketCollector( } @Override - public InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { - Object[] values = shardResult.get(name); - if (values == null || values[row] == null) { + public InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { + List values = shardResult.get(name); + if (values == null || values.get(row) == null) { return buildEmptyAggregation(); } - if (values[row].getClass().equals(LocalDateTime.class)) { - LocalDateTime value = (LocalDateTime) values[row]; + if (values.get(row).getClass().equals(LocalDateTime.class)) { + LocalDateTime value = (LocalDateTime) values.get(row); return new InternalMin(name, convertLocalDateTimeToEpochMillis(value), format, metadata()); } - return new InternalMin(name, ((Number) values[row]).doubleValue(), format, metadata()); + return new InternalMin(name, ((Number) values.get(row)).doubleValue(), format, metadata()); } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index d2897e96bfcc7..b4f2200b510f1 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -220,11 +220,11 @@ public void doClose() { } @Override - public InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { - Object[] values = shardResult.get(name); - if(values == null || values[row] == null) { + public InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { + List values = shardResult.get(name); + if(values == null || values.get(row) == null) { return buildEmptyAggregation(); } - return new InternalSum(name, ((Number) values[row]).doubleValue(), format, metadata()); + return new InternalSum(name, ((Number) values.get(row)).doubleValue(), format, metadata()); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java index 6736c2cf4ae28..1f28ed547317f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/ValueCountAggregator.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.util.Map; +import java.util.List; import static org.opensearch.search.startree.StarTreeQueryHelper.getSupportedStarTree; @@ -212,11 +213,11 @@ public StarTreeBucketCollector getStarTreeBucketCollector( } @Override - public InternalAggregation convertRow(Map shardResult, int row, SearchContext searchContext) { - Object[] values = shardResult.get(name); - if (values == null || values[row] == null) { + public InternalAggregation convertRow(Map> shardResult, int row, SearchContext searchContext) { + List values = shardResult.get(name); + if (values == null || values.get(row) == null) { return buildEmptyAggregation(); } - return new InternalValueCount(name, ((Number) values[row]).longValue(), metadata()); + return new InternalValueCount(name, ((Number) values.get(row)).longValue(), metadata()); } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 53a60c8f8724b..dfccc936d3205 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -607,11 +607,11 @@ public boolean setFlushModeIfAbsent(FlushMode flushMode) { } - public void setDFResults(Map dfResults) { + public void setDFResults(Map> dfResults) { } - public Map getDFResults() { + public Map> getDFResults() { return Collections.emptyMap(); } diff --git a/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java b/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java index 4f8dab69249b3..bf581cd080240 100644 --- a/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java +++ b/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java @@ -35,7 +35,7 @@ public class SearchEngineResultConversionUtils { public static void convertDFResultGeneric(SearchContext searchContext) { if (searchContext.aggregations() != null) { - Map dfResult = searchContext.getDFResults(); + Map> dfResult = searchContext.getDFResults(); // LOGGER.info("DF Results at convertDFResultGeneric:"); // for (Map.Entry entry : dfResult.entrySet()) { @@ -75,7 +75,7 @@ public static void convertDFResultGeneric(SearchContext searchContext) { } } - public static Tuple, Long> extractSubAggsAndDocCount(Aggregator[] subAggregators, SearchContext searchContext, Map shardResult, int row) { + public static Tuple, Long> extractSubAggsAndDocCount(Aggregator[] subAggregators, SearchContext searchContext, Map> shardResult, int row) { List subAggs = new ArrayList<>(); long docCount = -1; for (Aggregator aggregator : subAggregators) { @@ -91,9 +91,9 @@ public static Tuple, Long> extractSubAggsAndDocCount(A } } if (docCount == -1) { - Object[] values = shardResult.get(INJECTED_COUNT_AGG_NAME); + List values = shardResult.get(INJECTED_COUNT_AGG_NAME); if (values != null) { - docCount = ((Number) values[row]).longValue(); + docCount = ((Number) values.get(row)).longValue(); } else { throw new IllegalStateException(String.format("Unable to populate doc count from shard result [%s]", shardResult.keySet())); } From 15b60915ea3084ec070aa9b4c914b0de4ed52ed7 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Feb 2026 18:50:09 +0530 Subject: [PATCH 2/4] move fieldValues to ArrayList type --- .../org/opensearch/datafusion/DatafusionEngine.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index 043156e0ae9f8..c6f353fe40186 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -307,23 +307,22 @@ public void collect(RecordBatchStream value) { for (Field field : root.getSchema().getFields()) { String fieldName = field.getName(); FieldVector fieldVector = root.getVector(fieldName); - Object[] fieldValues = new Object[fieldVector.getValueCount()]; + List fieldValues = new ArrayList<>(); if (fieldName.equals(CompositeDataFormatWriter.ROW_ID)) { FieldVector rowIdVector = root.getVector(fieldName); for(int i=0; i(Arrays.asList(fieldValues))); + finalRes.put(fieldName, fieldValues); } } } From 830c8978007ed26bf4da4579cbdde5d3e09d6f36 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 18 Feb 2026 19:41:12 +0530 Subject: [PATCH 3/4] Add fix capacity for FieldValues --- .../main/java/org/opensearch/datafusion/DatafusionEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index c6f353fe40186..ea3c3522cf2ba 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -307,7 +307,7 @@ public void collect(RecordBatchStream value) { for (Field field : root.getSchema().getFields()) { String fieldName = field.getName(); FieldVector fieldVector = root.getVector(fieldName); - List fieldValues = new ArrayList<>(); + List fieldValues = new ArrayList<>(fieldVector.getValueCount()); if (fieldName.equals(CompositeDataFormatWriter.ROW_ID)) { FieldVector rowIdVector = root.getVector(fieldName); for(int i=0; i Date: Thu, 19 Feb 2026 09:27:12 +0530 Subject: [PATCH 4/4] Add abstraction for DF results in vectorised SPI Signed-off-by: Arpit Bandejiya --- .../execution/search/spi/QueryResult.java | 28 ++++++++++++++++ .../datafusion/DatafusionEngine.java | 19 ++++++----- .../datafusion/search/DatafusionContext.java | 6 ++-- .../datafusion/search/DfResult.java | 33 +++++++++++++++++++ server/build.gradle | 1 + .../index/engine/SearchExecEngine.java | 4 +-- .../org/opensearch/search/SearchService.java | 5 +-- .../aggregations/ShardResultConvertor.java | 4 ++- .../bucket/composite/CompositeAggregator.java | 4 ++- .../terms/AbstractStringTermsAggregator.java | 4 ++- .../bucket/terms/MultiTermsAggregator.java | 4 ++- .../bucket/terms/NumericTermsAggregator.java | 13 +++++--- .../search/internal/SearchContext.java | 9 +++-- .../SearchEngineResultConversionUtils.java | 9 ++--- 14 files changed, 106 insertions(+), 37 deletions(-) create mode 100644 libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/search/spi/QueryResult.java create mode 100644 plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DfResult.java diff --git a/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/search/spi/QueryResult.java b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/search/spi/QueryResult.java new file mode 100644 index 0000000000000..8a5d26497be97 --- /dev/null +++ b/libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/search/spi/QueryResult.java @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.vectorized.execution.search.spi; + +import java.util.List; +import java.util.Map; + +/** + * Service Provider Interface for query execution results. + * Implementations provide access to columnar query results from different execution engines. + * + * @opensearch.experimental + */ +public interface QueryResult { + + /** + * Returns the columnar result data where each entry maps a column name to its list of values. + * + * @return Map of column names to their corresponding value lists + */ + Map> getColumns(); +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index ea3c3522cf2ba..01929ec311c6b 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -68,6 +68,7 @@ import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.search.internal.ReaderContext; import org.opensearch.search.internal.SearchContext; +import org.opensearch.datafusion.search.DfResult; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.search.lookup.SourceLookup; @@ -282,18 +283,18 @@ public void executeQueryPhase(DatafusionContext context) { throw new RuntimeException(e); } } - context.setDFResults(finalRes); + context.setDFResults(new DfResult(finalRes)); context.queryResult().topDocs(new TopDocsAndMaxScore(new TopDocs(new TotalHits(rowIdResult.size(), TotalHits.Relation.EQUAL_TO), rowIdResult.stream().map(d-> new ScoreDoc(d.intValue(), Float.NaN, context.indexShard().shardId().getId())).toList().toArray(ScoreDoc[]::new)) , Float.NaN), new DocValueFormat[0]); } @Override - public void executeQueryPhaseAsync(DatafusionContext context, Executor executor, ActionListener>> listener) { + public void executeQueryPhaseAsync(DatafusionContext context, Executor executor, ActionListener listener) { try { DatafusionSearcher datafusionSearcher = context.getEngineSearcher(); context.getDatafusionQuery().setQueryPlanExplainEnabled(context.evaluateSearchQueryExplainMode()); datafusionSearcher.searchAsync(context.getDatafusionQuery(), datafusionService.getRuntimePointer()).whenCompleteAsync((streamPointer, error)-> { - Map> finalRes = new HashMap<>(); + Map> finalResColumns = new HashMap<>(); List rowIdResult = new ArrayList<>(); if(streamPointer == null) { throw new RuntimeException(error); @@ -319,15 +320,15 @@ public void collect(RecordBatchStream value) { fieldValues.add(fieldVector.getObject(i)); } } - if(finalRes.containsKey(fieldName)) { - finalRes.get(fieldName).addAll(fieldValues); + if(finalResColumns.containsKey(fieldName)) { + finalResColumns.get(fieldName).addAll(fieldValues); } else { - finalRes.put(fieldName, fieldValues); + finalResColumns.put(fieldName, fieldValues); } } } }; - loadNextBatch(stream, executor, collector, finalRes, allocator, listener, context, rowIdResult); + loadNextBatch(stream, executor, collector, finalResColumns, allocator, listener, context, rowIdResult); }); // logger.info("Memory Pool Allocation Post Query ShardID:{}", context.getQueryShardContext().getShardId()); @@ -352,7 +353,7 @@ private void loadNextBatch( SearchResultsCollector collector, Map> finalRes, RootAllocator allocator, - ActionListener>> listener, + ActionListener listener, DatafusionContext context, List rowIdResult ) { @@ -373,7 +374,7 @@ private void loadNextBatch( TotalHits.Relation.EQUAL_TO), rowIdResult.stream().map(d-> new ScoreDoc(d.intValue(), Float.NaN, context.indexShard().shardId().getId())).toList().toArray(ScoreDoc[]::new)) , Float.NaN), new DocValueFormat[0]); // ArrayList<> --> Object[] - listener.onResponse(finalRes); + listener.onResponse(new DfResult(finalRes)); } }, error -> { cleanup(stream, allocator); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java index eaefeb70bc655..1b51d2037eebb 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DatafusionContext.java @@ -86,7 +86,7 @@ public class DatafusionContext extends SearchContext { private final IndexService indexService; private final QueryShardContext queryShardContext; private DatafusionQuery datafusionQuery; - private Map> dfResults; + private QueryResult dfResults; private SearchContextAggregations aggregations; private final BigArrays bigArrays; private final Map, CollectorManager> queryCollectorManagers = new HashMap<>(); @@ -825,11 +825,11 @@ public ContextEngineSearcher contextEngineSe return new ContextEngineSearcher<>(this.engineSearcher, this); } - public void setDFResults(Map> dfResults) { + public void setDFResults(QueryResult dfResults) { this.dfResults = dfResults; } - public Map> getDFResults() { + public QueryResult getDFResults() { return dfResults; } diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DfResult.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DfResult.java new file mode 100644 index 0000000000000..67aff9c0cfd47 --- /dev/null +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/search/DfResult.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.datafusion.search; + +import org.opensearch.vectorized.execution.search.spi.QueryResult; + +import java.util.List; +import java.util.Map; + +/** + * Wraps the columnar result from a DataFusion query execution. + * Each entry maps a column name to its list of values. + * Implements the QueryResult SPI to allow usage in core without creating a dependency. + */ +public class DfResult implements QueryResult { + + private final Map> columns; + + public DfResult(Map> columns) { + this.columns = columns; + } + + @Override + public Map> getColumns() { + return columns; + } +} diff --git a/server/build.gradle b/server/build.gradle index 4fb82b79f0ddb..ee32550cf3ec4 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -72,6 +72,7 @@ dependencies { api project(":libs:opensearch-geo") api project(":libs:opensearch-telemetry") api project(":libs:opensearch-task-commons") + api project(":libs:opensearch-vectorized-exec-spi") compileOnly project(":libs:agent-sm:bootstrap") compileOnly project(':libs:opensearch-plugin-classloader') diff --git a/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java b/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java index 1c138a6cddf10..a920d0999c6ac 100644 --- a/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/SearchExecEngine.java @@ -18,9 +18,9 @@ import org.opensearch.search.internal.ReaderContext; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -45,7 +45,7 @@ public abstract class SearchExecEngine>> listener); + public abstract void executeQueryPhaseAsync(C context, Executor executor, ActionListener listener); /** * execute Fetch Phase diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index e46432a6f72a5..5ed49e5604855 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -142,6 +142,7 @@ import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.Profilers; import org.opensearch.search.profile.SearchProfileShardResults; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.search.query.*; import org.opensearch.search.rescore.RescorerBuilder; import org.opensearch.search.searchafter.SearchAfterBuilder; @@ -958,9 +959,9 @@ private void executeNativeQueryPhaseAsync( SearchExecEngine searchExecEngine = indexer instanceof CompositeEngine ? ((CompositeEngine) indexer).getPrimaryReadEngine() : null; // Execute native query async - searchExecEngine.executeQueryPhaseAsync(finalContext, executor, new ActionListener>>() { + searchExecEngine.executeQueryPhaseAsync(finalContext, executor, new ActionListener() { @Override - public void onResponse(Map> result) { + public void onResponse(QueryResult result) { try { finalContext.setDFResults(result); // Continue with rest of query phase diff --git a/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java b/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java index 61b9a7ec88977..452087375b214 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/ShardResultConvertor.java @@ -9,6 +9,7 @@ package org.opensearch.search.aggregations; import org.opensearch.search.internal.SearchContext; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import java.util.ArrayList; import java.util.List; @@ -16,7 +17,8 @@ public interface ShardResultConvertor { - default List convert(Map> shardResult, SearchContext searchContext) { + default List convert(QueryResult queryResult, SearchContext searchContext) { + Map> shardResult = queryResult.getColumns(); int rows = shardResult.entrySet().stream().findFirst().get().getValue().size(); List internalAggregations = new ArrayList<>(); for (int i = 0; i < rows; i++) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java index e7a8ffd9d9ac2..6c61db56d155d 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -61,6 +61,7 @@ import org.opensearch.common.Rounding; import org.opensearch.common.collect.Tuple; import org.opensearch.common.lease.Releasables; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.index.IndexSortConfig; import org.opensearch.lucene.queries.SearchAfterSortedDocQuery; import org.opensearch.search.DocValueFormat; @@ -734,7 +735,8 @@ public void collect(int doc, long zeroBucket) throws IOException { } @Override - public List convert(Map> shardResult, SearchContext searchContext) { + public List convert(QueryResult dfResult, SearchContext searchContext) { + Map> shardResult = dfResult.getColumns(); if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyAggregation()); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java index 57d9538024a45..1fa34e3230d52 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/AbstractStringTermsAggregator.java @@ -35,6 +35,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.util.BytesRef; import org.opensearch.common.collect.Tuple; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.search.DocValueFormat; import org.opensearch.search.aggregations.Aggregator; import org.opensearch.search.aggregations.AggregatorFactories; @@ -120,7 +121,8 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs } @Override - public List convert(Map> shardResult, SearchContext searchContext) { + public List convert(QueryResult dfResult, SearchContext searchContext) { + Map> shardResult = dfResult.getColumns(); if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyTermsAggregation()); } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java index da7a9f8be5b2e..f36a7e17aee6a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/MultiTermsAggregator.java @@ -20,6 +20,7 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -709,7 +710,8 @@ static InternalValuesSource doubleValueSource(ValuesSource.Numeric valuesSource, } @Override - public List convert(Map> shardResult, SearchContext searchContext) { + public List convert(QueryResult dfResult, SearchContext searchContext) { + Map> shardResult = dfResult.getColumns(); int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(fields.getFirst()).size() ; List buckets = new ArrayList<>(rowCount); for (int i = 0; i < rowCount; i++) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index 2c9c1ad322ff2..bdf01665519fb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -42,6 +42,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.LongArray; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.index.codec.composite.CompositeIndexFieldInfo; import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues; import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator; @@ -524,7 +525,8 @@ LongTerms buildEmptyResult() { } @Override - public List convert(Map> shardResult, SearchContext searchContext) { + public List convert(QueryResult dfResult, SearchContext searchContext) { + Map> shardResult = dfResult.getColumns(); int rowCount = shardResult.isEmpty() ? 0 : shardResult.get(name).size() ; List buckets = new ArrayList<>(rowCount); for (int i = 0; i < rowCount; i++) { @@ -641,7 +643,8 @@ DoubleTerms buildEmptyResult() { } @Override - public List convert(Map> shardResult, SearchContext searchContext) { + public List convert(QueryResult dfResult, SearchContext searchContext) { + Map> shardResult = dfResult.getColumns(); if(shardResult.isEmpty()) { return Collections.singletonList(buildEmptyAggregation()); } @@ -878,12 +881,12 @@ public void close() { } @Override - public List convert(Map> shardResult, SearchContext searchContext) { - if(shardResult.isEmpty()) { + public List convert(QueryResult dfResult, SearchContext searchContext) { + if(dfResult.getColumns().isEmpty()) { return Collections.singletonList(buildEmptyAggregation()); } if (resultStrategy instanceof ShardResultConvertor) { - return ((ShardResultConvertor) resultStrategy).convert(shardResult, searchContext); + return ((ShardResultConvertor) resultStrategy).convert(dfResult, searchContext); } else { throw new UnsupportedOperationException("Result strategy not supported for conversion " + resultStrategy.getClass().getName()); } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index dfccc936d3205..17120a5bbb2a1 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -79,6 +79,7 @@ import org.opensearch.search.query.QuerySearchResult; import org.opensearch.search.query.ReduceableSearchResult; import org.opensearch.search.rescore.RescoreContext; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.search.slice.SliceBuilder; import org.opensearch.search.sort.SortAndFormats; import org.opensearch.search.streaming.FlushMode; @@ -607,12 +608,10 @@ public boolean setFlushModeIfAbsent(FlushMode flushMode) { } - public void setDFResults(Map> dfResults) { + public void setDFResults(QueryResult dfResults) {} - } - - public Map> getDFResults() { - return Collections.emptyMap(); + public QueryResult getDFResults() { + return null; } // TODO : This should be a part of mapper given by DataFormat or SearchEngine as related to Field type. diff --git a/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java b/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java index bf581cd080240..a5b8b8ee62685 100644 --- a/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java +++ b/server/src/main/java/org/opensearch/search/query/SearchEngineResultConversionUtils.java @@ -19,6 +19,7 @@ import org.opensearch.search.aggregations.metrics.InternalCardinality; import org.opensearch.search.aggregations.metrics.InternalValueCount; import org.opensearch.search.aggregations.metrics.ValueCountAggregator; +import org.opensearch.vectorized.execution.search.spi.QueryResult; import org.opensearch.search.internal.SearchContext; import java.io.IOException; @@ -35,14 +36,8 @@ public class SearchEngineResultConversionUtils { public static void convertDFResultGeneric(SearchContext searchContext) { if (searchContext.aggregations() != null) { - Map> dfResult = searchContext.getDFResults(); + QueryResult dfResult = searchContext.getDFResults(); -// LOGGER.info("DF Results at convertDFResultGeneric:"); -// for (Map.Entry entry : dfResult.entrySet()) { -// LOGGER.info("{}: {}", entry.getKey(), java.util.Arrays.toString(entry.getValue())); -// } - - // Create aggregators which will process the result from DataFusion try { List aggregators = new ArrayList<>();