diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs index a255c0754582..141a90567a94 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -5146,3 +5146,142 @@ async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() { " ); } + +/// Reproduces a bug where FilterPushdown through ProjectionExec with duplicate +/// column names remaps filter predicates to the wrong source column. +/// +/// The bug is in two functions: +/// 1. `collect_reverse_alias` (projection.rs) uses `column_with_name()` which +/// returns the FIRST match — when two projection outputs share a name, the +/// second overwrites the first in the HashMap. +/// 2. `FilterRemapper::try_remap` (filter_pushdown.rs) uses `index_of()` which +/// also returns the FIRST match, silently rewriting column indices. +/// +/// We construct the physical plan directly (bypassing the logical optimizer) +/// to create the exact structure that triggers the bug: +/// +/// ```text +/// FilterExec: id@0 IS NULL ← checks output col 0 (right side's id) +/// ProjectionExec: [right_id@2 as id, ← output col 0 (from RIGHT) +/// name@1, +/// left_id@0 as id] ← output col 2 (from LEFT) +/// HashJoinExec: Left +/// left: [left_id, name] (columns 0-1) +/// right: [right_id] (column 2) +/// ``` +/// +/// Bug 1 overwrites Column("id", 0) → left_id@0 instead of right_id@2. +/// The filter `id@0 IS NULL` gets remapped to `left_id@0 IS NULL` (wrong side). 
+#[tokio::test] +async fn test_filter_pushdown_projection_duplicate_column_names() { + use datafusion_common::JoinType; + use datafusion_physical_expr::expressions::is_null; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Left table: orders (5 rows, all non-NULL left_id) + let left_batches = vec![ + record_batch!( + ("left_id", Int32, [1, 2, 3, 4, 5]), + ("name", Utf8, ["Alice", "Bob", "Charlie", "Diana", "Eve"]) + ) + .unwrap(), + ]; + let left_schema = Arc::new(Schema::new(vec![ + Field::new("left_id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + let left_scan = TestScanBuilder::new(Arc::clone(&left_schema)) + .with_batches(left_batches) + .build(); + + // Right table: returns (2 rows matching orders 1 and 3) + let right_batches = vec![record_batch!(("right_id", Int32, [1, 3])).unwrap()]; + let right_schema = Arc::new(Schema::new(vec![Field::new( + "right_id", + DataType::Int32, + false, + )])); + let right_scan = TestScanBuilder::new(Arc::clone(&right_schema)) + .with_batches(right_batches) + .build(); + + // HashJoinExec: LEFT JOIN on left_id = right_id + // Join output schema: [left_id(0), name(1), right_id(2)] + let join = Arc::new( + HashJoinExec::try_new( + left_scan, + right_scan, + vec![( + col("left_id", &left_schema).unwrap(), + col("right_id", &right_schema).unwrap(), + )], + None, + &JoinType::Left, + None, + PartitionMode::CollectLeft, + datafusion_common::NullEquality::NullEqualsNothing, + false, + ) + .unwrap(), + ); + let join_schema = join.schema(); + + // ProjectionExec: creates duplicate "id" columns + // output col 0: right_id@2 AS id (from RIGHT side, NULL for unmatched) + // output col 1: name@1 + // output col 2: left_id@0 AS id (from LEFT side, never NULL) + let projection = Arc::new( + ProjectionExec::try_new( + vec![ + (col("right_id", &join_schema).unwrap(), "id".to_string()), + (col("name", &join_schema).unwrap(), "name".to_string()), + (col("left_id", 
&join_schema).unwrap(), "id".to_string()), + ], + join, + ) + .unwrap(), + ); + + // FilterExec: id@0 IS NULL + // This should check the RIGHT side's id (output col 0 = right_id). + // The anti-join pattern: find left rows with no match on the right. + let filter_expr = is_null(Arc::new(Column::new("id", 0))).unwrap(); + let plan = Arc::new(FilterExec::try_new(filter_expr, projection).unwrap()) + as Arc<dyn ExecutionPlan>; + + // Show the plan BEFORE optimization + println!("=== Plan BEFORE FilterPushdown ==="); + println!("{}", format_plan_for_test(&plan)); + + // Apply the physical FilterPushdown optimizer + let config = ConfigOptions::default(); + let optimized = FilterPushdown::new() + .optimize(Arc::clone(&plan), &config) + .unwrap(); + + println!("\n=== Plan AFTER FilterPushdown ==="); + println!("{}", format_plan_for_test(&optimized)); + + // Execute the optimized plan. The original and optimized plans share + // DataSourceExec nodes via Arc, so we can only execute one of them + // (the data source yields its batches once). + let session_ctx = SessionContext::new_with_config(SessionConfig::new()); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let task_ctx = session_ctx.state().task_ctx(); + + let optimized_batches = collect(optimized, Arc::clone(&task_ctx)).await.unwrap(); + let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum(); + println!("\n=== Optimized results: {optimized_count} rows ==="); + println!("{}", pretty_format_batches(&optimized_batches).unwrap()); + + // The filter id@0 IS NULL checks the right side's id (NULL for unmatched + // rows in a Left join). Orders 2, 4, 5 have no match → 3 rows expected. 
+ // + // Before the fix, collect_reverse_alias used column_with_name() which + // overwrote duplicate entries, and FilterRemapper::try_remap used + // index_of() which returned the first match — both causing the filter + // to be remapped to the wrong source column (0 rows returned). + assert_eq!(optimized_count, 3, "optimized plan should return 3 rows"); +} diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 9cb4382e2102..6c91e56ae9dd 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -30,7 +30,7 @@ use crate::column_rewriter::PhysicalColumnRewriter; use crate::execution_plan::CardinalityEffect; use crate::filter_pushdown::{ ChildFilterDescription, ChildPushdownResult, FilterDescription, FilterPushdownPhase, - FilterPushdownPropagation, FilterRemapper, PushedDownPredicate, + FilterPushdownPropagation, PushedDownPredicate, }; use crate::joins::utils::{ColumnIndex, JoinFilter, JoinOn, JoinOnRef}; use crate::{DisplayFormatType, ExecutionPlan, PhysicalExpr, check_if_same_properties}; @@ -45,7 +45,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; -use datafusion_common::{DataFusionError, JoinSide, Result, internal_err}; +use datafusion_common::{JoinSide, Result, internal_err}; use datafusion_execution::TaskContext; use datafusion_expr::ExpressionPlacement; use datafusion_physical_expr::equivalence::ProjectionMapping; @@ -205,18 +205,14 @@ impl ProjectionExec { &self, ) -> Result<HashMap<Column, Arc<dyn PhysicalExpr>>> { let mut alias_map = datafusion_common::HashMap::new(); - for projection in self.projection_expr().iter() { - let (aliased_index, _output_field) = self - .projector - .output_schema() - .column_with_name(&projection.alias) - .ok_or_else(|| { - DataFusionError::Internal(format!( - "Expr {} with alias {} not found in output schema", - projection.expr, projection.alias - )) - })?; - 
let aliased_col = Column::new(&projection.alias, aliased_index); + // Use the enumerate index directly rather than `column_with_name`, + // because the output schema columns are ordered identically to the + // projection expressions. `column_with_name` returns the *first* + // column with a given name, which silently produces duplicate HashMap + // keys (and overwrites earlier entries) when the projection contains + // same-named columns from different join sides. + for (idx, projection) in self.projection_expr().iter().enumerate() { + let aliased_col = Column::new(&projection.alias, idx); alias_map.insert(aliased_col, Arc::clone(&projection.expr)); } Ok(alias_map) @@ -399,16 +395,22 @@ impl ExecutionPlan for ProjectionExec { ) -> Result<FilterDescription> { // expand alias column to original expr in parent filters let invert_alias_map = self.collect_reverse_alias()?; - let output_schema = self.schema(); - let remapper = FilterRemapper::new(output_schema); let mut child_parent_filters = Vec::with_capacity(parent_filters.len()); for filter in parent_filters { - // Check that column exists in child, then reassign column indices to match child schema - if let Some(reassigned) = remapper.try_remap(&filter)? { - // rewrite filter expression using invert alias map + // Validate that every column referenced by the filter exists in + // the reverse-alias map (keyed by exact (name, index) pair). + // We must NOT use FilterRemapper::try_remap here because its + // index_of lookup returns the *first* column with a given name, + // which silently re-targets the filter to the wrong column when + // the projection output contains duplicate column names (e.g. + // after a join where both sides have an `id` column). 
+ let columns = collect_columns(&filter); + let all_in_alias_map = + columns.iter().all(|col| invert_alias_map.contains_key(col)); + if all_in_alias_map { let mut rewriter = PhysicalColumnRewriter::new(&invert_alias_map); - let rewritten = reassigned.rewrite(&mut rewriter)?.data; + let rewritten = filter.rewrite(&mut rewriter)?.data; child_parent_filters.push(PushedDownPredicate::supported(rewritten)); } else { child_parent_filters.push(PushedDownPredicate::unsupported(filter));