76 changes: 25 additions & 51 deletions datafusion/physical-optimizer/src/enforce_distribution.rs
@@ -286,39 +286,26 @@ pub fn adjust_input_keys_ordering(
 ) -> Result<Transformed<PlanWithKeyRequirements>> {
     let plan = Arc::clone(&requirements.plan);

-    if let Some(HashJoinExec {
-        left,
-        right,
-        on,
-        filter,
-        join_type,
-        projection,
-        mode,
-        null_equality,
-        null_aware,
-        ..
-    }) = plan.as_any().downcast_ref::<HashJoinExec>()
+    if let Some(
+        exec @ HashJoinExec {
+            left,
+            on,
+            join_type,
+            mode,
+            ..
+        },
+    ) = plan.as_any().downcast_ref::<HashJoinExec>()
     {
         match mode {
             PartitionMode::Partitioned => {
                 let join_constructor = |new_conditions: (
                     Vec<(PhysicalExprRef, PhysicalExprRef)>,
                     Vec<SortOptions>,
                 )| {
-                    HashJoinExecBuilder::new(
-                        Arc::clone(left),
-                        Arc::clone(right),
-                        new_conditions.0,
-                        *join_type,
-                    )
-                    .with_filter(filter.clone())
-                    // TODO: although projection is not used in the join here, because projection pushdown is after enforce_distribution. Maybe we need to handle it later. Same as filter.
-                    .with_projection_ref(projection.clone())
-                    .with_partition_mode(PartitionMode::Partitioned)
-                    .with_null_equality(*null_equality)
-                    .with_null_aware(*null_aware)
-                    .build()
-                    .map(|e| Arc::new(e) as _)
+                    HashJoinExecBuilder::from(exec)
+                        .with_partition_mode(PartitionMode::Partitioned)
+                        .with_on(new_conditions.0)
+                        .build_exec()
                 };
                 return reorder_partitioned_join_keys(
                     requirements,
@@ -612,18 +599,15 @@ pub fn reorder_join_keys_to_inputs(
     plan: Arc<dyn ExecutionPlan>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let plan_any = plan.as_any();
-    if let Some(HashJoinExec {
-        left,
-        right,
-        on,
-        filter,
-        join_type,
-        projection,
-        mode,
-        null_equality,
-        null_aware,
-        ..
-    }) = plan_any.downcast_ref::<HashJoinExec>()
+    if let Some(
+        exec @ HashJoinExec {
+            left,
+            right,
+            on,
+            mode,
+            ..
+        },
+    ) = plan_any.downcast_ref::<HashJoinExec>()
     {
         if matches!(mode, PartitionMode::Partitioned) {
             let (join_keys, positions) = reorder_current_join_keys(
@@ -639,20 +623,10 @@
                     right_keys,
                 } = join_keys;
                 let new_join_on = new_join_conditions(&left_keys, &right_keys);
-                return Ok(Arc::new(
-                    HashJoinExecBuilder::new(
-                        Arc::clone(left),
-                        Arc::clone(right),
-                        new_join_on,
-                        *join_type,
-                    )
-                    .with_filter(filter.clone())
-                    .with_projection_ref(projection.clone())
+                return HashJoinExecBuilder::from(exec)
                     .with_partition_mode(PartitionMode::Partitioned)
-                    .with_null_equality(*null_equality)
-                    .with_null_aware(*null_aware)
-                    .build()?,
-                ));
+                    .with_on(new_join_on)
+                    .build_exec();
             }
         }
     } else if let Some(SortMergeJoinExec {
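The key moves in this file are the `exec @ HashJoinExec { .. }` binding, which keeps a reference to the whole join node while still destructuring the few fields the function reads, and `HashJoinExecBuilder::from(exec)`, which seeds the builder from the existing node so only the settings that actually change (the join keys and the partition mode) need to be overridden. Below is a minimal, self-contained sketch of that shape; the `Join`, `JoinBuilder`, and `rewrite` names and their fields are hypothetical stand-ins, not DataFusion's API:

```rust
use std::any::Any;

/// Stand-in plan node with a few fields; not DataFusion's HashJoinExec.
#[derive(Clone, Debug, PartialEq)]
struct Join {
    on: Vec<(String, String)>,
    partitioned: bool,
    filter: Option<String>,
}

/// Builder seeded from an existing node, so unchanged fields carry over.
struct JoinBuilder {
    inner: Join,
}

impl From<&Join> for JoinBuilder {
    fn from(existing: &Join) -> Self {
        Self { inner: existing.clone() }
    }
}

impl JoinBuilder {
    fn with_on(mut self, on: Vec<(String, String)>) -> Self {
        self.inner.on = on;
        self
    }
    fn with_partitioned(mut self, partitioned: bool) -> Self {
        self.inner.partitioned = partitioned;
        self
    }
    fn build(self) -> Join {
        self.inner
    }
}

/// Rewrite a node's join keys, mirroring the shape of the patch: bind the
/// whole node with `node @ ...` while still destructuring `on`, then rebuild
/// through the builder, overriding only what changed.
fn rewrite(plan: &dyn Any, new_on: Vec<(String, String)>) -> Option<Join> {
    if let Some(node @ Join { on, .. }) = plan.downcast_ref::<Join>() {
        assert_eq!(on.len(), new_on.len()); // same keys, possibly reordered
        return Some(
            JoinBuilder::from(node)
                .with_partitioned(true)
                .with_on(new_on)
                .build(),
        );
    }
    None
}

fn main() {
    let join = Join {
        on: vec![("a".into(), "b".into())],
        partitioned: false,
        filter: Some("x > 1".into()),
    };
    let rewritten = rewrite(&join, vec![("a2".into(), "b2".into())]).unwrap();
    // The filter survives the rebuild without being re-threaded by hand.
    assert_eq!(rewritten.filter, join.filter);
    assert!(rewritten.partitioned);
}
```

Seeding the builder from the existing node is what lets the patch drop the explicit `with_filter`, `with_projection_ref`, `with_null_equality`, and `with_null_aware` calls: those settings ride along by default.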
13 changes: 12 additions & 1 deletion datafusion/physical-plan/src/execution_plan.rs
@@ -25,6 +25,7 @@ pub use crate::ordering::InputOrderMode;
 use crate::sort_pushdown::SortOrderPushdownResult;
 pub use crate::stream::EmptyRecordBatchStream;

+use arrow_schema::Schema;
 pub use datafusion_common::hash_utils;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 pub use datafusion_common::utils::project_schema;
@@ -38,7 +39,7 @@ pub use datafusion_physical_expr::{

 use std::any::Any;
 use std::fmt::Debug;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};

 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::display::DisplayableExecutionPlan;
@@ -1444,6 +1445,16 @@ pub enum CardinalityEffect {
     GreaterEqual,
 }

+/// Can be used in contexts where properties have not yet been initialized properly.
+pub(crate) static STUB_PROPERTIES: LazyLock<PlanProperties> = LazyLock::new(|| {
+    PlanProperties::new(
+        EquivalenceProperties::new(Arc::new(Schema::empty())),
+        Partitioning::UnknownPartitioning(1),
+        EmissionType::Final,
+        Boundedness::Bounded,
+    )
+});
+
 #[cfg(test)]
 mod tests {
     use std::any::Any;
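The new `STUB_PROPERTIES` static relies on `std::sync::LazyLock` (stable since Rust 1.80), which runs its initializer once on first access and then hands out shared references to the cached value. A small sketch of the same idiom, using a made-up `Properties` type instead of reproducing `PlanProperties`:

```rust
use std::sync::LazyLock;

/// Placeholder for plan metadata; the real type in the patch is `PlanProperties`.
#[derive(Debug)]
struct Properties {
    partitions: usize,
    bounded: bool,
}

/// Built once, on first access, then shared by every caller that needs a
/// placeholder before the real properties are computed.
static STUB: LazyLock<Properties> = LazyLock::new(|| Properties {
    partitions: 1,
    bounded: true,
});

fn main() {
    // Deref gives a `&Properties`; the closure above runs at most once.
    println!("{} partition(s), bounded = {}", STUB.partitions, STUB.bounded);
}
```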