-
Notifications
You must be signed in to change notification settings - Fork 2k
Rich-T-kid/dictionary encoding hash optimize #21589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
aaf9ca7
14d4d80
efaf690
92ec4b6
3e60f15
cc18a8f
aa69892
203efc4
40a43c6
deec858
3bcb8c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,266 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
use arrow::array::{ArrayRef, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field, Schema, UInt8Type};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_expr::EmitTo;
use datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows;
use datafusion_physical_plan::aggregates::group_values::single_group_by::dictionary::GroupValuesDictionary;
use datafusion_physical_plan::aggregates::group_values::{GroupValues, new_group_values};
use datafusion_physical_plan::aggregates::order::GroupOrdering;
use std::sync::Arc;
/// Number of distinct group values used to build benchmark input.
#[derive(Debug)]
enum Cardinality {
    // NOTE(review): original comment said 1, but create_string_values maps
    // Xsmall => 3 distinct values — confirm which was intended.
    Xsmall, // 3
    Small,  // 10
    Medium, // 50
    Large,  // 200
}
/// Number of rows per generated input batch.
#[derive(Debug)]
enum BatchSize {
    Small,  // 8192 rows
    Medium, // 32768 rows
    Large,  // 65536 rows
}
/// Fraction of input values replaced with nulls (see `introduce_nulls`).
#[derive(Debug)]
enum NullRate {
    Zero,   // 0%
    Low,    // 1%
    Medium, // 5%
    High,   // 20%
}
/// Which `GroupValues` implementation a benchmark case exercises.
#[derive(Debug, Clone)]
enum GroupType {
    Dictionary,     // the specialized GroupValuesDictionary path
    GroupValueRows, // the generic row-based path
}
| fn create_string_values(cardinality: &Cardinality) -> Vec<String> { | ||
| let num_values = match cardinality { | ||
| Cardinality::Xsmall => 3, | ||
| Cardinality::Small => 10, | ||
| Cardinality::Medium => 50, | ||
| Cardinality::Large => 200, | ||
| }; | ||
| (0..num_values) | ||
| .map(|i| format!("group_value_{i:06}")) | ||
| .collect() | ||
| } | ||
| fn create_batch(batch_size: &BatchSize, cardinality: &Cardinality) -> Vec<String> { | ||
| let size = match batch_size { | ||
| BatchSize::Small => 8192, | ||
| BatchSize::Medium => 32768, | ||
| BatchSize::Large => 65536, | ||
| }; | ||
| let unique_strings = create_string_values(cardinality); | ||
| if unique_strings.is_empty() { | ||
| return Vec::new(); | ||
| } | ||
|
|
||
| unique_strings.iter().cycle().take(size).cloned().collect() | ||
| } | ||
| fn strings_to_dict_array(values: Vec<Option<String>>) -> ArrayRef { | ||
| let mut builder = StringDictionaryBuilder::<UInt8Type>::new(); | ||
| for v in values { | ||
| match v { | ||
| Some(v) => builder.append_value(v), | ||
| None => builder.append_null(), | ||
| } | ||
| } | ||
| Arc::new(builder.finish()) | ||
| } | ||
| fn introduce_nulls(values: Vec<String>, null_rate: &NullRate) -> Vec<Option<String>> { | ||
| let rate = match null_rate { | ||
| NullRate::Zero => 0.0, | ||
| NullRate::Low => 0.01, | ||
| NullRate::Medium => 0.05, | ||
| NullRate::High => 0.20, | ||
| }; | ||
| values | ||
| .into_iter() | ||
| .map(|v| { | ||
| if rand::random::<f64>() < rate { | ||
| None | ||
| } else { | ||
| Some(v) | ||
| } | ||
| }) | ||
| .collect() | ||
| } | ||
|
|
||
| fn generate_group_values(kind: &GroupType) -> Box<dyn GroupValues> { | ||
| match kind { | ||
| GroupType::GroupValueRows => { | ||
| // we know this is going to hit the fallback path I.E GroupValueRows, but for the sake of avoiding making private items public call the public api | ||
| let schema = Arc::new(Schema::new(vec![Field::new( | ||
| "group_col", | ||
| DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)), | ||
| false, | ||
| )])); | ||
| new_group_values(schema, &GroupOrdering::None).unwrap() | ||
| } | ||
| GroupType::Dictionary => { | ||
| // call custom path directly | ||
| Box::new(GroupValuesDictionary::<UInt8Type>::new(&DataType::Utf8)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn bench_single_column_group_values(c: &mut Criterion) { | ||
| let group_types = [GroupType::GroupValueRows, GroupType::Dictionary]; | ||
| let cardinalities = [ | ||
| Cardinality::Xsmall, | ||
| Cardinality::Small, | ||
| Cardinality::Medium, | ||
| Cardinality::Large, | ||
| ]; | ||
| let batch_sizes = [BatchSize::Small, BatchSize::Medium, BatchSize::Large]; | ||
| let null_rates = [ | ||
| NullRate::Zero, | ||
| NullRate::Low, | ||
| NullRate::Medium, | ||
| NullRate::High, | ||
| ]; | ||
|
|
||
| for cardinality in &cardinalities { | ||
| for batch_size in &batch_sizes { | ||
| for null_rate in &null_rates { | ||
| for group_type in &group_types { | ||
| let group_name = format!( | ||
| "t1_{group_type:?}_cardinality_{cardinality:?}_batch_{batch_size:?}_null_rate_{null_rate:?}" | ||
| ); | ||
|
|
||
| let string_vec = create_batch(batch_size, cardinality); | ||
| let nullable_values = introduce_nulls(string_vec, null_rate); | ||
| let col_ref = match group_type { | ||
| GroupType::Dictionary | GroupType::GroupValueRows => { | ||
| strings_to_dict_array(nullable_values.clone()) | ||
| } | ||
| }; | ||
| c.bench_function(&group_name, |b| { | ||
| b.iter_batched( | ||
| || { | ||
| //create fresh group values for each iteration | ||
| let gv = generate_group_values(group_type); | ||
| let col = col_ref.clone(); | ||
| (gv, col) | ||
| }, | ||
| |(mut group_values, col)| { | ||
| let mut groups = Vec::new(); | ||
| group_values.intern(&[col], &mut groups).unwrap(); | ||
| //group_values.emit(EmitTo::All).unwrap(); | ||
| }, | ||
| criterion::BatchSize::SmallInput, | ||
| ); | ||
| }); | ||
|
|
||
| // Second benchmark that alternates between intern and emit to simulate more realistic usage patterns where the same group values is used across multiple batches of the same grouping column | ||
| let multi_batch_name = format!( | ||
| "multi_batch/{group_type:?}_cardinality_{cardinality:?}_batch_{batch_size:?}_null_rate_{null_rate:?}" | ||
| ); | ||
| c.bench_function(&multi_batch_name, |b| { | ||
| b.iter_batched( | ||
| || { | ||
| // setup - create 3 batches to simulate multiple record batches | ||
| let gv = generate_group_values(group_type); | ||
| let batch1 = col_ref.clone(); | ||
| let batch2 = col_ref.clone(); | ||
| let batch3 = col_ref.clone(); | ||
| (gv, batch1, batch2, batch3) | ||
| }, | ||
| |(mut group_values, batch1, batch2, batch3)| { | ||
| // simulate realistic aggregation flow: | ||
| // multiple intern calls (one per record batch) | ||
| // followed by emit | ||
| let mut groups = Vec::new(); | ||
|
|
||
| group_values.intern(&[batch1], &mut groups).unwrap(); | ||
| groups.clear(); | ||
| group_values.intern(&[batch2], &mut groups).unwrap(); | ||
| groups.clear(); | ||
| group_values.intern(&[batch3], &mut groups).unwrap(); | ||
|
|
||
| // emit once at the end like the real aggregation flow | ||
| group_values.emit(EmitTo::All).unwrap(); | ||
| }, | ||
| criterion::BatchSize::SmallInput, | ||
| ); | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| fn bench_repeated_intern_prefab_cols(c: &mut Criterion) { | ||
| let cardinality = Cardinality::Small; | ||
| let batch_size = BatchSize::Large; | ||
| let null_rate = NullRate::Low; | ||
| let group_types = [GroupType::GroupValueRows, GroupType::Dictionary]; | ||
|
|
||
| for group_type in &group_types { | ||
| let group_type = group_type.clone(); | ||
| let string_vec = create_batch(&batch_size, &cardinality); | ||
| let nullable_values = introduce_nulls(string_vec, &null_rate); | ||
| let col_ref = match group_type { | ||
| GroupType::Dictionary | GroupType::GroupValueRows => { | ||
| strings_to_dict_array(nullable_values.clone()) | ||
| } | ||
| }; | ||
|
|
||
| // Build once outside the benchmark iteration and reuse in intern calls. | ||
| let arr1 = col_ref.clone(); | ||
| let arr2 = col_ref.clone(); | ||
| let arr3 = col_ref.clone(); | ||
| let arr4 = col_ref.clone(); | ||
|
|
||
| let group_name = format!( | ||
| "repeated_intern/{group_type:?}_cardinality_{cardinality:?}_batch_{batch_size:?}_null_rate_{null_rate:?}" | ||
| ); | ||
| c.bench_function(&group_name, |b| { | ||
| b.iter_batched( | ||
| || generate_group_values(&group_type), | ||
| |mut group_values| { | ||
| let mut groups = Vec::new(); | ||
|
|
||
| group_values | ||
| .intern(std::slice::from_ref(&arr1), &mut groups) | ||
| .unwrap(); | ||
| groups.clear(); | ||
| group_values | ||
| .intern(std::slice::from_ref(&arr2), &mut groups) | ||
| .unwrap(); | ||
| groups.clear(); | ||
| group_values | ||
| .intern(std::slice::from_ref(&arr3), &mut groups) | ||
| .unwrap(); | ||
| groups.clear(); | ||
| group_values | ||
| .intern(std::slice::from_ref(&arr4), &mut groups) | ||
| .unwrap(); | ||
| }, | ||
| criterion::BatchSize::SmallInput, | ||
| ); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
// Register both benchmark suites and generate criterion's standard main().
criterion_group!(
    benches,
    bench_single_column_group_values,
    bench_repeated_intern_prefab_cols
);
criterion_main!(benches);
|
Rich-T-kid marked this conversation as resolved.
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,8 +30,8 @@ use datafusion_expr::EmitTo; | |
|
|
||
| pub mod multi_group_by; | ||
|
|
||
| mod row; | ||
| mod single_group_by; | ||
| pub mod row; | ||
| pub mod single_group_by; | ||
| use datafusion_physical_expr::binary_map::OutputType; | ||
| use multi_group_by::GroupValuesColumn; | ||
| use row::GroupValuesRows; | ||
|
|
@@ -41,7 +41,8 @@ pub(crate) use single_group_by::primitive::HashValue; | |
| use crate::aggregates::{ | ||
| group_values::single_group_by::{ | ||
| boolean::GroupValuesBoolean, bytes::GroupValuesBytes, | ||
| bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive, | ||
| bytes_view::GroupValuesBytesView, dictionary::GroupValuesDictionary, | ||
| primitive::GroupValuesPrimitive, | ||
| }, | ||
| order::GroupOrdering, | ||
| }; | ||
|
|
@@ -196,6 +197,56 @@ pub fn new_group_values( | |
| DataType::Boolean => { | ||
| return Ok(Box::new(GroupValuesBoolean::new())); | ||
| } | ||
| DataType::Dictionary(key_type, value_type) => { | ||
| if supported_single_dictionary_value(value_type) { | ||
| return match key_type.as_ref() { | ||
| // TODO: turn this into a macro | ||
|
Rich-T-kid marked this conversation as resolved.
|
||
| DataType::Int8 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::Int8Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::Int16 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::Int16Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::Int32 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::Int32Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::Int64 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::Int64Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::UInt8 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::UInt8Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::UInt16 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::UInt16Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::UInt32 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::UInt32Type, | ||
| >::new(value_type))) | ||
| } | ||
| DataType::UInt64 => { | ||
| Ok(Box::new(GroupValuesDictionary::< | ||
| arrow::datatypes::UInt64Type, | ||
| >::new(value_type))) | ||
| } | ||
| _ => Err(datafusion_common::DataFusionError::NotImplemented( | ||
| format!("Unsupported dictionary key type: {key_type:?}",), | ||
| )), | ||
| }; | ||
| } | ||
| } | ||
| _ => {} | ||
| } | ||
| } | ||
|
|
@@ -207,6 +258,19 @@ pub fn new_group_values( | |
| Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?)) | ||
| } | ||
| } else { | ||
| // TODO: add specialized implementation for dictionary encoding columns for 2+ group by columns case | ||
| Ok(Box::new(GroupValuesRows::try_new(schema)?)) | ||
| } | ||
| } | ||
|
|
||
| fn supported_single_dictionary_value(t: &DataType) -> bool { | ||
| matches!( | ||
| t, | ||
| DataType::Utf8 | ||
| | DataType::LargeUtf8 | ||
| | DataType::Binary | ||
| | DataType::LargeBinary | ||
| | DataType::Utf8View | ||
| | DataType::BinaryView | ||
| ) | ||
| } | ||
|
Comment on lines
+266
to
+276
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These are the most common types, but why not support other types? Other types would fall under the slow path no?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This PR is intended to serve as a proof of concept that dictionary encoding works and improves efficiency. It makes sense to start with string types, since that is what dictionary arrays are most commonly used with and where they provide the most benefit. A follow-up issue can be created to support additional types such as LargeUtf8, LargeList, and numeric values, which should be as minimal as adding the relevant branches in |
||
Uh oh!
There was an error while loading. Please reload this page.