diff --git a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java index 46f403251c323..424fa69f150e8 100644 --- a/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java +++ b/modules/mapper-extras/src/main/java/org/opensearch/index/mapper/ScaledFloatFieldMapper.java @@ -540,7 +540,7 @@ protected void canDeriveSourceInternal() { * both doc values and stored field */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator( mappedFieldType, new SortedNumericDocValuesFetcher(mappedFieldType, simpleName()), diff --git a/plugins/engine-datafusion/jni/src/lib.rs b/plugins/engine-datafusion/jni/src/lib.rs index d45d3a88f5025..416295330fcd9 100644 --- a/plugins/engine-datafusion/jni/src/lib.rs +++ b/plugins/engine-datafusion/jni/src/lib.rs @@ -5,15 +5,17 @@ * this file be licensed under the Apache-2.0 license or a * compatible open source license. */ +use std::collections::{BTreeSet, HashMap}; +use std::fs::File; use std::ptr::addr_of_mut; -use jni::objects::{JByteArray, JClass, JObject}; +use jni::objects::{JByteArray, JClass, JIntArray, JObject}; use jni::sys::{jbyteArray, jlong, jstring}; use jni::JNIEnv; use std::sync::Arc; use arrow_array::{Array, StructArray}; use arrow_array::ffi::FFI_ArrowArray; -use arrow_schema::DataType; use arrow_schema::ffi::FFI_ArrowSchema; +use datafusion::common::DataFusionError; mod util; mod row_id_optimizer; @@ -21,7 +23,7 @@ mod listing_table; use datafusion::execution::context::SessionContext; -use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result_error, set_object_result_ok}; +use crate::util::{create_object_meta_from_filenames, parse_string_arr, set_object_result, set_object_result_error, set_object_result_ok}; use datafusion::datasource::file_format::csv::CsvFormat; use datafusion::datasource::listing::{ListingTableUrl}; use datafusion::execution::cache::cache_manager::CacheManagerConfig; @@ -31,12 +33,22 @@ use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder}; use datafusion::prelude::SessionConfig; use datafusion::DATAFUSION_VERSION; use datafusion::datasource::file_format::parquet::ParquetFormat; -use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::physical_plan::parquet::{ParquetAccessPlan, RowGroupAccess}; +use datafusion::datasource::physical_plan::ParquetSource; +use datafusion::execution::TaskContext; +use datafusion::parquet::arrow::arrow_reader::RowSelector; +use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; +use datafusion_datasource::file_groups::FileGroup; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::PartitionedFile; +use datafusion_datasource::source::DataSourceExec; use datafusion_substrait::logical_plan::consumer::from_substrait_plan; use datafusion_substrait::substrait::proto::Plan; use futures::TryStreamExt; use jni::objects::{JObjectArray, JString}; use object_store::ObjectMeta; +use parquet::file::reader::{FileReader, SerializedFileReader}; use prost::Message; use tokio::runtime::Runtime; use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig}; @@ -430,3 +442,249 @@ pub extern "system" fn Java_org_opensearch_datafusion_RecordBatchStream_getSchem } } } + +#[no_mangle] +pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_executeFetchPhase( + mut env: JNIEnv, + _class: JClass, + shard_view_ptr: jlong, + values: JIntArray, // TODO : Add projections + tokio_runtime_env_ptr: jlong, + callback: JObject, +) { + + + let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) }; + let runtime_ptr = unsafe { &*(tokio_runtime_env_ptr as *const Runtime)}; + + let table_path = shard_view.table_path(); + let files_meta = shard_view.files_meta(); + + + // Safety checks first + if values.is_null() { + let _ = env.throw_new("java/lang/NullPointerException", "values array is null"); + return; + } + + // Get array length + let array_length = match env.get_array_length(&values) { + Ok(len) => len, + Err(e) => { + let _ = env.throw_new("java/lang/RuntimeException", + format!("Failed to get array length: {:?}", e)); + return; + } + }; + + // Create a buffer to hold the array data + let mut row_ids = vec![0; array_length as usize]; + + // Copy the array data into the buffer + match env.get_int_array_region(&values, 0, &mut row_ids) { + Ok(_) => { + // Now buffer contains the array data + // println!("Received array of length {}: {:?}", array_length, row_ids); + }, + Err(e) => { + let _ = env.throw_new("java/lang/RuntimeException", + format!("Failed to get array data: {:?}", e)); + return; + } + } + + + // Safety checks + if tokio_runtime_env_ptr == 0 { + let error = DataFusionError::Execution("Null runtime pointer".to_string()); + set_object_result_error(&mut env, callback, &error); + return; + } + + let access_plans = create_access_plans(row_ids, files_meta.clone()); + + + let runtime_env = RuntimeEnvBuilder::new() + .with_cache_manager(CacheManagerConfig::default() + //.with_list_files_cache(Some(list_file_cache)) TODO: //Fix this + ).build().unwrap(); + let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), Arc::new(runtime_env)); + + // Create default parquet options + let file_format = ParquetFormat::new(); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(".parquet"); // TODO: take this as parameter + // .with_table_partition_cols(vec![("row_base".to_string(), DataType::Int32)]); // TODO: enable only for query phase + + // Ideally the executor will give this + + + + runtime_ptr.block_on(async { + + let parquet_schema = listing_options + .infer_schema(&ctx.state(), &table_path.clone()) + .await.unwrap(); + + let partitioned_files: Vec = files_meta + .iter() + .zip(access_plans.await.iter()) + .map(|(meta, access_plan)| { + PartitionedFile::new( + meta.location.clone(), + meta.size + ).with_extensions(Arc::new(access_plan.clone())) + }) + .collect(); + + let file_group = FileGroup::new(partitioned_files); + + let file_source = Arc::new( + ParquetSource::default() + // provide the factory to create parquet reader without re-reading metadata + //.with_parquet_file_reader_factory(Arc::new(reader_factory)), + ); + + let file_scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), parquet_schema.clone(), file_source) + //.with_limit(limit) + // .with_projection(projection) + .with_file_group(file_group) + .build(); + + let parquet_exec = DataSourceExec::from_data_source(file_scan_config); + + // IMPORTANT: Only get one reference to each pointer + // let liquid_ctx = unsafe { &mut *(context_ptr as *mut SessionContext) }; + // let session_ctx = unsafe { Box::from_raw(context_ptr as *mut SessionContext) }; + let mut optimized_plan: Arc = parquet_exec.clone(); + + + + let task_ctx = Arc::new(TaskContext::default()); + + // let runtime = unsafe { &mut *(runtime as *mut Runtime) }; + // runtime.block_on(async { + match optimized_plan.execute(0, task_ctx) { + Ok(stream) => { + let boxed_stream = Box::new(stream); + let stream_ptr = Box::into_raw(boxed_stream); + set_object_result_ok( + &mut env, + callback, + stream_ptr + ); + } + Err(e) => { + set_object_result_error( + &mut env, + callback, + &e + ); + } + } + }); +} + +async fn create_access_plans( + row_ids: Vec, + files_meta: Arc> +) -> Result, DataFusionError> { + let mut parquet_metadata = Vec::new(); + + // First get ParquetMetaData for all files + for meta in files_meta.iter() { + let path = meta.location.as_ref(); + let file = File::open(path)?; + let reader = SerializedFileReader::new(file).unwrap(); + parquet_metadata.push(reader.metadata().clone()); + } + + let mut access_plans = Vec::new(); + let mut cumulative_rows = 0; + + // Process each file + for (file_idx, metadata) in parquet_metadata.iter().enumerate() { + let total_row_groups = metadata.num_row_groups(); + let mut access_plan = ParquetAccessPlan::new_all(total_row_groups); + + // Calculate total rows in current file + let mut file_total_rows = 0; + let mut row_group_map: HashMap = HashMap::new(); + for group_id in 0..total_row_groups { + let rows_in_group = metadata.row_group(group_id).num_rows() as i32; + row_group_map.insert(group_id, rows_in_group); + file_total_rows += rows_in_group; + } + + // Filter row IDs that belong to this file + let file_row_ids: Vec = row_ids.iter() + .filter(|&&id| id >= cumulative_rows && id < cumulative_rows + file_total_rows) + .map(|&id| id - cumulative_rows) // Convert global row IDs to local + .collect(); + + if file_row_ids.is_empty() { + // If no rows belong to this file, skip all row groups + for group_id in 0..total_row_groups { + access_plan.skip(group_id); + } + } else { + // Group local row IDs by row group + let mut group_map: HashMap> = HashMap::new(); + for row_id in file_row_ids { + let rows_per_group = *row_group_map.get(&0).unwrap(); + group_map.entry((row_id / rows_per_group) as usize) + .or_default() + .insert(row_id % rows_per_group); + } + + // Process each row group + for group_id in 0..total_row_groups { + let row_group_size = *row_group_map.get(&group_id).unwrap() as usize; + if let Some(group_row_ids) = group_map.get(&group_id) { + let mut relative_row_ids: Vec = group_row_ids.iter() + .map(|&x| x as usize) + .collect(); + + if relative_row_ids.is_empty() { + access_plan.skip(group_id); + } else if relative_row_ids.len() == row_group_size { + access_plan.scan(group_id); + } else { + // Create selectors + let mut selectors = Vec::new(); + let mut current_pos = 0; + let mut i = 0; + while i < relative_row_ids.len() { + let mut target_pos = relative_row_ids[i]; + if target_pos > current_pos { + selectors.push(RowSelector::skip(target_pos - current_pos)); + } + let mut select_count = 1; + while i + 1 < relative_row_ids.len() && + relative_row_ids[i + 1] == relative_row_ids[i] + 1 { + select_count += 1; + i += 1; + target_pos = relative_row_ids[i]; + } + selectors.push(RowSelector::select(select_count)); + current_pos = relative_row_ids[i] + 1; + i += 1; + } + if current_pos < row_group_size { + selectors.push(RowSelector::skip(row_group_size - current_pos)); + } + access_plan.set(group_id, RowGroupAccess::Selection(selectors.into())); + } + } else { + access_plan.skip(group_id); + } + } + } + + access_plans.push(access_plan); + cumulative_rows += file_total_rows; + } + + Ok(access_plans) +} diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java index 6cdc09bd040f7..b0cb5089192cc 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DataFusionQueryJNI.java @@ -115,6 +115,17 @@ private static synchronized void loadNativeLibrary() { */ public static native long executeSubstraitQuery(long cachePtr, byte[] substraitPlan, long runtimePtr); + /** + * Execute a Substrait query plan + * @param cachePtr the session context ID + * @param rowIds row ids for which record needs to fetch + * @param runtimePtr runtime pointer + * @return stream pointer for result iteration + */ + + // TODO: tie this to actual FetchPhase + public static native long executeFetchPhase(long cachePtr, long[] rowIds, long runtimePtr); + public static native long createDatafusionReader(String path, String[] files); public static native void closeDatafusionReader(long ptr); diff --git a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java index 2381322d76bf4..7adf4cefca71f 100644 --- a/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java +++ b/plugins/engine-datafusion/src/main/java/org/opensearch/datafusion/DatafusionEngine.java @@ -14,9 +14,12 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; import org.opensearch.action.search.SearchShardTask; import org.opensearch.common.lease.Releasables; -import org.opensearch.datafusion.core.DefaultRecordBatchStream; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.datafusion.search.DatafusionContext; import org.opensearch.datafusion.search.DatafusionQuery; import org.opensearch.datafusion.search.DatafusionQueryPhaseExecutor; @@ -30,6 +33,7 @@ import org.opensearch.index.engine.EngineSearcherSupplier; import org.opensearch.index.engine.SearchExecEngine; import org.opensearch.index.engine.exec.FileMetadata; +import org.opensearch.index.mapper.*; import org.opensearch.search.aggregations.SearchResultsCollector; import org.opensearch.search.internal.ReaderContext; import org.opensearch.search.internal.ShardSearchRequest; @@ -39,10 +43,7 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import java.util.function.Function; public class DatafusionEngine extends SearchExecEngine formatCatalogSnapshot, DataFusionService dataFusionService) throws IOException { this.dataFormat = dataFormat; - this.datafusionReaderManager = new DatafusionReaderManager("TODO://FigureOutPath", formatCatalogSnapshot); + this.datafusionReaderManager = new DatafusionReaderManager("/Users/animodak/Documents/logs/", formatCatalogSnapshot); // TODO: FigureOutPath this.datafusionService = dataFusionService; } @@ -195,4 +196,83 @@ public void collect(RecordBatchStream value) { } return finalRes; } + + + // TODO: make this Map? + public List executeFetchPhase(DatafusionContext context) throws IOException { +// List rowIds = context.getDFResults() // TODO: get row_ids, projections from dfContext which was returned in query phase + List rowIds = new ArrayList<>(); + List projectionFields = new ArrayList<>(); + + DatafusionSearcher datafusionSearcher = context.getEngineSearcher(); + long streamPointer = datafusionSearcher.search(context.getDatafusionQuery(), datafusionService.getTokioRuntimePointer()); // update to handle fetchPhase query + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + RecordBatchStream stream = new RecordBatchStream(streamPointer, datafusionService.getTokioRuntimePointer() , allocator); + return generateByteRefs(context, stream); + } + + public List generateByteRefs(DatafusionContext context, RecordBatchStream recordBatchStream) throws IOException { + MapperService mapperService = context.mapperService(); + List byteRefs = new ArrayList<>(); + while(recordBatchStream.loadNextBatch().join()) { + VectorSchemaRoot vectorSchemaRoot = recordBatchStream.getVectorSchemaRoot(); + List fieldVectorList = vectorSchemaRoot.getFieldVectors(); + for(int i=0; i generate(RecordBatchStream recordBatchStream) throws IOException { +// List byteRefs = new ArrayList<>(); +// while(recordBatchStream.loadNextBatch().join()) { +// VectorSchemaRoot vectorSchemaRoot = recordBatchStream.getVectorSchemaRoot(); +// List fieldVectorList = vectorSchemaRoot.getFieldVectors(); +// for(int i=0; i values) throws IOException { + fieldValueFetcher.write(builder, values); + } } diff --git a/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java index 39a3a73c9529c..1fdec479dad91 100644 --- a/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/FieldMapper.java @@ -600,7 +600,7 @@ protected Explicit ignoreMalformed() { * Method to create derived source generator for this field mapper, it is illegal to enable the * derived source feature and not implement this method for a field mapper */ - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return null; } diff --git a/server/src/main/java/org/opensearch/index/mapper/GeoPointFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/GeoPointFieldMapper.java index 2910bd2856d2f..89844e14a351d 100644 --- a/server/src/main/java/org/opensearch/index/mapper/GeoPointFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/GeoPointFieldMapper.java @@ -219,7 +219,7 @@ protected void canDeriveSourceInternal() { * 4. When using stored field, order and duplicate values would be preserved */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator(mappedFieldType, new SortedNumericDocValuesFetcher(mappedFieldType, simpleName()) { @Override public Object convert(Object value) { diff --git a/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java index d3a604444c10f..5004b3d1ef1cb 100644 --- a/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/IpFieldMapper.java @@ -199,7 +199,7 @@ protected void canDeriveSourceInternal() { * 2. When using stored field, order and duplicate values would be preserved */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator( mappedFieldType, new SortedSetDocValuesFetcher(mappedFieldType, simpleName()), diff --git a/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java index 1c534153f2629..4ac4f73f6fc8c 100644 --- a/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/KeywordFieldMapper.java @@ -294,7 +294,7 @@ protected void canDeriveSourceInternal() { * 2. When using stored field, order and duplicate values would be preserved */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator( mappedFieldType, new SortedSetDocValuesFetcher(mappedFieldType, simpleName()), diff --git a/server/src/main/java/org/opensearch/index/mapper/Mapper.java b/server/src/main/java/org/opensearch/index/mapper/Mapper.java index 3b9024162656f..d6f5bdcbd9af2 100644 --- a/server/src/main/java/org/opensearch/index/mapper/Mapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/Mapper.java @@ -319,4 +319,8 @@ public void canDeriveSource() { public void deriveSource(XContentBuilder builder, LeafReader leafReader, int docId) throws IOException { throw new UnsupportedOperationException("Derived source field is not supported for [" + name() + "] field"); } + + public DerivedFieldGenerator derivedFieldGenerator() throws IOException { + throw new UnsupportedOperationException("Converting [" + name() + "] is not supported for [" + name() + "] field"); + } } diff --git a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java index 30281b46c7c55..f245cb98673b3 100644 --- a/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/NumberFieldMapper.java @@ -209,7 +209,7 @@ public boolean isDataCubeMetricSupported() { * compared to stored field(stored as float) */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator(mappedFieldType, new SortedNumericDocValuesFetcher(mappedFieldType, simpleName()) { @Override public Object convert(Object value) { diff --git a/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java index bb726893b3d17..66107621f1049 100644 --- a/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/TextFieldMapper.java @@ -1238,7 +1238,7 @@ protected void canDeriveSourceInternal() {} * Derive source using stored field, which would always be present for derived source enabled index field */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator(mappedFieldType, null, new StoredFieldFetcher(mappedFieldType, simpleName())) { @Override public FieldValueType getDerivedFieldPreference() { diff --git a/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java index 21179122c0b5e..4fbaae930023e 100644 --- a/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/WildcardFieldMapper.java @@ -928,7 +928,7 @@ protected void canDeriveSourceInternal() { * 1. When using doc values, for multi value field, result would be deduplicated and in sorted order */ @Override - protected DerivedFieldGenerator derivedFieldGenerator() { + public DerivedFieldGenerator derivedFieldGenerator() { return new DerivedFieldGenerator(mappedFieldType, new SortedSetDocValuesFetcher(mappedFieldType, simpleName()) { @Override public Object convert(Object value) {