Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,11 @@ required-features = ["test_utils"]
harness = false
name = "aggregate_vectorized"
required-features = ["test_utils"]

[[bench]]
name = "single_column_aggr"
harness = false

[profile.profiling]
inherits = "release"
debug = true
Comment thread
Rich-T-kid marked this conversation as resolved.
266 changes: 266 additions & 0 deletions datafusion/physical-plan/benches/single_column_aggr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{ArrayRef, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field, Schema, UInt8Type};
use criterion::{Criterion, criterion_group, criterion_main};
use datafusion_expr::EmitTo;
use datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows;
use datafusion_physical_plan::aggregates::group_values::single_group_by::dictionary::GroupValuesDictionary;
use datafusion_physical_plan::aggregates::group_values::{GroupValues, new_group_values};
use datafusion_physical_plan::aggregates::order::GroupOrdering;
use std::sync::Arc;
/// Number of distinct group-by values present in the benchmark input.
///
/// The counts below mirror `create_string_values`; note that `Xsmall`
/// actually produces 3 distinct values (the previous comment said 1).
#[derive(Debug)]
enum Cardinality {
    Xsmall, // 3 distinct values
    Small,  // 10
    Medium, // 50
    Large,  // 200
}
/// Number of rows generated per record batch (see `create_batch`).
#[derive(Debug)]
enum BatchSize {
    Small,  // 8192 rows
    Medium, // 32768 rows
    Large,  // 65536 rows
}
/// Fraction of values replaced with nulls (see `introduce_nulls`).
#[derive(Debug)]
enum NullRate {
    Zero,   // 0%
    Low,    // 1%
    Medium, // 5%
    High,   // 20%
}
/// Which `GroupValues` implementation a benchmark configuration exercises.
#[derive(Debug, Clone)]
enum GroupType {
    /// Specialized single-column dictionary path (`GroupValuesDictionary`).
    Dictionary,
    /// Generic row-based implementation (`GroupValuesRows`).
    GroupValueRows,
}
/// Builds the pool of distinct group strings for the given cardinality.
///
/// Values are zero-padded (`group_value_000000`, `group_value_000001`, ...)
/// so every string shares a common prefix and has equal length.
fn create_string_values(cardinality: &Cardinality) -> Vec<String> {
    let count = match cardinality {
        Cardinality::Xsmall => 3,
        Cardinality::Small => 10,
        Cardinality::Medium => 50,
        Cardinality::Large => 200,
    };
    let mut values = Vec::with_capacity(count);
    for idx in 0..count {
        values.push(format!("group_value_{idx:06}"));
    }
    values
}
/// Produces a batch of strings by cycling through the distinct group values
/// for `cardinality` until `batch_size` rows have been emitted.
fn create_batch(batch_size: &BatchSize, cardinality: &Cardinality) -> Vec<String> {
    let rows = match batch_size {
        BatchSize::Small => 8192,
        BatchSize::Medium => 32768,
        BatchSize::Large => 65536,
    };
    let pool = create_string_values(cardinality);
    if pool.is_empty() {
        // Defensive: an empty pool would otherwise produce an empty batch
        // anyway (cycling an empty iterator yields nothing).
        return Vec::new();
    }
    pool.iter().cloned().cycle().take(rows).collect()
}
/// Converts optional strings into a dictionary-encoded (`UInt8` keys, `Utf8`
/// values) Arrow array.
///
/// Note: `UInt8` keys cap the dictionary at 256 distinct values. With
/// `Cardinality::Large` (200 values) this stays in range, but raising the
/// cardinality further would overflow the key type.
fn strings_to_dict_array(values: Vec<Option<String>>) -> ArrayRef {
    let mut builder = StringDictionaryBuilder::<UInt8Type>::new();
    for value in values {
        // `append_option` covers both the value and the null case, replacing
        // the manual Some/None match.
        builder.append_option(value);
    }
    Arc::new(builder.finish())
}
/// Randomly replaces values with `None` at the probability implied by
/// `null_rate`.
///
/// NOTE(review): this uses an unseeded `rand::random`, so the exact null
/// positions differ between benchmark runs. That is acceptable for throughput
/// comparisons, but a seeded RNG would make runs reproducible — worth
/// confirming whether that matters here.
fn introduce_nulls(values: Vec<String>, null_rate: &NullRate) -> Vec<Option<String>> {
    let probability = match null_rate {
        NullRate::Zero => 0.0,
        NullRate::Low => 0.01,
        NullRate::Medium => 0.05,
        NullRate::High => 0.20,
    };
    let mut out = Vec::with_capacity(values.len());
    for value in values {
        if rand::random::<f64>() < probability {
            out.push(None);
        } else {
            out.push(Some(value));
        }
    }
    out
}

