diff --git a/rust/lance-core/src/datatypes.rs b/rust/lance-core/src/datatypes.rs index a0a1fd1817a..dd56e610f52 100644 --- a/rust/lance-core/src/datatypes.rs +++ b/rust/lance-core/src/datatypes.rs @@ -91,6 +91,10 @@ impl LogicalType { self.0 == "large_list" || self.0 == "large_list.struct" } + fn is_fixed_size_list_struct(&self) -> bool { + self.0.starts_with("fixed_size_list:struct:") + } + fn is_struct(&self) -> bool { self.0 == "struct" } diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs index 1df60d65611..3b0169fcb7b 100644 --- a/rust/lance-core/src/datatypes/field.rs +++ b/rust/lance-core/src/datatypes/field.rs @@ -165,6 +165,16 @@ impl Field { lt if lt.is_large_list() => { DataType::LargeList(Arc::new(ArrowField::from(&self.children[0]))) } + lt if lt.is_fixed_size_list_struct() => { + // Parse size from "fixed_size_list:struct:N" + let size: i32 = + lt.0.split(':') + .next_back() + .expect("fixed_size_list:struct logical type missing size suffix") + .parse() + .expect("fixed_size_list:struct logical type has invalid size"); + DataType::FixedSizeList(Arc::new(ArrowField::from(&self.children[0])), size) + } lt if lt.is_struct() => { DataType::Struct(self.children.iter().map(ArrowField::from).collect()) } @@ -1076,6 +1086,9 @@ impl TryFrom<&ArrowField> for Field { .collect::>()?, DataType::List(item) => vec![Self::try_from(item.as_ref())?], DataType::LargeList(item) => vec![Self::try_from(item.as_ref())?], + DataType::FixedSizeList(item, _) if matches!(item.data_type(), DataType::Struct(_)) => { + vec![Self::try_from(item.as_ref())?] + } DataType::Map(entries, keys_sorted) => { // TODO: We only support keys_sorted=false for now, // because converting a rust arrow map field to the python arrow field will diff --git a/rust/lance-datagen/src/generator.rs b/rust/lance-datagen/src/generator.rs index e45cfdfa08d..068eb17c642 100644 --- a/rust/lance-datagen/src/generator.rs +++ b/rust/lance-datagen/src/generator.rs @@ -15,7 +15,7 @@ use arrow_array::{ make_array, types::{ArrowDictionaryKeyType, BinaryType, ByteArrayType, Utf8Type}, Array, BinaryArray, FixedSizeBinaryArray, FixedSizeListArray, Float32Array, LargeListArray, - LargeStringArray, ListArray, NullArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, + LargeStringArray, ListArray, MapArray, NullArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, RecordBatchOptions, RecordBatchReader, StringArray, StructArray, }; use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SchemaRef}; @@ -1712,6 +1712,85 @@ impl ArrayGenerator for RandomListGenerator { } } +/// Generates random map arrays where each map has 0-4 entries. +#[derive(Debug)] +struct RandomMapGenerator { + field: Arc, + entries_field: Arc, + keys_gen: Box, + values_gen: Box, + lengths_gen: Box, +} + +impl RandomMapGenerator { + fn new(keys_gen: Box, values_gen: Box) -> Self { + let entries_fields = Fields::from(vec![ + Field::new("keys", keys_gen.data_type().clone(), false), + Field::new("values", values_gen.data_type().clone(), true), + ]); + let entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields), + false, + )); + let map_type = DataType::Map(entries_field.clone(), false); + let field = Arc::new(Field::new("", map_type, true)); + let lengths_dist = Uniform::new_inclusive(0_i32, 4).unwrap(); + let lengths_gen = rand_with_distribution::>(lengths_dist); + + Self { + field, + entries_field, + keys_gen, + values_gen, + lengths_gen, + } + } +} + +impl ArrayGenerator for RandomMapGenerator { + fn generate( + &mut self, + length: RowCount, + rng: &mut rand_xoshiro::Xoshiro256PlusPlus, + ) -> Result, ArrowError> { + let lengths = self.lengths_gen.generate(length, rng)?; + let lengths = lengths.as_primitive::(); + let total_entries = lengths.values().iter().sum::() as u64; + let offsets = OffsetBuffer::from_lengths(lengths.values().iter().map(|v| *v as usize)); + + let keys = self.keys_gen.generate(RowCount::from(total_entries), rng)?; + let values = self + .values_gen + .generate(RowCount::from(total_entries), rng)?; + + let entries = StructArray::new( + Fields::from(vec![ + Field::new("keys", keys.data_type().clone(), false), + Field::new("values", values.data_type().clone(), true), + ]), + vec![keys, values], + None, + ); + + Ok(Arc::new(MapArray::try_new( + self.entries_field.clone(), + offsets, + entries, + None, + false, + )?)) + } + + fn data_type(&self) -> &DataType { + self.field.data_type() + } + + fn element_size_bytes(&self) -> Option { + None + } +} + #[derive(Debug)] struct NullArrayGenerator {} @@ -2754,6 +2833,13 @@ pub mod array { Box::new(RandomListGenerator::new(item_gen, is_large)) } + /// Generates random map arrays where each map has 0-4 entries. + pub fn rand_map(key_type: &DataType, value_type: &DataType) -> Box { + let keys_gen = rand_type(key_type); + let values_gen = rand_type(value_type); + Box::new(RandomMapGenerator::new(keys_gen, values_gen)) + } + pub fn rand_struct(fields: Fields) -> Box { let child_gens = fields .iter() @@ -2797,6 +2883,14 @@ pub mod array { DataType::FixedSizeBinary(size) => rand_fsb(*size), DataType::List(child) => rand_list(child.data_type(), false), DataType::LargeList(child) => rand_list(child.data_type(), true), + DataType::Map(entries_field, _) => { + let DataType::Struct(fields) = entries_field.data_type() else { + panic!("Map entries field must be a struct"); + }; + let key_type = fields[0].data_type(); + let value_type = fields[1].data_type(); + rand_map(key_type, value_type) + } DataType::Duration(unit) => match unit { TimeUnit::Second => rand::(), TimeUnit::Millisecond => rand::(), diff --git a/rust/lance-encoding/src/decoder.rs b/rust/lance-encoding/src/decoder.rs index 3c7e51f0cc0..a3bcfd3c27b 100644 --- a/rust/lance-encoding/src/decoder.rs +++ b/rust/lance-encoding/src/decoder.rs @@ -232,12 +232,14 @@ use snafu::location; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::{self, unbounded_channel}; +use lance_core::error::LanceOptionExt; use lance_core::{ArrowResult, Error, Result}; use tracing::instrument; use crate::compression::{DecompressionStrategy, DefaultDecompressionStrategy}; use crate::data::DataBlock; use crate::encoder::EncodedBatch; +use crate::encodings::logical::fixed_size_list::StructuralFixedSizeListScheduler; use crate::encodings::logical::list::StructuralListScheduler; use crate::encodings::logical::map::StructuralMapScheduler; use crate::encodings::logical::primitive::StructuralPrimitiveFieldScheduler; @@ -765,15 +767,23 @@ impl CoreFieldDecoderStrategy { ) } DataType::List(_) | DataType::LargeList(_) => { - let child = field - .children - .first() - .expect("List field must have a child"); + let child = field.children.first().expect_ok()?; let child_scheduler = self.create_structural_field_scheduler(child, column_infos)?; Ok(Box::new(StructuralListScheduler::new(child_scheduler)) as Box) } + DataType::FixedSizeList(inner, dimension) + if matches!(inner.data_type(), DataType::Struct(_)) => + { + let child = field.children.first().expect_ok()?; + let child_scheduler = + self.create_structural_field_scheduler(child, column_infos)?; + Ok(Box::new(StructuralFixedSizeListScheduler::new( + child_scheduler, + *dimension, + )) as Box) + } DataType::Map(_, keys_sorted) => { // TODO: We only support keys_sorted=false for now, // because converting a rust arrow map field to the python arrow field will @@ -784,10 +794,7 @@ impl CoreFieldDecoderStrategy { location: location!(), }); } - let entries_child = field - .children - .first() - .expect("Map field must have an entries child"); + let entries_child = field.children.first().expect_ok()?; let child_scheduler = self.create_structural_field_scheduler(entries_child, column_infos)?; Ok(Box::new(StructuralMapScheduler::new(child_scheduler)) diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 230e591501f..203b3b99642 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -20,6 +20,7 @@ use arrow_schema::DataType; use bytes::{Bytes, BytesMut}; use futures::future::BoxFuture; use lance_core::datatypes::{Field, Schema}; +use lance_core::error::LanceOptionExt; use lance_core::utils::bit::{is_pwr_two, pad_bytes_to}; use lance_core::{Error, Result}; use snafu::location; @@ -29,6 +30,7 @@ use crate::compression::{CompressionStrategy, DefaultCompressionStrategy}; use crate::compression_config::CompressionParams; use crate::decoder::PageEncoding; use crate::encodings::logical::blob::{BlobStructuralEncoder, BlobV2StructuralEncoder}; +use crate::encodings::logical::fixed_size_list::FixedSizeListStructuralEncoder; use crate::encodings::logical::list::ListStructuralEncoder; use crate::encodings::logical::map::MapStructuralEncoder; use crate::encodings::logical::primitive::PrimitiveStructuralEncoder; @@ -345,37 +347,39 @@ impl StructuralEncodingStrategy { } fn is_primitive_type(data_type: &DataType) -> bool { - matches!( - data_type, - DataType::Boolean - | DataType::Date32 - | DataType::Date64 - | DataType::Decimal128(_, _) - | DataType::Decimal256(_, _) - | DataType::Duration(_) - | DataType::Float16 - | DataType::Float32 - | DataType::Float64 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::Int8 - | DataType::Interval(_) - | DataType::Null - | DataType::Time32(_) - | DataType::Time64(_) - | DataType::Timestamp(_, _) - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::UInt8 - | DataType::FixedSizeBinary(_) - | DataType::FixedSizeList(_, _) - | DataType::Binary - | DataType::LargeBinary - | DataType::Utf8 - | DataType::LargeUtf8, - ) + match data_type { + DataType::FixedSizeList(inner, _) => Self::is_primitive_type(inner.data_type()), + _ => matches!( + data_type, + DataType::Boolean + | DataType::Date32 + | DataType::Date64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) + | DataType::Duration(_) + | DataType::Float16 + | DataType::Float32 + | DataType::Float64 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Int8 + | DataType::Interval(_) + | DataType::Null + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::UInt8 + | DataType::FixedSizeBinary(_) + | DataType::Binary + | DataType::LargeBinary + | DataType::Utf8 + | DataType::LargeUtf8, + ), + } } fn do_create_field_encoder( @@ -437,7 +441,7 @@ impl StructuralEncodingStrategy { } else { match data_type { DataType::List(_) | DataType::LargeList(_) => { - let child = field.children.first().expect("List should have a child"); + let child = field.children.first().expect_ok()?; let child_encoder = self.do_create_field_encoder( _encoding_strategy_root, child, @@ -450,6 +454,33 @@ impl StructuralEncodingStrategy { child_encoder, ))) } + DataType::FixedSizeList(inner, _) + if matches!(inner.data_type(), DataType::Struct(_)) => + { + if self.version < LanceFileVersion::V2_2 { + return Err(Error::NotSupported { + source: format!( + "FixedSizeList is only supported in Lance file format 2.2+, current version: {}", + self.version + ) + .into(), + location: location!(), + }); + } + // Complex FixedSizeList needs structural encoding + let child = field.children.first().expect_ok()?; + let child_encoder = self.do_create_field_encoder( + _encoding_strategy_root, + child, + column_index, + options, + root_field_metadata, + )?; + Ok(Box::new(FixedSizeListStructuralEncoder::new( + options.keep_original_array, + child_encoder, + ))) + } DataType::Map(_, keys_sorted) => { // TODO: We only support keys_sorted=false for now, // because converting a rust arrow map field to the python arrow field will diff --git a/rust/lance-encoding/src/encodings/logical.rs b/rust/lance-encoding/src/encodings/logical.rs index 4b1d186b79a..199f470f55b 100644 --- a/rust/lance-encoding/src/encodings/logical.rs +++ b/rust/lance-encoding/src/encodings/logical.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors pub mod blob; +pub mod fixed_size_list; pub mod list; pub mod map; pub mod primitive; diff --git a/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs new file mode 100644 index 00000000000..805ab2c96bb --- /dev/null +++ b/rust/lance-encoding/src/encodings/logical/fixed_size_list.rs @@ -0,0 +1,732 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Encoding support for complex FixedSizeList types (FSL with non-primitive children). +//! +//! Primitive FSL (e.g., `FixedSizeList`) is handled in the physical encoding layer. +//! This module handles FSL with complex children (Struct, Map, List) which require +//! structural encoding. + +use std::{ops::Range, sync::Arc}; + +use arrow_array::{cast::AsArray, Array, ArrayRef, GenericListArray, OffsetSizeTrait, StructArray}; +use arrow_buffer::{BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_schema::DataType; +use futures::future::BoxFuture; +use lance_arrow::deepcopy::deep_copy_nulls; +use lance_core::{Error, Result}; +use snafu::location; + +use crate::{ + decoder::{ + DecodedArray, FilterExpression, ScheduledScanLine, SchedulerContext, + StructuralDecodeArrayTask, StructuralFieldDecoder, StructuralFieldScheduler, + StructuralSchedulingJob, + }, + encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers}, + repdef::RepDefBuilder, +}; + +/// A structural encoder for complex fixed-size list fields +/// +/// The FSL's validity is added to the rep/def builder along with the dimension +/// and the FSL array's values are passed to the child encoder. +pub struct FixedSizeListStructuralEncoder { + keep_original_array: bool, + child: Box, +} + +impl FixedSizeListStructuralEncoder { + pub fn new(keep_original_array: bool, child: Box) -> Self { + Self { + keep_original_array, + child, + } + } +} + +impl FieldEncoder for FixedSizeListStructuralEncoder { + fn maybe_encode( + &mut self, + array: ArrayRef, + external_buffers: &mut OutOfLineBuffers, + mut repdef: RepDefBuilder, + row_number: u64, + num_rows: u64, + ) -> Result> { + let fsl_arr = array + .as_fixed_size_list_opt() + .ok_or_else(|| Error::Internal { + message: "FixedSizeList encoder used for non-fixed-size-list data".to_string(), + location: location!(), + })?; + + let dimension = fsl_arr.value_length() as usize; + let values = fsl_arr.values().clone(); + + let validity = if self.keep_original_array { + array.nulls().cloned() + } else { + deep_copy_nulls(array.nulls()) + }; + repdef.add_fsl(validity.clone(), dimension, num_rows as usize); + + // FSL forces child elements to exist even under null rows. Normalize any + // nested lists under null FSL rows to null empty lists. + let values = if let Some(ref fsl_validity) = validity { + if needs_garbage_filtering(values.data_type()) { + let is_garbage = + expand_garbage_mask(&fsl_validity_to_garbage_mask(fsl_validity), dimension); + filter_fsl_child_garbage(values, &is_garbage) + } else { + values + } + } else { + values + }; + + self.child.maybe_encode( + values, + external_buffers, + repdef, + row_number, + num_rows * dimension as u64, + ) + } + + fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result> { + self.child.flush(external_buffers) + } + + fn num_columns(&self) -> u32 { + self.child.num_columns() + } + + fn finish( + &mut self, + external_buffers: &mut OutOfLineBuffers, + ) -> BoxFuture<'_, Result>> { + self.child.finish(external_buffers) + } +} + +/// A scheduler for complex fixed-size list fields +/// +/// Scales row ranges by the FSL dimension when scheduling child rows, +/// and scales scheduled rows back when reporting to the parent. +#[derive(Debug)] +pub struct StructuralFixedSizeListScheduler { + child: Box, + dimension: u64, +} + +impl StructuralFixedSizeListScheduler { + pub fn new(child: Box, dimension: i32) -> Self { + Self { + child, + dimension: dimension as u64, + } + } +} + +impl StructuralFieldScheduler for StructuralFixedSizeListScheduler { + fn schedule_ranges<'a>( + &'a self, + ranges: &[Range], + filter: &FilterExpression, + ) -> Result> { + // Scale ranges by dimension for the child - each FSL row becomes `dimension` child rows + let child_ranges: Vec> = ranges + .iter() + .map(|r| (r.start * self.dimension)..(r.end * self.dimension)) + .collect(); + let child = self.child.schedule_ranges(&child_ranges, filter)?; + Ok(Box::new(StructuralFixedSizeListSchedulingJob::new( + child, + self.dimension, + ))) + } + + fn initialize<'a>( + &'a mut self, + filter: &'a FilterExpression, + context: &'a SchedulerContext, + ) -> BoxFuture<'a, Result<()>> { + self.child.initialize(filter, context) + } +} + +#[derive(Debug)] +struct StructuralFixedSizeListSchedulingJob<'a> { + child: Box, + dimension: u64, +} + +impl<'a> StructuralFixedSizeListSchedulingJob<'a> { + fn new(child: Box, dimension: u64) -> Self { + Self { child, dimension } + } +} + +impl StructuralSchedulingJob for StructuralFixedSizeListSchedulingJob<'_> { + fn schedule_next(&mut self, context: &mut SchedulerContext) -> Result> { + // Get the child's scan lines (scheduled in terms of child struct rows) + let child_scan_lines = self.child.schedule_next(context)?; + + // Scale down rows_scheduled by dimension to convert from child rows to FSL rows + Ok(child_scan_lines + .into_iter() + .map(|scan_line| ScheduledScanLine { + decoders: scan_line.decoders, + rows_scheduled: scan_line.rows_scheduled / self.dimension, + }) + .collect()) + } +} + +/// A decoder for complex fixed-size list fields +/// +/// Drains `num_rows * dimension` from the child decoder and reconstructs +/// the FSL array with validity from the rep/def information. +#[derive(Debug)] +pub struct StructuralFixedSizeListDecoder { + child: Box, + data_type: DataType, +} + +impl StructuralFixedSizeListDecoder { + pub fn new(child: Box, data_type: DataType) -> Self { + Self { child, data_type } + } +} + +impl StructuralFieldDecoder for StructuralFixedSizeListDecoder { + fn accept_page(&mut self, child: crate::decoder::LoadedPageShard) -> Result<()> { + self.child.accept_page(child) + } + + fn drain(&mut self, num_rows: u64) -> Result> { + // For FixedSizeList, we need to drain num_rows * dimension from the child + let dimension = match &self.data_type { + DataType::FixedSizeList(_, d) => *d as u64, + _ => { + return Err(Error::Internal { + message: "FixedSizeListDecoder has non-FSL data type".to_string(), + location: location!(), + }); + } + }; + let child_task = self.child.drain(num_rows * dimension)?; + Ok(Box::new(StructuralFixedSizeListDecodeTask::new( + child_task, + self.data_type.clone(), + num_rows, + ))) + } + + fn data_type(&self) -> &DataType { + &self.data_type + } +} + +#[derive(Debug)] +struct StructuralFixedSizeListDecodeTask { + child_task: Box, + data_type: DataType, + num_rows: u64, +} + +impl StructuralFixedSizeListDecodeTask { + fn new( + child_task: Box, + data_type: DataType, + num_rows: u64, + ) -> Self { + Self { + child_task, + data_type, + num_rows, + } + } +} + +impl StructuralDecodeArrayTask for StructuralFixedSizeListDecodeTask { + fn decode(self: Box) -> Result { + let DecodedArray { array, mut repdef } = self.child_task.decode()?; + match &self.data_type { + DataType::FixedSizeList(child_field, dimension) => { + let num_rows = self.num_rows as usize; + let validity = repdef.unravel_fsl_validity(num_rows, *dimension as usize); + let fsl_array = arrow_array::FixedSizeListArray::try_new( + child_field.clone(), + *dimension, + array, + validity, + )?; + Ok(DecodedArray { + array: Arc::new(fsl_array), + repdef, + }) + } + _ => Err(Error::Internal { + message: "FixedSizeList decoder did not have a fixed-size list field".to_string(), + location: location!(), + }), + } + } +} + +// ======================= +// Garbage filtering +// ======================= + +/// Returns true if the data type contains any variable-length list-like types +/// (List, LargeList, ListView, LargeListView, Map) that need garbage filtering. +fn needs_garbage_filtering(data_type: &DataType) -> bool { + match data_type { + DataType::List(_) + | DataType::LargeList(_) + | DataType::ListView(_) + | DataType::LargeListView(_) + | DataType::Map(_, _) => true, + DataType::Struct(fields) => fields + .iter() + .any(|f| needs_garbage_filtering(f.data_type())), + DataType::FixedSizeList(field, _) => needs_garbage_filtering(field.data_type()), + _ => false, + } +} + +/// Filters garbage (undefined data under null FSL rows) from nested list-like types. +/// Unlike variable-length lists which can remove null children entirely, FSL children +/// always exist, so we must clean any nested lists before encoding. +/// +/// NB: Nested FSL is currently precluded at a higher level in our system. However, this code +/// supports and tests it. +fn filter_fsl_child_garbage(array: ArrayRef, is_garbage: &[bool]) -> ArrayRef { + debug_assert_eq!(array.len(), is_garbage.len()); + + match array.data_type() { + DataType::List(_) => filter_list_garbage(array.as_list::(), is_garbage), + DataType::LargeList(_) => filter_list_garbage(array.as_list::(), is_garbage), + DataType::ListView(_) | DataType::LargeListView(_) => { + unimplemented!("ListView inside complex FSL is not yet supported") + } + DataType::Map(_, _) => filter_map_garbage(array.as_map(), is_garbage), + DataType::FixedSizeList(_, dim) => { + filter_nested_fsl_garbage(array.as_fixed_size_list(), is_garbage, *dim as usize) + } + DataType::Struct(_) => filter_struct_garbage(array.as_struct(), is_garbage), + _ => array, + } +} + +fn filter_struct_garbage(struct_arr: &StructArray, is_garbage: &[bool]) -> ArrayRef { + let needs_filtering = struct_arr + .fields() + .iter() + .any(|f| needs_garbage_filtering(f.data_type())); + + if !needs_filtering { + return Arc::new(struct_arr.clone()); + } + + let new_columns: Vec = struct_arr + .columns() + .iter() + .zip(struct_arr.fields().iter()) + .map(|(col, field)| { + if needs_garbage_filtering(field.data_type()) { + filter_fsl_child_garbage(col.clone(), is_garbage) + } else { + col.clone() + } + }) + .collect(); + + Arc::new(StructArray::new( + struct_arr.fields().clone(), + new_columns, + struct_arr.nulls().cloned(), + )) +} + +fn expand_garbage_mask(is_garbage: &[bool], dimension: usize) -> Vec { + let mut expanded = Vec::with_capacity(is_garbage.len() * dimension); + for &garbage in is_garbage { + for _ in 0..dimension { + expanded.push(garbage); + } + } + expanded +} + +fn fsl_validity_to_garbage_mask(fsl_validity: &NullBuffer) -> Vec { + fsl_validity.iter().map(|valid| !valid).collect() +} + +fn filter_list_garbage( + list_arr: &GenericListArray, + is_garbage: &[bool], +) -> ArrayRef { + debug_assert_eq!( + list_arr.len(), + is_garbage.len(), + "list length must match garbage mask length" + ); + + let old_offsets = list_arr.offsets(); + let value_field = match list_arr.data_type() { + DataType::List(f) | DataType::LargeList(f) => f.clone(), + _ => unreachable!(), + }; + + let mut new_offsets: Vec = Vec::with_capacity(list_arr.len() + 1); + let mut values_to_keep: Vec = Vec::new(); + let mut validity_builder = BooleanBufferBuilder::new(list_arr.len()); + let mut current_offset = O::usize_as(0); + new_offsets.push(current_offset); + let old_validity = list_arr.nulls(); + + for (i, &garbage) in is_garbage.iter().enumerate() { + if garbage { + new_offsets.push(current_offset); + validity_builder.append(false); + } else { + let start = old_offsets[i].as_usize(); + let end = old_offsets[i + 1].as_usize(); + values_to_keep.extend(start..end); + current_offset += O::usize_as(end - start); + new_offsets.push(current_offset); + validity_builder.append(old_validity.map(|v| v.is_valid(i)).unwrap_or(true)); + } + } + + let new_values = if values_to_keep.is_empty() { + list_arr.values().slice(0, 0) + } else { + let indices = + arrow_array::UInt64Array::from_iter_values(values_to_keep.iter().map(|&i| i as u64)); + arrow_select::take::take(list_arr.values().as_ref(), &indices, None) + .expect("take should succeed") + }; + + let new_values = if needs_garbage_filtering(value_field.data_type()) && !new_values.is_empty() { + let len = new_values.len(); + filter_fsl_child_garbage(new_values, &vec![false; len]) + } else { + new_values + }; + + let new_validity = NullBuffer::new(validity_builder.finish()); + Arc::new(GenericListArray::new( + value_field, + OffsetBuffer::new(ScalarBuffer::from(new_offsets)), + new_values, + Some(new_validity), + )) +} + +fn filter_map_garbage(map_arr: &arrow_array::MapArray, is_garbage: &[bool]) -> ArrayRef { + debug_assert_eq!(map_arr.len(), is_garbage.len()); + + let old_offsets = map_arr.offsets(); + let entries_field = match map_arr.data_type() { + DataType::Map(field, _) => field.clone(), + _ => unreachable!(), + }; + + let mut new_offsets: Vec = Vec::with_capacity(map_arr.len() + 1); + let mut values_to_keep: Vec = Vec::new(); + let mut validity_builder = BooleanBufferBuilder::new(map_arr.len()); + let mut current_offset: i32 = 0; + new_offsets.push(current_offset); + let old_validity = map_arr.nulls(); + + for (i, &garbage) in is_garbage.iter().enumerate() { + if garbage { + new_offsets.push(current_offset); + validity_builder.append(false); + } else { + let start = old_offsets[i] as usize; + let end = old_offsets[i + 1] as usize; + values_to_keep.extend(start..end); + current_offset += (end - start) as i32; + new_offsets.push(current_offset); + validity_builder.append(old_validity.map(|v| v.is_valid(i)).unwrap_or(true)); + } + } + + let new_entries: ArrayRef = if values_to_keep.is_empty() { + Arc::new(map_arr.entries().slice(0, 0)) + } else { + let indices = + arrow_array::UInt64Array::from_iter_values(values_to_keep.iter().map(|&i| i as u64)); + arrow_select::take::take(map_arr.entries(), &indices, None).expect("take should succeed") + }; + + let new_entries = + if needs_garbage_filtering(entries_field.data_type()) && !new_entries.is_empty() { + let len = new_entries.len(); + filter_fsl_child_garbage(new_entries, &vec![false; len]) + } else { + new_entries + }; + + let new_validity = NullBuffer::new(validity_builder.finish()); + let keys_sorted = matches!(map_arr.data_type(), DataType::Map(_, true)); + + Arc::new( + arrow_array::MapArray::try_new( + entries_field, + OffsetBuffer::new(ScalarBuffer::from(new_offsets)), + new_entries.as_struct().clone(), + Some(new_validity), + keys_sorted, + ) + .expect("MapArray construction should succeed"), + ) +} + +/// Filters garbage from nested FSL arrays that contain list-like children. +fn filter_nested_fsl_garbage( + fsl_arr: &arrow_array::FixedSizeListArray, + is_garbage: &[bool], + dimension: usize, +) -> ArrayRef { + debug_assert_eq!(fsl_arr.len(), is_garbage.len()); + + let child_field = match fsl_arr.data_type() { + DataType::FixedSizeList(field, _) => field.clone(), + _ => unreachable!(), + }; + + if !needs_garbage_filtering(child_field.data_type()) { + return Arc::new(fsl_arr.clone()); + } + + let child_garbage = expand_garbage_mask(is_garbage, dimension); + let new_values = filter_fsl_child_garbage(fsl_arr.values().clone(), &child_garbage); + + Arc::new(arrow_array::FixedSizeListArray::new( + child_field, + dimension as i32, + new_values, + fsl_arr.nulls().cloned(), + )) +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow_array::{ + builder::{Int32Builder, ListBuilder}, + cast::AsArray, + Array, FixedSizeListArray, + }; + use arrow_schema::{DataType, Field, Fields}; + use rstest::rstest; + + use super::filter_nested_fsl_garbage; + use crate::{ + constants::{ + STRUCTURAL_ENCODING_FULLZIP, STRUCTURAL_ENCODING_META_KEY, + STRUCTURAL_ENCODING_MINIBLOCK, + }, + testing::{check_specific_random, TestCases}, + version::LanceFileVersion, + }; + + fn make_fsl_struct_type(struct_fields: Fields, dimension: i32) -> DataType { + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Struct(struct_fields), true)), + dimension, + ) + } + + fn simple_struct_fields() -> Fields { + Fields::from(vec![ + Field::new("x", DataType::Float64, false), + Field::new("y", DataType::Float64, false), + ]) + } + + fn nested_struct_fields() -> Fields { + let inner = Fields::from(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + ]); + Fields::from(vec![ + Field::new("outer_val", DataType::Float64, false), + Field::new("inner", DataType::Struct(inner), true), + ]) + } + + fn nested_struct_with_list_fields() -> Fields { + let inner = Fields::from(vec![Field::new( + "values", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + )]); + Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new("inner", DataType::Struct(inner), true), + ]) + } + + fn struct_with_list_fields() -> Fields { + Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "values", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ), + ]) + } + + fn struct_with_large_list_fields() -> Fields { + Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "values", + DataType::LargeList(Arc::new(Field::new("item", DataType::Int64, true))), + true, + ), + ]) + } + + fn struct_with_nested_fsl_fields() -> Fields { + Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "vectors", + DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 4), + true, + ), + ]) + } + + fn struct_with_map_fields() -> Fields { + let entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("keys", DataType::Utf8, false), + Field::new("values", DataType::Int32, true), + ])), + false, + )); + Fields::from(vec![ + Field::new("id", DataType::Int32, false), + Field::new("props", DataType::Map(entries_field, false), true), + ]) + } + + #[rstest] + #[case::simple(simple_struct_fields(), 2, LanceFileVersion::V2_2)] + #[case::nested_struct(nested_struct_fields(), 2, LanceFileVersion::V2_2)] + #[case::struct_with_list(struct_with_list_fields(), 2, LanceFileVersion::V2_2)] + #[case::struct_with_large_list(struct_with_large_list_fields(), 2, LanceFileVersion::V2_2)] + #[case::nested_struct_with_list(nested_struct_with_list_fields(), 2, LanceFileVersion::V2_2)] + #[case::struct_with_nested_fsl(struct_with_nested_fsl_fields(), 2, LanceFileVersion::V2_2)] + #[case::struct_with_map(struct_with_map_fields(), 2, LanceFileVersion::V2_2)] + #[test_log::test(tokio::test)] + async fn test_fsl_struct_random( + #[case] struct_fields: Fields, + #[case] dimension: i32, + #[case] min_version: LanceFileVersion, + #[values(STRUCTURAL_ENCODING_MINIBLOCK, STRUCTURAL_ENCODING_FULLZIP)] + structural_encoding: &str, + ) { + let data_type = make_fsl_struct_type(struct_fields, dimension); + let mut field_metadata = HashMap::new(); + field_metadata.insert( + STRUCTURAL_ENCODING_META_KEY.to_string(), + structural_encoding.into(), + ); + let field = Field::new("", data_type, true).with_metadata(field_metadata); + let test_cases = TestCases::basic().with_min_file_version(min_version); + check_specific_random(field, test_cases).await; + } + + // FSL and FSL are not yet supported (blocked by repdef) + #[test] + #[should_panic(expected = "Unsupported logical type: list")] + fn test_fsl_list_rejected() { + let inner = Field::new( + "item", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + ); + let data_type = DataType::FixedSizeList(Arc::new(inner), 2); + let arrow_field = Field::new("test", data_type, true); + let lance_field = lance_core::datatypes::Field::try_from(&arrow_field).unwrap(); + let _ = lance_field.data_type(); + } + + #[test] + #[should_panic(expected = "Unsupported logical type: map")] + fn test_fsl_map_rejected() { + let inner = Field::new( + "item", + DataType::Map( + Arc::new(Field::new( + "entries", + DataType::Struct(Fields::from(vec![ + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Int32, true), + ])), + false, + )), + false, + ), + true, + ); + let data_type = DataType::FixedSizeList(Arc::new(inner), 2); + let arrow_field = Field::new("test", data_type, true); + let lance_field = lance_core::datatypes::Field::try_from(&arrow_field).unwrap(); + let _ = lance_field.data_type(); + } + + #[test] + fn test_filter_nested_fsl_garbage() { + // Create FSL> with dimension 2: [[[1], [2]], [[3], [4]], [[5], [6]]] + let mut list_builder = ListBuilder::new(Int32Builder::new()); + for i in 1..=6 { + list_builder.values().append_value(i); + list_builder.append(true); + } + let list_arr = list_builder.finish(); + + let fsl_field = Arc::new(Field::new( + "item", + DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), + true, + )); + let fsl = FixedSizeListArray::new(fsl_field, 2, Arc::new(list_arr), None); + + // Mark second FSL row as garbage + let result = filter_nested_fsl_garbage(&fsl, &[false, true, false], 2); + let result = result.as_fixed_size_list(); + + // Child lists at positions 2,3 (garbage row 1) should be filtered to null + let child_list = result.values().as_list::(); + assert_eq!( + (0..6).map(|i| child_list.is_valid(i)).collect::>(), + vec![true, true, false, false, true, true] + ); + } + + #[test] + fn test_filter_nested_fsl_no_list_child() { + // FSL - no list child, should return unchanged + let fsl_field = Arc::new(Field::new("item", DataType::Int32, true)); + let values = arrow_array::Int32Array::from(vec![1, 2, 3, 4, 5, 6]); + let fsl = FixedSizeListArray::new(fsl_field, 2, Arc::new(values), None); + + let result = filter_nested_fsl_garbage(&fsl, &[false, true, false], 2); + // Should return the same array unchanged + assert_eq!(result.len(), 3); + } +} diff --git a/rust/lance-encoding/src/encodings/logical/struct.rs b/rust/lance-encoding/src/encodings/logical/struct.rs index 0a53ec9a21c..793e8347360 100644 --- a/rust/lance-encoding/src/encodings/logical/struct.rs +++ b/rust/lance-encoding/src/encodings/logical/struct.rs @@ -8,8 +8,8 @@ use std::{ }; use super::{ - list::StructuralListDecoder, map::StructuralMapDecoder, - primitive::StructuralPrimitiveFieldDecoder, + fixed_size_list::StructuralFixedSizeListDecoder, list::StructuralListDecoder, + map::StructuralMapDecoder, primitive::StructuralPrimitiveFieldDecoder, }; use crate::{ decoder::{ @@ -275,6 +275,16 @@ impl StructuralStructDecoder { field.data_type().clone(), ))) } + DataType::FixedSizeList(child_field, _) + if matches!(child_field.data_type(), DataType::Struct(_)) => + { + // FixedSizeList containing Struct needs structural decoding + let child_decoder = Self::field_to_decoder(child_field, should_validate)?; + Ok(Box::new(StructuralFixedSizeListDecoder::new( + child_decoder, + field.data_type().clone(), + ))) + } DataType::Map(entries_field, keys_sorted) => { if *keys_sorted { return Err(Error::NotSupported {