Skip to content
139 changes: 139 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5146,3 +5146,142 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
"
);
}

/// Regression test for FilterPushdown mis-remapping predicates when a
/// ProjectionExec emits two output columns that share one name.
///
/// Two name-based lookups each return the FIRST match:
/// 1. `collect_reverse_alias` (projection.rs) calls `column_with_name()`,
///    so a later duplicate output silently overwrites the earlier entry
///    in the reverse-alias HashMap.
/// 2. `FilterRemapper::try_remap` (filter_pushdown.rs) calls `index_of()`,
///    silently rewriting the filter's column indices.
///
/// The physical plan is assembled by hand (the logical optimizer is
/// bypassed) so the exact shape that triggers the bug is preserved:
///
/// ```text
/// FilterExec: id@0 IS NULL ← checks output col 0 (right side's id)
/// ProjectionExec: [right_id@2 as id, ← output col 0 (from RIGHT)
///                  name@1,
///                  left_id@0 as id] ← output col 2 (from LEFT)
/// HashJoinExec: Left
///   left: [left_id, name] (columns 0-1)
///   right: [right_id] (column 2)
/// ```
///
/// With the bug, Column("id", 0) resolves to left_id@0 rather than
/// right_id@2, so `id@0 IS NULL` becomes `left_id@0 IS NULL` — the
/// wrong join side.
#[tokio::test]
async fn test_filter_pushdown_projection_duplicate_column_names() {
    use datafusion_common::JoinType;
    use datafusion_physical_expr::expressions::is_null;
    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};

    // Build side: "orders" — five rows, left_id is never NULL.
    let orders_schema = Arc::new(Schema::new(vec![
        Field::new("left_id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    let orders_batch = record_batch!(
        ("left_id", Int32, [1, 2, 3, 4, 5]),
        ("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"])
    )
    .unwrap();
    let orders_scan = TestScanBuilder::new(Arc::clone(&orders_schema))
        .with_batches(vec![orders_batch])
        .build();

    // Probe side: "returns" — matches orders 1 and 3 only.
    let returns_schema = Arc::new(Schema::new(vec![Field::new(
        "right_id",
        DataType::Int32,
        false,
    )]));
    let returns_batch = record_batch!(("right_id", Int32, [1, 3])).unwrap();
    let returns_scan = TestScanBuilder::new(Arc::clone(&returns_schema))
        .with_batches(vec![returns_batch])
        .build();

    // LEFT JOIN on left_id = right_id.
    // Join output schema: [left_id(0), name(1), right_id(2)]
    let join_on = vec![(
        col("left_id", &orders_schema).unwrap(),
        col("right_id", &returns_schema).unwrap(),
    )];
    let join = Arc::new(
        HashJoinExec::try_new(
            orders_scan,
            returns_scan,
            join_on,
            None,
            &JoinType::Left,
            None,
            PartitionMode::CollectLeft,
            datafusion_common::NullEquality::NullEqualsNothing,
            false,
        )
        .unwrap(),
    );
    let join_schema = join.schema();

    // Projection producing two outputs both named "id":
    //   col 0: right_id@2 AS id (RIGHT side — NULL for unmatched rows)
    //   col 1: name@1
    //   col 2: left_id@0 AS id (LEFT side — never NULL)
    let exprs = vec![
        (col("right_id", &join_schema).unwrap(), "id".to_string()),
        (col("name", &join_schema).unwrap(), "name".to_string()),
        (col("left_id", &join_schema).unwrap(), "id".to_string()),
    ];
    let projection = Arc::new(ProjectionExec::try_new(exprs, join).unwrap());

    // `id@0 IS NULL` must target output col 0 (the RIGHT side's id) —
    // the classic anti-join shape: keep left rows without a right match.
    let predicate = is_null(Arc::new(Column::new("id", 0))).unwrap();
    let plan = Arc::new(FilterExec::try_new(predicate, projection).unwrap())
        as Arc<dyn ExecutionPlan>;

    // Show the plan BEFORE optimization.
    println!("=== Plan BEFORE FilterPushdown ===");
    println!("{}", format_plan_for_test(&plan));

    // Run the physical FilterPushdown rule.
    let config = ConfigOptions::default();
    let optimized = FilterPushdown::new()
        .optimize(Arc::clone(&plan), &config)
        .unwrap();

    println!("\n=== Plan AFTER FilterPushdown ===");
    println!("{}", format_plan_for_test(&optimized));

    // Execute only the optimized plan: both plans share the same
    // Arc-wrapped DataSourceExec nodes, whose batches can be consumed
    // just once.
    let session_ctx = SessionContext::new_with_config(SessionConfig::new());
    session_ctx.register_object_store(
        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
        Arc::new(InMemory::new()),
    );
    let task_ctx = session_ctx.state().task_ctx();

    let optimized_batches = collect(optimized, Arc::clone(&task_ctx)).await.unwrap();
    let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum();
    println!("\n=== Optimized results: {optimized_count} rows ===");
    println!("{}", pretty_format_batches(&optimized_batches).unwrap());

    // In a Left join the right side's id is NULL exactly for unmatched
    // rows, so orders 2, 4 and 5 survive the filter → 3 rows.
    //
    // Before the fix, both first-match lookups re-targeted the filter to
    // the wrong source column and the plan returned 0 rows.
    assert_eq!(optimized_count, 3, "optimized plan should return 3 rows");
}
42 changes: 22 additions & 20 deletions datafusion/physical-plan/src/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::column_rewriter::PhysicalColumnRewriter;
use crate::execution_plan::CardinalityEffect;
use crate::filter_pushdown::{
ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase,
FilterPushdownPropagation, FilterRemapper, PushedDownPredicate,
FilterPushdownPropagation, PushedDownPredicate,
};
use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef};
use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties};
Expand All @@ -45,7 +45,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
};
use datafusion_common::{DataFusionError, JoinSide, Result, internal_err};
use datafusion_common::{JoinSide, Result, internal_err};
use datafusion_execution::TaskContext;
use datafusion_expr::ExpressionPlacement;
use datafusion_physical_expr::equivalence::ProjectionMapping;
Expand Down Expand Up @@ -205,18 +205,14 @@ impl ProjectionExec {
&self,
) -> Result<datafusion_common::HashMap<Column, Arc<dyn PhysicalExpr>>> {
let mut alias_map = datafusion_common::HashMap::new();
for projection in self.projection_expr().iter() {
let (aliased_index, _output_field) = self
.projector
.output_schema()
.column_with_name(&projection.alias)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Expr {} with alias {} not found in output schema",
projection.expr, projection.alias
))
})?;
let aliased_col = Column::new(&projection.alias, aliased_index);
// Use the enumerate index directly rather than `column_with_name`,
// because the output schema columns are ordered identically to the
// projection expressions. `column_with_name` returns the *first*
// column with a given name, which silently produces duplicate HashMap
// keys (and overwrites earlier entries) when the projection contains
// same-named columns from different join sides.
for (idx, projection) in self.projection_expr().iter().enumerate() {
let aliased_col = Column::new(&projection.alias, idx);
alias_map.insert(aliased_col, Arc::clone(&projection.expr));
}
Ok(alias_map)
Expand Down Expand Up @@ -399,16 +395,22 @@ impl ExecutionPlan for ProjectionExec {
) -> Result<FilterDescription> {
// expand alias column to original expr in parent filters
let invert_alias_map = self.collect_reverse_alias()?;
let output_schema = self.schema();
let remapper = FilterRemapper::new(output_schema);
let mut child_parent_filters = Vec::with_capacity(parent_filters.len());

for filter in parent_filters {
// Check that column exists in child, then reassign column indices to match child schema
if let Some(reassigned) = remapper.try_remap(&filter)? {
// rewrite filter expression using invert alias map
// Validate that every column referenced by the filter exists in
// the reverse-alias map (keyed by exact (name, index) pair).
// We must NOT use FilterRemapper::try_remap here because its
// index_of lookup returns the *first* column with a given name,
// which silently re-targets the filter to the wrong column when
// the projection output contains duplicate column names (e.g.
// after a join where both sides have an `id` column).
let columns = collect_columns(&filter);
let all_in_alias_map =
columns.iter().all(|col| invert_alias_map.contains_key(col));
if all_in_alias_map {
let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map);
let rewritten = reassigned.rewrite(&mut rewriter)?.data;
let rewritten = filter.rewrite(&mut rewriter)?.data;
child_parent_filters.push(PushedDownPredicate::supported(rewritten));
} else {
child_parent_filters.push(PushedDownPredicate::unsupported(filter));
Expand Down
Loading