feat: reorder row groups by grouping key statistics #21588
Dandandan wants to merge 9 commits into apache:main from
When sort pushdown is active, reorder row groups within each file by their min/max statistics to match the requested sort order. This helps TopK queries find optimal values first via dynamic filter pushdown.

- Add reorder_by_statistics to PreparedAccessPlan that sorts row_group_indexes by the first sort column's min values
- Pass sort order from ParquetSource::try_pushdown_sort through to the opener via sort_order_for_reorder field
- Reorder happens after pruning but before reverse (they compose)
- Gracefully skips reorder when statistics unavailable, sort expr is not a simple column, row_selection present, or <=1 row groups

Closes apache#21317
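The reorder step and its skip conditions described above can be sketched as follows. This is a minimal illustration, not the PR's code: `RowGroupStats` and `reorder_by_min` are hypothetical names, the sort column is assumed to be a plain `i64`, and the "sort expr is not a simple column" check is left out because it needs the expression type.

```rust
/// Hypothetical stand-in for the statistics of one row group,
/// restricted to the first sort column.
#[derive(Debug, Clone, PartialEq)]
pub struct RowGroupStats {
    /// Minimum value of the sort column, or `None` when statistics are missing.
    pub min: Option<i64>,
}

/// Returns `indexes` (the row groups that survived pruning) reordered by
/// ascending min statistic. Falls back to the input order whenever a skip
/// condition applies.
pub fn reorder_by_min(
    indexes: &[usize],
    stats: &[RowGroupStats],
    has_row_selection: bool,
) -> Vec<usize> {
    // Skip: a row selection is present, or there is nothing to reorder.
    if has_row_selection || indexes.len() <= 1 {
        return indexes.to_vec();
    }
    let mut keyed = Vec::with_capacity(indexes.len());
    for &idx in indexes {
        match stats.get(idx).and_then(|s| s.min) {
            Some(min) => keyed.push((min, idx)),
            // Skip: statistics are unavailable for at least one row group.
            None => return indexes.to_vec(),
        }
    }
    // Stable sort: row groups with equal minimums keep their original order.
    keyed.sort_by_key(|&(min, _)| min);
    keyed.into_iter().map(|(_, idx)| idx).collect()
}
```

Because the function only permutes the surviving indexes, a later reverse step can be applied to its output unchanged, which is one way to read "they compose" in the commit message.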
Extends the row group reordering infrastructure (from sort pushdown) to also reorder by GROUP BY key statistics. When an AggregateExec sits above a ParquetSource, the new ReorderByGroupKeys optimizer rule pushes grouping key expressions down so row groups with similar group key values are read together.

Benefits:
- Reduces active cardinality of aggregation hash tables
- Improves CPU cache locality during hash table lookups

Adds try_pushdown_groupby_order() to ExecutionPlan, DataSource, and FileSource traits, with ParquetSource implementation that reuses the existing reorder_by_statistics infrastructure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run benchmarks
run benchmark clickbench_extended
run benchmark tpch
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (6c143f7) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (6c143f7) to 29c5dd5 (merge-base) using tpch
run benchmark clickbench
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (6c143f7) to 29c5dd5 (merge-base) using clickbench_extended
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (6c143f7) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (6c143f7) to 29c5dd5 (merge-base) using tpcds
🤖 Criterion benchmark running (GKE): comparing reorder-by-groupby-keys (6c143f7) to 29c5dd5 (merge-base)
Benchmark for this request failed.
Extends the row group reordering infrastructure (from sort pushdown) to also reorder by GROUP BY key statistics. When an AggregateExec sits above a ParquetSource, the new ReorderByGroupKeys optimizer rule pushes grouping key expressions down so row groups with similar group key values are read together.

Two levels of reordering:
- Files within partitions are sorted by grouping key min statistics
- Row groups within each file are reordered by grouping key statistics

Benefits:
- Reduces active cardinality of aggregation hash tables
- Improves CPU cache locality during hash table lookups

Adds try_pushdown_groupby_order() to ExecutionPlan, DataSource, and FileSource traits, with ParquetSource implementation that reuses the existing reorder_by_statistics infrastructure.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run benchmark clickbench_1
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (4e09360) to 29c5dd5 (merge-base) using clickbench_1
🤖 Benchmark completed (GKE): tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned
🤖 Benchmark completed (GKE): clickbench_extended
🤖 Benchmark completed (GKE): clickbench_1
Two improvements to the groupby row group reordering:

1. Add try_pushdown_groupby_order delegation to transparent nodes (CoalesceBatchesExec, BufferExec, CooperativeExec, ProjectionExec, CoalescePartitionsExec, RepartitionExec) so the optimization fires even when intermediate nodes sit between AggregateExec and DataSourceExec.
2. Use ALL grouping keys for multi-column file-level sorting instead of just the first key. This gives much better clustering for multi-column GROUP BY queries like GROUP BY (a, b).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
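To illustrate the second point, here is a rough sketch of file-level sorting on all grouping keys rather than only the first. `FileMins` and `sort_files_by_group_keys` are made-up names, the statistics are assumed to be `i64` minimums listed in GROUP BY order, and returning early when any statistic is missing is an assumption of this sketch rather than something the commit states.

```rust
/// Hypothetical file descriptor: one min statistic per grouping key,
/// in GROUP BY order. `None` marks a missing statistic.
#[derive(Debug, Clone, PartialEq)]
pub struct FileMins {
    pub path: String,
    pub mins: Vec<Option<i64>>,
}

/// Sorts files lexicographically by the min statistics of all grouping keys.
/// Returns `false` and leaves the order untouched if any statistic is missing.
pub fn sort_files_by_group_keys(files: &mut [FileMins]) -> bool {
    if files.iter().any(|f| f.mins.iter().any(|m| m.is_none())) {
        return false;
    }
    // `Vec<Option<i64>>` compares element by element, so the first key is
    // the primary sort key and later keys break ties.
    files.sort_by(|a, b| a.mins.cmp(&b.mins));
    true
}
```

For `GROUP BY (a, b)` with per-file minimums `f1 = (2, 1)`, `f2 = (1, 9)`, `f3 = (1, 3)`, this yields `f3, f2, f1`. Sorting on `a` alone with a stable sort would leave `f2` ahead of `f3`, since both share `a = 1`.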
run benchmarks
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (b41f84e) to 29c5dd5 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (b41f84e) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (b41f84e) to 29c5dd5 (merge-base) using clickbench_partitioned
FilterExec preserves the schema unchanged, so group expressions pass through directly. This ensures the optimization fires when a FilterExec sits between AggregateExec and DataSourceExec. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
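The delegation pattern can be pictured with a toy trait. Everything below is a simplified stand-in: `PlanNode`, `ScanNode`, `FilterNode`, and `OpaqueNode` are hypothetical types, and the method signature (column names in, `bool` out) is an assumption chosen for brevity, not the signature used in the PR or in DataFusion's `ExecutionPlan`.

```rust
/// Simplified stand-in for a physical plan node.
pub trait PlanNode {
    /// Offers grouping key column names to this node. Returns `true` if a
    /// source below accepted the hint. The default is a no-op.
    fn try_pushdown_groupby_order(&mut self, _keys: &[String]) -> bool {
        false
    }
}

/// Leaf that records the hint, standing in for a Parquet scan.
#[derive(Default)]
pub struct ScanNode {
    pub reorder_keys: Option<Vec<String>>,
}

impl PlanNode for ScanNode {
    fn try_pushdown_groupby_order(&mut self, keys: &[String]) -> bool {
        self.reorder_keys = Some(keys.to_vec());
        true
    }
}

/// Schema-preserving node: the keys mean the same thing below it,
/// so the hint is forwarded unchanged.
pub struct FilterNode<P: PlanNode> {
    pub input: P,
}

impl<P: PlanNode> PlanNode for FilterNode<P> {
    fn try_pushdown_groupby_order(&mut self, keys: &[String]) -> bool {
        self.input.try_pushdown_groupby_order(keys)
    }
}

/// Node that keeps the default implementation: the hint stops here.
pub struct OpaqueNode<P: PlanNode> {
    pub input: P,
}

impl<P: PlanNode> PlanNode for OpaqueNode<P> {}
```

With a no-op default, only nodes that explicitly opt in forward the hint, so a node that changes the meaning of columns cannot pass stale keys to the scan by accident.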
🤖 Benchmark completed (GKE): tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned
run benchmarks
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (1c67f29) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (1c67f29) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (1c67f29) to 29c5dd5 (merge-base) using tpcds
🤖 Benchmark completed (GKE): tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned
…eshold

When the partial aggregate's hash table exceeds a configurable group count threshold (default: 10000), emit accumulated state and reset. This keeps the hash table small enough to fit in CPU cache, improving performance for high-cardinality GROUP BY queries.

The downstream FinalPartitioned aggregate correctly merges multiple partial emissions via merge_batch. For low-cardinality GROUP BY queries, the threshold is never reached, so behavior is unchanged.

New config: datafusion.execution.partial_aggregation_group_count_emit_threshold

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
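A toy model of emit-and-reset may help show why the final result is unaffected. The sketch assumes a SUM aggregate over `u64` keys; `PartialSum` and `merge_final` are hypothetical names and do not mirror DataFusion's accumulator or `merge_batch` APIs.

```rust
use std::collections::HashMap;

/// Partial SUM aggregation that emits its state and resets once the
/// number of groups exceeds `threshold`.
pub struct PartialSum {
    threshold: usize,
    groups: HashMap<u64, i64>,
    /// Batches of partial (key, sum) state emitted so far.
    pub emitted: Vec<Vec<(u64, i64)>>,
}

impl PartialSum {
    pub fn new(threshold: usize) -> Self {
        Self { threshold, groups: HashMap::new(), emitted: Vec::new() }
    }

    /// Folds one input row into the hash table, emitting if it grew too large.
    pub fn update(&mut self, key: u64, value: i64) {
        *self.groups.entry(key).or_insert(0) += value;
        if self.groups.len() > self.threshold {
            self.flush();
        }
    }

    /// Emits the accumulated state as one batch and clears the hash table.
    pub fn flush(&mut self) {
        if !self.groups.is_empty() {
            self.emitted.push(self.groups.drain().collect());
        }
    }
}

/// Final stage: the same key may appear in several emitted batches,
/// so partial sums are added together.
pub fn merge_final(batches: &[Vec<(u64, i64)>]) -> HashMap<u64, i64> {
    let mut out = HashMap::new();
    for batch in batches {
        for &(key, partial) in batch {
            *out.entry(key).or_insert(0) += partial;
        }
    }
    out
}
```

Correctness rests on the aggregate's partial states being mergeable: a key that is emitted early and seen again later simply contributes two partial sums to the final stage. When the number of distinct keys never exceeds the threshold, `flush` runs only once at the end and the behavior matches a plain partial aggregation.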
run benchmarks
10000 groups was too aggressive — frequent emissions add overhead. 100000 groups (~20MB hash table) fits comfortably in L3 cache while emitting infrequently enough to keep overhead negligible. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (beb8325) to 29c5dd5 (merge-base) using tpch
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (beb8325) to 29c5dd5 (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing reorder-by-groupby-keys (beb8325) to 29c5dd5 (merge-base) using clickbench_partitioned
🤖 Benchmark completed (GKE): tpcds
🤖 Benchmark completed (GKE): clickbench_partitioned
Which issue does this PR close?
Extends #21580 (row group reorder by statistics during sort pushdown) to also reorder by GROUP BY keys, as suggested in the review.

Rationale for this change
When an AggregateExec sits above a Parquet data source, reading row groups in grouping-key order clusters similar group values together. This:
- Reduces active cardinality of aggregation hash tables
- Improves CPU cache locality during hash table lookups

What changes are included in this PR?
- Adds try_pushdown_groupby_order() to ExecutionPlan, DataSource, and FileSource traits (default: no-op)
- Implementation through the DataSourceExec → FileScanConfig → ParquetSource chain, reusing the existing sort_order_for_reorder / reorder_by_statistics infrastructure from #21580 (feat: reorder row groups by statistics during sort pushdown)
- New ReorderByGroupKeys physical optimizer rule that detects AggregateExec → DataSourceExec patterns and pushes grouping key expressions down
- Positioned relative to PushdownSort so that sort pushdown can override the reorder hint when a sort requirement is present

Are these changes tested?
Yes. SLT tests in sort_pushdown.slt (Test I) verify correct GROUP BY SUM and COUNT results with the optimization active on multi-row-group Parquet files.

Are there any user-facing changes?
No user-facing API changes. The optimization is automatic when aggregating over Parquet data.
🤖 Generated with Claude Code