diff --git a/datafusion/physical-expr-common/src/binary_view_map.rs b/datafusion/physical-expr-common/src/binary_view_map.rs index abc3e28f82627..3e59e1813b724 100644 --- a/datafusion/physical-expr-common/src/binary_view_map.rs +++ b/datafusion/physical-expr-common/src/binary_view_map.rs @@ -33,7 +33,7 @@ use std::sync::Arc; /// HashSet optimized for storing string or binary values that can produce that /// the final set as a `GenericBinaryViewArray` with minimal copies. #[derive(Debug)] -pub struct ArrowBytesViewSet(ArrowBytesViewMap<()>); +pub struct ArrowBytesViewSet(ArrowBytesViewMap); impl ArrowBytesViewSet { pub fn new(output_type: OutputType) -> Self { @@ -42,10 +42,7 @@ impl ArrowBytesViewSet { /// Inserts each value from `values` into the set pub fn insert(&mut self, values: &ArrayRef) { - fn make_payload_fn(_value: Option<&[u8]>) {} - fn observe_payload_fn(_payload: ()) {} - self.0 - .insert_if_new(values, make_payload_fn, observe_payload_fn); + self.0.insert_if_new(values, |_idx| {}); } /// Return the contents of this map and replace it with a new empty map with @@ -88,14 +85,6 @@ impl ArrowBytesViewSet { /// values that can produce the set of keys on /// output as `GenericBinaryViewArray` without copies. /// -/// Equivalent to `HashSet` but with better performance if you need -/// to emit the keys as an Arrow `StringViewArray` / `BinaryViewArray`. For other -/// purposes it is the same as a `HashMap` -/// -/// # Generic Arguments -/// -/// * `V`: payload type -/// /// # Description /// /// This is a specialized HashMap with the following properties: @@ -108,8 +97,9 @@ impl ArrowBytesViewSet { /// 2. Retains the insertion order of entries in the final array. The values are /// in the same order as they were inserted. /// -/// Note this structure can be used as a `HashSet` by specifying the value type -/// as `()`, as is done by [`ArrowBytesViewSet`]. +/// Each distinct value is assigned a sequential index (starting from 0) which +/// serves as both the position in the output array and the group index for +/// `GROUP BY` operations. /// /// This map is used by the special `COUNT DISTINCT` aggregate function to /// store the distinct values, and by the `GROUP BY` operator to store @@ -117,14 +107,11 @@ impl ArrowBytesViewSet { /// Max size of the in-progress buffer before flushing to completed buffers const BYTE_VIEW_MAX_BLOCK_SIZE: usize = 2 * 1024 * 1024; -pub struct ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +pub struct ArrowBytesViewMap { /// Should the output be StringView or BinaryView? output_type: OutputType, /// Underlying hash set for each distinct value - map: hashbrown::hash_table::HashTable>, + map: hashbrown::hash_table::HashTable, /// Total size of the map in bytes map_size: usize, @@ -141,19 +128,14 @@ where random_state: RandomState, /// buffer that stores hash values (reused across batches to save allocations) hashes_buffer: Vec, - /// `(payload, null_index)` for the 'null' value, if any - /// NOTE null_index is the logical index in the final array, not the index - /// in the buffer - null: Option<(V, usize)>, + /// Index of the null value in the views array, if any + null: Option, } /// The size, in number of entries, of the initial hash table const INITIAL_MAP_CAPACITY: usize = 512; -impl ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl ArrowBytesViewMap { pub fn new(output_type: OutputType) -> Self { Self { output_type, @@ -177,58 +159,24 @@ where new_self } - /// Inserts each value from `values` into the map, invoking `payload_fn` for - /// each value if *not* already present, deferring the allocation of the - /// payload until it is needed. - /// - /// Note that this is different than a normal map that would replace the - /// existing entry - /// - /// # Arguments: - /// - /// `values`: array whose values are inserted - /// - /// `make_payload_fn`: invoked for each value that is not already present - /// to create the payload, in order of the values in `values` - /// - /// `observe_payload_fn`: invoked once, for each value in `values`, that was - /// already present in the map, with corresponding payload value. - /// - /// # Returns - /// - /// The payload value for the entry, either the existing value or - /// the newly inserted value - /// - /// # Safety: + /// Inserts each value from `values` into the map. /// - /// Note that `make_payload_fn` and `observe_payload_fn` are only invoked - /// with valid values from `values`, not for the `NULL` value. - pub fn insert_if_new( - &mut self, - values: &ArrayRef, - make_payload_fn: MP, - observe_payload_fn: OP, - ) where - MP: FnMut(Option<&[u8]>) -> V, - OP: FnMut(V), + /// For each value in `values`, calls `observe_fn` with the entry's index + /// (sequential, starting from 0). This index is the same for both new and + /// previously seen values, serving as the group index for `GROUP BY`. + pub fn insert_if_new(&mut self, values: &ArrayRef, observe_fn: OP) + where + OP: FnMut(usize), { // Sanity check array type match self.output_type { OutputType::BinaryView => { assert!(matches!(values.data_type(), DataType::BinaryView)); - self.insert_if_new_inner::( - values, - make_payload_fn, - observe_payload_fn, - ) + self.insert_if_new_inner::(values, observe_fn) } OutputType::Utf8View => { assert!(matches!(values.data_type(), DataType::Utf8View)); - self.insert_if_new_inner::( - values, - make_payload_fn, - observe_payload_fn, - ) + self.insert_if_new_inner::(values, observe_fn) } _ => unreachable!("Utf8/Binary should use `ArrowBytesSet`"), }; @@ -242,14 +190,9 @@ where /// simpler and understand and reducing code bloat due to duplication. /// /// See comments on `insert_if_new` for more details - fn insert_if_new_inner( - &mut self, - values: &ArrayRef, - mut make_payload_fn: MP, - mut observe_payload_fn: OP, - ) where - MP: FnMut(Option<&[u8]>) -> V, - OP: FnMut(V), + fn insert_if_new_inner(&mut self, values: &ArrayRef, mut observe_fn: OP) + where + OP: FnMut(usize), B: ByteViewType, { // step 1: compute hashes @@ -276,17 +219,16 @@ where // handle null value via validity bitmap check if values.is_null(i) { - let payload = if let Some(&(payload, _offset)) = self.null.as_ref() { - payload + let idx = if let Some(null_index) = self.null { + null_index } else { - let payload = make_payload_fn(None); let null_index = self.views.len(); self.views.push(0); self.nulls.append_null(); - self.null = Some((payload, null_index)); - payload + self.null = Some(null_index); + null_index }; - observe_payload_fn(payload); + observe_fn(idx); continue; } @@ -294,8 +236,9 @@ where let len = view_u128 as u32; // Check if value already exists - let maybe_payload = { - // Borrow completed and in_progress for comparison + let existing_idx = { + // Borrow fields for comparison closure + let views = &self.views; let completed = &self.completed; let in_progress = &self.in_progress; @@ -305,20 +248,22 @@ where return false; } + let stored_view = views[header.view_idx]; + // Fast path: inline strings can be compared directly if len <= 12 { - return header.view == view_u128; + return stored_view == view_u128; } // For larger strings: first compare the 4-byte prefix - let stored_prefix = (header.view >> 32) as u32; + let stored_prefix = (stored_view >> 32) as u32; let input_prefix = (view_u128 >> 32) as u32; if stored_prefix != input_prefix { return false; } // Prefix matched - compare full bytes - let byte_view = ByteView::from(header.view); + let byte_view = ByteView::from(stored_view); let stored_len = byte_view.length as usize; let buffer_index = byte_view.buffer_index as usize; let offset = byte_view.offset as usize; @@ -332,29 +277,23 @@ where let input_value: &[u8] = values.value(i).as_ref(); stored_value == input_value }) - .map(|entry| entry.payload) + .map(|entry| entry.view_idx) }; - let payload = if let Some(payload) = maybe_payload { - payload + let idx = if let Some(idx) = existing_idx { + idx } else { - // no existing value, make a new one + // no existing value, insert new entry let value: &[u8] = values.value(i).as_ref(); - let payload = make_payload_fn(Some(value)); - - // Create view pointing to our buffers - let new_view = self.append_value(value); - let new_header = Entry { - view: new_view, - hash, - payload, - }; + let view_idx = self.views.len(); + self.append_value(value); + let new_header = Entry { view_idx, hash }; self.map .insert_accounted(new_header, |h| h.hash, &mut self.map_size); - payload + view_idx }; - observe_payload_fn(payload); + observe_fn(idx); } } @@ -448,10 +387,7 @@ where } } -impl Debug for ArrowBytesViewMap -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ +impl Debug for ArrowBytesViewMap { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ArrowBytesMap") .field("map", &"") @@ -466,24 +402,14 @@ where /// Entry in the hash table -- see [`ArrowBytesViewMap`] for more details /// -/// Stores the view pointing to our internal buffers, eliminating the need -/// for a separate builder index. For inline strings (<=12 bytes), the view -/// contains the entire value. For out-of-line strings, the view contains -/// buffer_index and offset pointing directly to our storage. +/// Only stores the view index and hash. The view_idx also serves as the +/// entry's group index since entries are inserted sequentially. #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -struct Entry -where - V: Debug + PartialEq + Eq + Clone + Copy + Default, -{ - /// The u128 view pointing to our internal buffers. For inline strings, - /// this contains the complete value. For larger strings, this contains - /// the buffer_index/offset into our completed/in_progress buffers. - view: u128, - +struct Entry { + /// Index into the `views` Vec in [`ArrowBytesViewMap`]. + /// Also serves as the group index for GROUP BY operations. + view_idx: usize, hash: u64, - - /// value stored by the entry - payload: V, } #[cfg(test)] @@ -678,15 +604,9 @@ mod tests { assert_eq!(set.len(), 10); } - #[derive(Debug, PartialEq, Eq, Default, Clone, Copy)] - struct TestPayload { - // store the string value to check against input - index: usize, // store the index of the string (each new string gets the next sequential input) - } - /// Wraps an [`ArrowBytesViewMap`], validating its invariants struct TestMap { - map: ArrowBytesViewMap, + map: ArrowBytesViewMap, // stores distinct strings seen, in order strings: Vec>, // map strings to index in strings @@ -694,8 +614,6 @@ mod tests { } impl TestMap { - /// creates a map with TestPayloads for the given strings and then - /// validates the payloads fn new() -> Self { Self { map: ArrowBytesViewMap::new(OutputType::Utf8View), @@ -704,47 +622,31 @@ mod tests { } } - /// Inserts strings into the map + /// Inserts strings into the map and validates the returned indexes fn insert(&mut self, strings: &[Option<&str>]) { let string_array = StringViewArray::from(strings.to_vec()); let arr: ArrayRef = Arc::new(string_array); - let mut next_index = self.indexes.len(); - let mut actual_new_strings = vec![]; - let mut actual_seen_indexes = vec![]; - // update self with new values, keeping track of newly added values + let mut expected_indexes = vec![]; + // update self with new values, keeping track of expected indexes for str in strings { let str = str.map(|s| s.to_string()); let index = self.indexes.get(&str).cloned().unwrap_or_else(|| { - actual_new_strings.push(str.clone()); let index = self.strings.len(); self.strings.push(str.clone()); self.indexes.insert(str, index); index }); - actual_seen_indexes.push(index); + expected_indexes.push(index); } - // insert the values into the map, recording what we did - let mut seen_new_strings = vec![]; + // insert the values into the map, recording returned indexes let mut seen_indexes = vec![]; - self.map.insert_if_new( - &arr, - |s| { - let value = s - .map(|s| String::from_utf8(s.to_vec()).expect("Non utf8 string")); - let index = next_index; - next_index += 1; - seen_new_strings.push(value); - TestPayload { index } - }, - |payload| { - seen_indexes.push(payload.index); - }, - ); - - assert_eq!(actual_seen_indexes, seen_indexes); - assert_eq!(actual_new_strings, seen_new_strings); + self.map.insert_if_new(&arr, |idx| { + seen_indexes.push(idx); + }); + + assert_eq!(expected_indexes, seen_indexes); } /// Call `self.map.into_array()` validating that the strings are in the same @@ -784,4 +686,12 @@ mod tests { let expected_output: ArrayRef = Arc::new(StringViewArray::from(input)); assert_eq!(&test_map.into_array(), &expected_output); } + + #[test] + fn test_entry_size() { + // Entry should be 16 bytes: view_idx (usize) + hash (u64) + // Previously it also stored a full u128 view and a payload, + // which made each entry 32+ bytes. + assert_eq!(size_of::(), 16); + } } diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index a91dd3115d879..0627410a37323 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -19,7 +19,7 @@ use crate::aggregates::group_values::multi_group_by::{ GroupColumn, Nulls, nulls_equal_to, }; use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder; -use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray, make_view}; +use arrow::array::{Array, ArrayRef, AsArray, ByteView, GenericByteViewArray}; use arrow::buffer::{Buffer, ScalarBuffer}; use arrow::datatypes::ByteViewType; use datafusion_common::Result; @@ -157,17 +157,45 @@ impl ByteViewGroupValueBuilder { Nulls::Some }; + // Pre-reserve capacity for views + self.views.reserve(rows.len()); + match all_null_or_non_null { Nulls::Some => { + let input_views = arr.views(); for &row in rows { - self.append_val_inner(array, row); + if arr.is_null(row) { + self.nulls.append(true); + self.views.push(0); + } else { + self.nulls.append(false); + // SAFETY: row is a valid index + let input_view = + unsafe { *input_views.get_unchecked(row) }; + self.do_append_val_with_view(arr, row, input_view); + } } } Nulls::None => { self.nulls.append_n(rows.len(), false); - for &row in rows { - self.do_append_val_inner(arr, row); + let input_views = arr.views(); + let has_buffers = !arr.data_buffers().is_empty(); + if has_buffers { + for &row in rows { + // SAFETY: row is a valid index + let input_view = + unsafe { *input_views.get_unchecked(row) }; + self.do_append_val_with_view(arr, row, input_view); + } + } else { + // All values are inline — just copy the views directly + let input_views = arr.views(); + self.views.extend( + rows.iter() + // SAFETY: rows are valid indices + .map(|&row| unsafe { *input_views.get_unchecked(row) }), + ); } } @@ -183,21 +211,44 @@ impl ByteViewGroupValueBuilder { where B: ByteViewType, { - let value: &[u8] = array.value(row).as_ref(); + // SAFETY: row is a valid index + let input_view = unsafe { *array.views().get_unchecked(row) }; + self.do_append_val_with_view(array, row, input_view); + } + + /// Append a value with a pre-fetched input view, avoiding redundant view lookups + #[inline(always)] + fn do_append_val_with_view( + &mut self, + array: &GenericByteViewArray, + row: usize, + input_view: u128, + ) where + B: ByteViewType, + { + let value_len = input_view as u32; - let value_len = value.len(); let view = if value_len <= 12 { - make_view(value, 0, 0) + // For inline values, the view is self-contained — copy it directly + // without accessing the underlying byte data + input_view } else { + // For non-inline values, we need to copy the bytes into our buffers + // and create a new view pointing to our storage + let value: &[u8] = unsafe { array.value_unchecked(row).as_ref() }; + // Ensure big enough block to hold the value firstly - self.ensure_in_progress_big_enough(value_len); + self.ensure_in_progress_big_enough(value.len()); - // Append value - let buffer_index = self.completed.len(); - let offset = self.in_progress.len(); + // Reuse length+prefix from input view, set our buffer location + let buffer_index = self.completed.len() as u32; + let offset = self.in_progress.len() as u32; self.in_progress.extend_from_slice(value); - make_view(value, buffer_index as u32, offset as u32) + ByteView::from(input_view) + .with_buffer_index(buffer_index) + .with_offset(offset) + .as_u128() }; // Append view @@ -244,10 +295,7 @@ impl ByteViewGroupValueBuilder { // SAFETY: the `lhs_row` and rhs_row` are valid let exist_view = unsafe { *self.views.get_unchecked(lhs_row) }; - let exist_view_len = exist_view as u32; - let input_view = unsafe { *array.views().get_unchecked(rhs_row) }; - let input_view_len = input_view as u32; // fast path, if we know there are no buffers, then the view must be inlined // so we can simply compare the u128 views @@ -255,27 +303,19 @@ impl ByteViewGroupValueBuilder { return exist_view == input_view; } - // The check logic - // - Check len equality - // - If inlined, check inlined value - // - If non-inlined, check prefix and then check value in buffer - // when needed - if exist_view_len != input_view_len { + // Compare the lower 8 bytes (len + prefix) in one shot. + // This rejects mismatches on either length or prefix with a single comparison. + let exist_lo = exist_view as u64; + let input_lo = input_view as u64; + if exist_lo != input_lo { return false; } + let exist_view_len = exist_view as u32; if exist_view_len <= 12 { - // both inlined, so compare inlined value - exist_view == input_view + // both inlined and len+prefix match — compare remaining inlined data + (exist_view >> 64) == (input_view >> 64) } else { - let exist_prefix = - unsafe { GenericByteViewArray::::inline_value(&exist_view, 4) }; - let input_prefix = - unsafe { GenericByteViewArray::::inline_value(&input_view, 4) }; - - if exist_prefix != input_prefix { - return false; - } // get the full values and compare let exist_full = { diff --git a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs index 7a56f7c52c11a..e4e097a108bb7 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/bytes_view.rs @@ -20,7 +20,6 @@ use arrow::array::{Array, ArrayRef}; use datafusion_expr::EmitTo; use datafusion_physical_expr::binary_map::OutputType; use datafusion_physical_expr_common::binary_view_map::ArrowBytesViewMap; -use std::mem::size_of; /// A [`GroupValues`] storing single column of Utf8View/BinaryView values /// @@ -28,16 +27,13 @@ use std::mem::size_of; /// purpose `Row`s format pub struct GroupValuesBytesView { /// Map string/binary values to group index - map: ArrowBytesViewMap, - /// The total number of groups so far (used to assign group_index) - num_groups: usize, + map: ArrowBytesViewMap, } impl GroupValuesBytesView { pub fn new(output_type: OutputType) -> Self { Self { map: ArrowBytesViewMap::new(output_type), - num_groups: 0, } } } @@ -54,20 +50,9 @@ impl GroupValues for GroupValuesBytesView { let arr = &cols[0]; groups.clear(); - self.map.insert_if_new( - arr, - // called for each new group - |_value| { - // assign new group index on each insert - let group_idx = self.num_groups; - self.num_groups += 1; - group_idx - }, - // called for each group - |group_idx| { - groups.push(group_idx); - }, - ); + self.map.insert_if_new(arr, |group_idx| { + groups.push(group_idx); + }); // ensure we assigned a group to for each row assert_eq!(groups.len(), arr.len()); @@ -79,11 +64,11 @@ impl GroupValues for GroupValuesBytesView { } fn is_empty(&self) -> bool { - self.num_groups == 0 + self.map.is_empty() } fn len(&self) -> usize { - self.num_groups + self.map.len() } fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result> { @@ -91,14 +76,8 @@ impl GroupValues for GroupValuesBytesView { let map_contents = self.map.take().into_state(); let group_values = match emit_to { - EmitTo::All => { - self.num_groups -= map_contents.len(); - map_contents - } - EmitTo::First(n) if n == self.len() => { - self.num_groups -= map_contents.len(); - map_contents - } + EmitTo::All => map_contents, + EmitTo::First(n) if n == self.len() => map_contents, EmitTo::First(n) => { // if we only wanted to take the first n, insert the rest back // into the map we could potentially avoid this reallocation, at @@ -108,7 +87,6 @@ impl GroupValues for GroupValuesBytesView { let remaining_group_values = map_contents.slice(n, map_contents.len() - n); - self.num_groups = 0; let mut group_indexes = vec![]; self.intern(&[remaining_group_values], &mut group_indexes)?;