11 changes: 5 additions & 6 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -401,7 +401,8 @@ async fn test_static_filter_pushdown_through_hash_join() {
"
);

// Test left join - filters should NOT be pushed down
// Test left join: a filter on the preserved (build) side is pushed down;
// a filter on the non-preserved (probe) side is NOT pushed down.
let join = Arc::new(
HashJoinExec::try_new(
TestScanBuilder::new(Arc::clone(&build_side_schema))
@@ -429,7 +430,6 @@ async fn test_static_filter_pushdown_through_hash_join() {
let plan =
Arc::new(FilterExec::try_new(filter, join).unwrap()) as Arc<dyn ExecutionPlan>;

// Test that filters are NOT pushed down for left join
insta::assert_snapshot!(
OptimizationTest::new(plan, FilterPushdown::new(), true),
@r"
@@ -441,10 +441,9 @@ async fn test_static_filter_pushdown_through_hash_join() {
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
output:
Ok:
- FilterExec: a@0 = aa
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Left, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
"
);
}
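The snapshot above captures the semantic rule this change implements: for a LEFT join, a predicate over the preserved (left/build) side commutes with the join, while a predicate over the non-preserved (probe) side does not, because a post-join filter also eliminates null-extended rows. A minimal sketch in plain Rust (the `left_join` helper below is a toy stand-in, not DataFusion's API) demonstrating the asymmetry:

```rust
// Toy LEFT join: null-extends unmatched left rows, mimicking SQL semantics.
fn left_join(
    left: &[(i32, &'static str)],
    right: &[(i32, &'static str)],
) -> Vec<(i32, &'static str, Option<&'static str>)> {
    left.iter()
        .map(|&(k, l)| (k, l, right.iter().find(|&&(rk, _)| rk == k).map(|&(_, r)| r)))
        .collect()
}

fn main() {
    let left = [(1, "a"), (2, "b")];
    let right = [(1, "x")];

    // Preserved (left/build) side: filtering before or after the join is equivalent.
    let left_filtered: Vec<_> =
        left.iter().copied().filter(|&(_, l)| l == "a").collect();
    assert_eq!(
        left_join(&left_filtered, &right),
        left_join(&left, &right)
            .into_iter()
            .filter(|&(_, l, _)| l == "a")
            .collect::<Vec<_>>()
    );

    // Non-preserved (right/probe) side: pushing the filter below the join keeps
    // null-extended rows such as (2, "b", None) that a post-join filter drops,
    // so the two orders disagree and pushdown would be incorrect.
    let right_filtered: Vec<_> =
        right.iter().copied().filter(|&(_, r)| r == "y").collect();
    assert_ne!(
        left_join(&left, &right_filtered),
        left_join(&left, &right)
            .into_iter()
            .filter(|&(_, _, r)| r == Some("y"))
            .collect::<Vec<_>>()
    );
}
```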
87 changes: 70 additions & 17 deletions datafusion/physical-plan/src/filter_pushdown.rs
@@ -44,7 +44,6 @@ use datafusion_common::{
};
use datafusion_physical_expr::{expressions::Column, utils::reassign_expr_columns};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use itertools::Itertools;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterPushdownPhase {
@@ -317,21 +316,33 @@ pub struct ChildFilterDescription {
/// exist in the target schema. If any column in the filter is not present
/// in the schema, the filter cannot be pushed down to that child.
pub(crate) struct FilterColumnChecker<'a> {
column_names: HashSet<&'a str>,
/// The set of `(column_name, column_index)` pairs that are allowed.
allowed_columns: HashSet<(&'a str, usize)>,
}

impl<'a> FilterColumnChecker<'a> {
/// Creates a new [`FilterColumnChecker`] from the given schema.
///
/// Extracts all column names from the schema's fields to build
/// Extracts all column `(name, index)` pairs from the schema's fields to build
/// a lookup set for efficient column existence checks.
pub(crate) fn new(input_schema: &'a Schema) -> Self {
let column_names: HashSet<&str> = input_schema
pub(crate) fn new_schema(input_schema: &'a Schema) -> Self {
let allowed_columns: HashSet<(&str, usize)> = input_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.enumerate()
.map(|(i, f)| (f.name().as_str(), i))
.collect();
Self { column_names }
Self { allowed_columns }
}

/// Creates a new [`FilterColumnChecker`] from an explicit set of
/// allowed `(name, index)` pairs.
///
/// This is used by join nodes that need to restrict pushdown to columns
/// belonging to a specific side of the join, even when different sides
/// have columns with the same name.
pub(crate) fn new_columns(allowed_columns: HashSet<(&'a str, usize)>) -> Self {
Self { allowed_columns }
}

/// Checks whether a filter expression can be pushed down to the child
@@ -347,7 +358,9 @@ impl<'a> FilterColumnChecker<'a> {
filter
.apply(|expr| {
if let Some(column) = expr.as_any().downcast_ref::<Column>()
&& !self.column_names.contains(column.name())
&& !self
.allowed_columns
.contains(&(column.name(), column.index()))
{
can_apply = false;
return Ok(TreeNodeRecursion::Stop);
@@ -375,7 +388,7 @@ impl ChildFilterDescription {
let child_schema = child.schema();

// Build a set of (column name, index) pairs from the child schema for quick lookup
let checker = FilterColumnChecker::new(&child_schema);
let checker = FilterColumnChecker::new_schema(&child_schema);

// Analyze each parent filter
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
@@ -401,6 +414,52 @@ impl ChildFilterDescription {
})
}

/// Like [`Self::from_child`], but restricts which parent-level columns are
/// considered reachable through this child.
///
/// `allowed_columns` is a set of `(column_name, column_index)` pairs
/// (in the *parent* schema) that map to this child's side of a join.
/// A filter is only eligible for pushdown when **every** column it
/// references appears in `allowed_columns`. This prevents incorrect
/// pushdown when different join sides have columns with the same name.
pub fn from_child_with_allowed_columns(
parent_filters: &[Arc<dyn PhysicalExpr>],
allowed_columns: HashSet<(&str, usize)>,
child: &Arc<dyn crate::ExecutionPlan>,
) -> Result<Self> {
let child_schema = child.schema();
let checker = FilterColumnChecker::new_columns(allowed_columns);

let mut child_parent_filters = Vec::with_capacity(parent_filters.len());
for filter in parent_filters {
if checker.can_pushdown(filter) {
let reassigned_filter =
reassign_expr_columns(Arc::clone(filter), &child_schema)?;
child_parent_filters
.push(PushedDownPredicate::supported(reassigned_filter));
} else {
child_parent_filters
.push(PushedDownPredicate::unsupported(Arc::clone(filter)));
}
}

Ok(Self {
parent_filters: child_parent_filters,
self_filters: vec![],
})
}

/// Mark all parent filters as unsupported for this child.
pub fn all_unsupported(parent_filters: &[Arc<dyn PhysicalExpr>]) -> Self {
Self {
parent_filters: parent_filters
.iter()
.map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
.collect(),
self_filters: vec![],
}
}

/// Add a self filter (from the current node) to be pushed down to this child.
pub fn with_self_filter(mut self, filter: Arc<dyn PhysicalExpr>) -> Self {
self.self_filters.push(filter);
Expand Down Expand Up @@ -476,15 +535,9 @@ impl FilterDescription {
children: &[&Arc<dyn crate::ExecutionPlan>],
) -> Self {
let mut desc = Self::new();
let child_filters = parent_filters
.iter()
.map(|f| PushedDownPredicate::unsupported(Arc::clone(f)))
.collect_vec();
for _ in 0..children.len() {
desc = desc.with_child(ChildFilterDescription {
parent_filters: child_filters.clone(),
self_filters: vec![],
});
desc =
desc.with_child(ChildFilterDescription::all_unsupported(parent_filters));
}
desc
}
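To see why `FilterColumnChecker` now matches on `(name, index)` pairs rather than names alone, consider a self-contained sketch (the `Column` struct and `can_pushdown` function below are simplified stand-ins, not the real types). With name-only matching, a predicate on the probe side's `id` would look pushable to the build side whenever both sides expose an `id` column; pairing the name with its index in the output schema removes the ambiguity:

```rust
use std::collections::HashSet;

// Simplified column reference: a name plus its index in the join's output schema.
struct Column {
    name: &'static str,
    index: usize,
}

// A filter is pushable only if every column it references belongs to the
// target child, matched by both name and index.
fn can_pushdown(filter_columns: &[Column], allowed: &HashSet<(&str, usize)>) -> bool {
    filter_columns
        .iter()
        .all(|c| allowed.contains(&(c.name, c.index)))
}

fn main() {
    // Join output: [id@0, v@1] from the left side, [id@2, w@3] from the right.
    let left_allowed: HashSet<(&str, usize)> =
        [("id", 0), ("v", 1)].into_iter().collect();

    // A filter on the *left* id (index 0) is pushable to the left child...
    assert!(can_pushdown(&[Column { name: "id", index: 0 }], &left_allowed));
    // ...but a filter on the *right* id (same name, index 2) is not, even
    // though a name-only check would have accepted it.
    assert!(!can_pushdown(&[Column { name: "id", index: 2 }], &left_allowed));
}
```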
131 changes: 97 additions & 34 deletions datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -24,7 +24,7 @@ use std::{any::Any, vec};
use crate::ExecutionPlanProperties;
use crate::execution_plan::{EmissionType, boundedness_from_children};
use crate::filter_pushdown::{
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation,
};
use crate::joins::Map;
@@ -720,7 +720,9 @@ impl HashJoinExec {
}

fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> bool {
if !config.optimizer.enable_join_dynamic_filter_pushdown {
if self.join_type != JoinType::Inner
|| !config.optimizer.enable_join_dynamic_filter_pushdown
{
return false;
}

@@ -1407,32 +1409,76 @@ impl ExecutionPlan for HashJoinExec {
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
config: &ConfigOptions,
) -> Result<FilterDescription> {
// Other types of joins can support *some* filters, but restrictions are complex and error prone.
// For now we don't support them.
// See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
// See https://github.com/apache/datafusion/issues/16973 for tracking.
if self.join_type != JoinType::Inner {
return Ok(FilterDescription::all_unsupported(
&parent_filters,
&self.children(),
));
// This is the physical-plan equivalent of `push_down_all_join` in
// `datafusion/optimizer/src/push_down_filter.rs`. That function uses
// `lr_is_preserved` to decide which parent predicates can be pushed
// past a logical join to its children, then checks column references
// to route each predicate to the correct side.
//
// We apply the same two-level logic here:
// 1. `lr_is_preserved` gates whether a side is eligible at all.
// 2. For each filter, we check that all column references belong to
// the target child using `(name, index)` pairs via
// `from_child_with_allowed_columns`. This is critical for
// correctness: name-based matching alone can incorrectly push
// filters when different join sides have columns with the same
// name (see https://github.com/apache/datafusion/issues/20213).
let (left_preserved, right_preserved) = lr_is_preserved(self.join_type);

// Build the set of allowed (name, index) pairs for each side.
// When a projection is present, the output schema differs from the
// raw join schema, so we need to map through the projection.
let output_schema = self.schema();
let output_fields = output_schema.fields();
let column_indices: Vec<ColumnIndex> = match self.projection.as_ref() {
Some(projection) => projection
.iter()
.map(|i| self.column_indices[*i].clone())
.collect(),
None => self.column_indices.clone(),
};

let mut left_allowed = std::collections::HashSet::new();
let mut right_allowed = std::collections::HashSet::new();
for (i, ci) in column_indices.iter().enumerate() {
let name = output_fields[i].name().as_str();
match ci.side {
JoinSide::Left => {
left_allowed.insert((name, i));
}
JoinSide::Right => {
right_allowed.insert((name, i));
}
JoinSide::None => {
// Mark columns belong to neither input, so don't allow pushdown to either side
}
}
}

// Get basic filter descriptions for both children
let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
&parent_filters,
self.left(),
)?;
let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
&parent_filters,
self.right(),
)?;
let left_child = if left_preserved {
ChildFilterDescription::from_child_with_allowed_columns(
&parent_filters,
left_allowed,
self.left(),
)?
} else {
ChildFilterDescription::all_unsupported(&parent_filters)
};

let mut right_child = if right_preserved {
ChildFilterDescription::from_child_with_allowed_columns(
&parent_filters,
right_allowed,
self.right(),
)?
} else {
ChildFilterDescription::all_unsupported(&parent_filters)
};

// Add dynamic filters in Post phase if enabled
if matches!(phase, FilterPushdownPhase::Post)
&& self.allow_join_dynamic_filter_pushdown(config)
{
// Add the dynamic filter to the right (probe) side
let dynamic_filter = Self::create_dynamic_filter(&self.on);
right_child = right_child.with_self_filter(dynamic_filter);
}
@@ -1448,19 +1494,6 @@ impl ExecutionPlan for HashJoinExec {
child_pushdown_result: ChildPushdownResult,
_config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
// Note: this check shouldn't be necessary because we already marked all parent filters as unsupported for
// non-inner joins in `gather_filters_for_pushdown`.
// However it's a cheap check and serves to inform future devs touching this function that they need to be really
// careful pushing down filters through non-inner joins.
if self.join_type != JoinType::Inner {
// Other types of joins can support *some* filters, but restrictions are complex and error prone.
// For now we don't support them.
// See the logical optimizer rules for more details: datafusion/optimizer/src/push_down_filter.rs
return Ok(FilterPushdownPropagation::all_unsupported(
child_pushdown_result,
));
}

let mut result = FilterPushdownPropagation::if_any(child_pushdown_result.clone());
assert_eq!(child_pushdown_result.self_filters.len(), 2); // Should always be 2 since we have 2 children
let right_child_self_filters = &child_pushdown_result.self_filters[1]; // We only push down filters to the right child
@@ -1501,6 +1534,22 @@ impl ExecutionPlan for HashJoinExec {
}
}

/// Determines which sides of a join are "preserved" for filter pushdown.
///
/// A preserved side means filters on that side's columns can be safely pushed
/// below the join. This mirrors the logic in the logical optimizer's
/// `lr_is_preserved` in `datafusion/optimizer/src/push_down_filter.rs`.
fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
match join_type {
JoinType::Inner => (true, true),
JoinType::Left => (true, false),
JoinType::Right => (false, true),
JoinType::Full => (false, false),
JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => (true, false),
JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => (false, true),
}
}

/// Accumulator for collecting min/max bounds from build-side data during hash join.
///
/// This struct encapsulates the logic for progressively computing column bounds
@@ -5715,4 +5764,18 @@ mod tests {
.contains("null_aware anti join only supports single column join key")
);
}

#[test]
fn test_lr_is_preserved() {
assert_eq!(lr_is_preserved(JoinType::Inner), (true, true));
assert_eq!(lr_is_preserved(JoinType::Left), (true, false));
assert_eq!(lr_is_preserved(JoinType::Right), (false, true));
assert_eq!(lr_is_preserved(JoinType::Full), (false, false));
assert_eq!(lr_is_preserved(JoinType::LeftSemi), (true, false));
assert_eq!(lr_is_preserved(JoinType::LeftAnti), (true, false));
assert_eq!(lr_is_preserved(JoinType::LeftMark), (true, false));
assert_eq!(lr_is_preserved(JoinType::RightSemi), (false, true));
assert_eq!(lr_is_preserved(JoinType::RightAnti), (false, true));
assert_eq!(lr_is_preserved(JoinType::RightMark), (false, true));
}
}
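The two-level gate described in the `gather_filters_for_pushdown` comments above can be condensed into a toy routing function. In the sketch below (simplified types, not the real `ExecutionPlan` API; filters are represented only by the set of `(name, index)` pairs they reference), `lr_is_preserved` decides whether a side is eligible at all, and only then is the per-side column set consulted:

```rust
use std::collections::HashSet;

#[allow(dead_code)] // not every variant is constructed in this sketch
#[derive(Clone, Copy)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

fn lr_is_preserved(join_type: JoinType) -> (bool, bool) {
    match join_type {
        JoinType::Inner => (true, true),
        JoinType::Left => (true, false),
        JoinType::Right => (false, true),
        JoinType::Full => (false, false),
    }
}

// Route each filter to "left", "right", or "above" (kept above the join).
fn route<'a>(
    join_type: JoinType,
    left_cols: &HashSet<(&'a str, usize)>,
    right_cols: &HashSet<(&'a str, usize)>,
    filter_refs: &[HashSet<(&'a str, usize)>],
) -> Vec<&'static str> {
    let (left_preserved, right_preserved) = lr_is_preserved(join_type);
    filter_refs
        .iter()
        .map(|refs| {
            if left_preserved && refs.is_subset(left_cols) {
                "left"
            } else if right_preserved && refs.is_subset(right_cols) {
                "right"
            } else {
                "above"
            }
        })
        .collect()
}

fn main() {
    let left: HashSet<(&str, usize)> = [("a", 0)].into_iter().collect();
    let right: HashSet<(&str, usize)> = [("d", 1)].into_iter().collect();
    let on_a: HashSet<(&str, usize)> = [("a", 0)].into_iter().collect();
    let on_d: HashSet<(&str, usize)> = [("d", 1)].into_iter().collect();

    // LEFT join: only the preserved (left) side accepts pushdown; the
    // probe-side filter stays above the join, matching the updated test.
    assert_eq!(
        route(JoinType::Left, &left, &right, &[on_a.clone(), on_d.clone()]),
        ["left", "above"]
    );
    // INNER join: both sides are preserved, so both filters are routed down.
    assert_eq!(
        route(JoinType::Inner, &left, &right, &[on_a, on_d]),
        ["left", "right"]
    );
}
```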
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/projection.rs
@@ -384,7 +384,7 @@ impl ExecutionPlan for ProjectionExec {
// expand alias column to original expr in parent filters
let invert_alias_map = self.collect_reverse_alias()?;
let output_schema = self.schema();
let checker = FilterColumnChecker::new(&output_schema);
let checker = FilterColumnChecker::new_schema(&output_schema);
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

for filter in parent_filters {