fn generate_group_values(kind: &GroupType) -> Box<dyn GroupValues> {
match kind {
GroupType::GroupValueRows => {
// we know this is going to hit the fallback path I.E GroupValueRows, but for the sake of avoiding making private items public call the public api
let schema = Arc::new(Schema::new(vec![Field::new(
"group_col",
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
false,
)]));
new_group_values(schema, &GroupOrdering::None).unwrap()
}
GroupType::Dictionary => {
// call custom path directly
Box::new(GroupValuesDictionary::<UInt8Type>::new(&DataType::Utf8))
}
}
}

/// Benchmarks `GroupValues::intern` over the full cross product of
/// implementation, cardinality, batch size and null rate.
///
/// Two benchmarks are registered per configuration:
/// * `t1_...` — a single `intern` call on a freshly-built `GroupValues`, and
/// * `multi_batch/...` — three `intern` calls followed by one `emit`,
///   mimicking how the aggregation operator processes a stream of batches.
fn bench_single_column_group_values(c: &mut Criterion) {
    let group_types = [GroupType::GroupValueRows, GroupType::Dictionary];
    let cardinalities = [
        Cardinality::Xsmall,
        Cardinality::Small,
        Cardinality::Medium,
        Cardinality::Large,
    ];
    let batch_sizes = [BatchSize::Small, BatchSize::Medium, BatchSize::Large];
    let null_rates = [
        NullRate::Zero,
        NullRate::Low,
        NullRate::Medium,
        NullRate::High,
    ];

    for cardinality in &cardinalities {
        for batch_size in &batch_sizes {
            for null_rate in &null_rates {
                for group_type in &group_types {
                    let group_name = format!(
                        "t1_{group_type:?}_cardinality_{cardinality:?}_batch_{batch_size:?}_null_rate_{null_rate:?}"
                    );

                    let string_vec = create_batch(batch_size, cardinality);
                    let nullable_values = introduce_nulls(string_vec, null_rate);
                    // Both implementations consume the same dictionary-encoded
                    // column, so there is no per-variant conversion (the old
                    // match had identical arms) and no need to clone
                    // `nullable_values`.
                    let col_ref = strings_to_dict_array(nullable_values);

                    c.bench_function(&group_name, |b| {
                        b.iter_batched(
                            || {
                                // Fresh GroupValues per iteration so earlier
                                // iterations cannot pre-warm the hash table;
                                // the Arc clone of the column is cheap.
                                (generate_group_values(group_type), col_ref.clone())
                            },
                            |(mut group_values, col)| {
                                let mut groups = Vec::new();
                                group_values.intern(&[col], &mut groups).unwrap();
                            },
                            criterion::BatchSize::SmallInput,
                        );
                    });

                    // Second benchmark alternates intern and emit to simulate
                    // the realistic pattern where one GroupValues instance is
                    // reused across multiple record batches of the same
                    // grouping column.
                    let multi_batch_name = format!(
                        "multi_batch/{group_type:?}_cardinality_{cardinality:?}_batch_{batch_size:?}_null_rate_{null_rate:?}"
                    );
                    c.bench_function(&multi_batch_name, |b| {
                        b.iter_batched(
                            || {
                                // Three Arc clones stand in for three record
                                // batches sharing the same data.
                                let gv = generate_group_values(group_type);
                                (gv, col_ref.clone(), col_ref.clone(), col_ref.clone())
                            },
                            |(mut group_values, batch1, batch2, batch3)| {
                                // Realistic aggregation flow: one intern call
                                // per batch, then a single emit at the end.
                                let mut groups = Vec::new();
                                group_values.intern(&[batch1], &mut groups).unwrap();
                                groups.clear();
                                group_values.intern(&[batch2], &mut groups).unwrap();
                                groups.clear();
                                group_values.intern(&[batch3], &mut groups).unwrap();
                                group_values.emit(EmitTo::All).unwrap();
                            },
                            criterion::BatchSize::SmallInput,
                        );
                    });
                }
            }
        }
    }
}

