Skip to content

fix: impl handle_child_pushdown_result for SortExec#21527

Open
haohuaijin wants to merge 5 commits intoapache:mainfrom
haohuaijin:impl-handle-child
Open

fix: impl handle_child_pushdown_result for SortExec#21527
haohuaijin wants to merge 5 commits intoapache:mainfrom
haohuaijin:impl-handle-child

Conversation

@haohuaijin
Copy link
Copy Markdown
Contributor

@haohuaijin haohuaijin commented Apr 10, 2026

Which issue does this PR close?

Rationale for this change

FilterPushdown does not push filters through SortExec because SortExec lacks an implementation of handle_child_pushdown_result. When a FilterExec sits above a plain SortExec (no fetch), the filter can safely be moved below the sort without changing semantics, since sorting preserves all rows.

What changes are included in this PR?

Implemented handle_child_pushdown_result for SortExec in datafusion/physical-plan/src/sorts/sort.rs:

  • For plain SortExec (no fetch) in the Pre phase: any filters not absorbed by the child are collected into a new FilterExec inserted between the SortExec and its child.
  • For SortExec with fetch (TopK) or non-Pre phases: filters are not absorbed, preserving correct TopK semantics where filtering after limiting would change results.

Are these changes tested?

Added comprehensive tests in datafusion/core/tests/physical_optimizer/filter_pushdown.rs covering:

  • Filter pushdown through sort into a scan that supports pushdown
  • Filter pushdown through sort when scan does NOT support pushdown (FilterExec inserted between sort and scan)
  • Multiple conjunctive filters pushed through sort
  • Filter NOT pushed through sort with fetch (TopK)
  • Filter with projection pushed through sort
  • Filter pushdown preserving the preserve_partitioning flag
  • Filter with fetch limit propagated to SortExec (TopK conversion)

and one test case test_filter_pushdown_through_sort_with_projection for use LogicalPlanBuilder to reproduce.
not able to use sql to reproduce due to #15886

Are there any user-facing changes?

@github-actions github-actions bot added core Core DataFusion crate physical-plan Changes to the physical-plan crate labels Apr 10, 2026
@haohuaijin
Copy link
Copy Markdown
Contributor Author

the reproduce result in #21526, for this pr

Physical plan:
FilterExec: log@1 LIKE %datafusion%, projection=[_timestamp@0]
  SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
    DataSourceExec: partitions=1, partition_sizes=[1]

After FilterPushdown:
ProjectionExec: expr=[_timestamp@0 as _timestamp]
  SortExec: expr=[_timestamp@0 DESC NULLS LAST], preserve_partitioning=[true]
    FilterExec: log@1 LIKE %datafusion%
      DataSourceExec: partitions=1, partition_sizes=[1]

@haohuaijin haohuaijin changed the title fix: impl handle_child_pushdown_result for SortExec fix: impl handle_child_pushdown_result for SortExec Apr 10, 2026
@haohuaijin
Copy link
Copy Markdown
Contributor Author

cc @alamb @adriangb do you have time take a look 🙏


/// FilterExec above a plain SortExec (no fetch) should be pushed below it.
/// The scan supports pushdown, so the filter lands in the DataSourceExec.
#[test]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add these as SLT tests instead?

// Only absorb filters in Pre phase for a plain sort (no fetch).
// A sort with fetch (TopK) must not accept filters: reordering
// filter vs. limit would change semantics.
if phase != FilterPushdownPhase::Pre || self.fetch.is_some() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain why FilterPushdownPhase::Post should not be allowed, maybe with examples of where it would be incorrect?

Comment on lines +1456 to +1471
// Build a single conjunctive predicate from the unsupported filters
// and insert a FilterExec between this SortExec and its child.
let predicate = datafusion_physical_expr::conjunction(unsupported_filters);
let new_child =
Arc::new(FilterExec::try_new(predicate, Arc::clone(self.input()))?)
as Arc<dyn ExecutionPlan>;
let new_sort = Arc::new(
SortExec::new(self.expr.clone(), new_child)
.with_fetch(self.fetch())
.with_preserve_partitioning(self.preserve_partitioning()),
) as Arc<dyn ExecutionPlan>;

Ok(FilterPushdownPropagation {
filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
updated_node: Some(new_sort),
})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Swapping the order from FilterExec -> SortExec to SortExec -> FilterExec makes sense to me 👍🏻

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

filter not pushdown through sort in FilterPushdown

2 participants