/// Benchmarks repeated `intern` calls against a column that is built once
/// outside the timed region, so array construction is excluded from the
/// measurement and only hash-table probing is timed.
fn bench_repeated_intern_prefab_cols(c: &mut Criterion) {
    let cardinality = Cardinality::Small;
    let batch_size = BatchSize::Large;
    let null_rate = NullRate::Low;
    let group_types = [GroupType::GroupValueRows, GroupType::Dictionary];

    for group_type in &group_types {
        let string_vec = create_batch(&batch_size, &cardinality);
        let nullable_values = introduce_nulls(string_vec, &null_rate);
        // Build the column once and reuse it for every intern call; the old
        // code cloned the Arc into four separate bindings (and cloned
        // `nullable_values` and `group_type`) for no benefit.
        let col_ref = strings_to_dict_array(nullable_values);

        let group_name = format!(
            "repeated_intern/{group_type:?}_cardinality_{cardinality:?}_batch_{batch_size:?}_null_rate_{null_rate:?}"
        );
        c.bench_function(&group_name, |b| {
            b.iter_batched(
                || generate_group_values(group_type),
                |mut group_values| {
                    let mut groups = Vec::new();
                    // Four intern calls over the same pre-built column;
                    // `slice::from_ref` passes it without cloning the Arc.
                    for _ in 0..4 {
                        group_values
                            .intern(std::slice::from_ref(&col_ref), &mut groups)
                            .unwrap();
                        groups.clear();
                    }
                },
                criterion::BatchSize::SmallInput,
            );
        });
    }
}

// Register both benchmark groups and hand `main` over to criterion
// (the Cargo.toml entry for this bench sets `harness = false`).
criterion_group!(
    benches,
    bench_single_column_group_values,
    bench_repeated_intern_prefab_cols
);
criterion_main!(benches);
Binary file added datafusion/physical-plan/profile.json.gz
Comment thread
Rich-T-kid marked this conversation as resolved.
Binary file not shown.
70 changes: 67 additions & 3 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use datafusion_expr::EmitTo;

pub mod multi_group_by;

mod row;
mod single_group_by;
pub mod row;
pub mod single_group_by;
use datafusion_physical_expr::binary_map::OutputType;
use multi_group_by::GroupValuesColumn;
use row::GroupValuesRows;
Expand All @@ -41,7 +41,8 @@ pub(crate) use single_group_by::primitive::HashValue;
use crate::aggregates::{
group_values::single_group_by::{
boolean::GroupValuesBoolean, bytes::GroupValuesBytes,
bytes_view::GroupValuesBytesView, primitive::GroupValuesPrimitive,
bytes_view::GroupValuesBytesView, dictionary::GroupValuesDictionary,
primitive::GroupValuesPrimitive,
},
order::GroupOrdering,
};
Expand Down Expand Up @@ -196,6 +197,56 @@ pub fn new_group_values(
DataType::Boolean => {
return Ok(Box::new(GroupValuesBoolean::new()));
}
DataType::Dictionary(key_type, value_type) => {
if supported_single_dictionary_value(value_type) {
return match key_type.as_ref() {
// TODO: turn this into a macro
Comment thread
Rich-T-kid marked this conversation as resolved.
DataType::Int8 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int8Type,
>::new(value_type)))
}
DataType::Int16 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int16Type,
>::new(value_type)))
}
DataType::Int32 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int32Type,
>::new(value_type)))
}
DataType::Int64 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::Int64Type,
>::new(value_type)))
}
DataType::UInt8 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt8Type,
>::new(value_type)))
}
DataType::UInt16 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt16Type,
>::new(value_type)))
}
DataType::UInt32 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt32Type,
>::new(value_type)))
}
DataType::UInt64 => {
Ok(Box::new(GroupValuesDictionary::<
arrow::datatypes::UInt64Type,
>::new(value_type)))
}
_ => Err(datafusion_common::DataFusionError::NotImplemented(
format!("Unsupported dictionary key type: {key_type:?}",),
)),
};
}
}
_ => {}
}
}
Expand All @@ -207,6 +258,19 @@ pub fn new_group_values(
Ok(Box::new(GroupValuesColumn::<true>::try_new(schema)?))
}
} else {
// TODO: add specialized implementation for dictionary encoding columns for 2+ group by columns case
Ok(Box::new(GroupValuesRows::try_new(schema)?))
}
}

/// Returns true if `t` is a dictionary *value* type supported by the
/// specialized single-column `GroupValuesDictionary` implementation.
///
/// Currently limited to string and binary types; dictionaries with other
/// value types fall through to the generic row-based implementation.
fn supported_single_dictionary_value(t: &DataType) -> bool {
    matches!(
        t,
        DataType::Utf8
            | DataType::LargeUtf8
            | DataType::Binary
            | DataType::LargeBinary
            | DataType::Utf8View
            | DataType::BinaryView
    )
}
Comment on lines +266 to +276
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the most common types, but why not support other types? Other types would fall under the slow path no?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is intended to serve as a proof of concept that dictionary encoding works and improves efficiency. It makes sense to start with string types, since that is what dictionary arrays are most commonly used with and where they provide the most benefit. A follow-up issue can be created to support additional types such as LargeList and numeric values, which should be as simple as adding the relevant branches in get_raw_bytes and sentinel_repr. This is also related to how sentinel representations of types work, but I think that's worth a separate comment.

Loading
Loading