Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ fn test_prefix_match_through_transparent_nodes() {

#[test]
fn test_no_prefix_match_wrong_direction() {
// Test that prefix matching does NOT work if the direction is wrong
// Test that when the requested sort [a DESC] matches a prefix of the source's
// natural ordering [a DESC, b ASC], the Sort is eliminated (Exact pushdown).
let schema = schema();

// Source has [a DESC, b ASC] ordering
Expand All @@ -265,7 +266,7 @@ fn test_no_prefix_match_wrong_direction() {
let source_ordering = LexOrdering::new(vec![a.clone().reverse(), b]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Request [a DESC] - same direction as source, NOT a reverse prefix
// Request [a DESC] - a prefix of the source ordering with the same direction, so the Sort should be eliminated
let same_direction = LexOrdering::new(vec![a.clone().reverse()]).unwrap();
let plan = sort_exec(same_direction, source);

Expand All @@ -278,8 +279,7 @@ fn test_no_prefix_match_wrong_direction() {
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC], file_type=parquet
"
);
}
Expand Down
35 changes: 19 additions & 16 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,19 +742,17 @@ impl FileSource for ParquetSource {
///
/// With both pieces of information, ParquetSource can decide what optimizations to apply.
///
/// # Phase 1 Behavior (Current)
/// Returns `Inexact` when reversing the row group scan order would help satisfy the
/// requested ordering. We still need a Sort operator at a higher level because:
/// - We only reverse row group read order, not rows within row groups
/// - This provides approximate ordering that benefits limit pushdown
///
/// # Phase 2 (Future)
/// Could return `Exact` when we can guarantee perfect ordering through techniques like:
/// - File reordering based on statistics
/// - Detecting already-sorted data
/// This would allow removing the Sort operator entirely.
/// # Behavior
/// - Returns `Exact` when the file's natural ordering (from Parquet metadata) already
/// satisfies the requested ordering. This allows the Sort operator to be eliminated
/// if the files within each group are also non-overlapping (checked by FileScanConfig).
/// - Returns `Inexact` when reversing the row group scan order would help satisfy the
/// requested ordering. A Sort operator is still required at a higher level because
/// only the row group read order is reversed, not the rows within each row group;
/// the resulting approximate ordering nevertheless benefits limit pushdown.
///
/// # Returns
/// - `Exact`: The file's natural ordering satisfies the request (within-file ordering guaranteed)
/// - `Inexact`: Created an optimized source (e.g., reversed scan) that approximates the order
/// - `Unsupported`: Cannot optimize for this ordering
fn try_pushdown_sort(
Expand All @@ -766,6 +764,16 @@ impl FileSource for ParquetSource {
return Ok(SortOrderPushdownResult::Unsupported);
}

// Check if the natural (non-reversed) ordering already satisfies the request.
// Parquet metadata guarantees within-file ordering, so if the ordering matches
// we can return Exact. FileScanConfig will verify that files within each group
// are non-overlapping before declaring the entire scan as Exact.
if eq_properties.ordering_satisfy(order.iter().cloned())? {
return Ok(SortOrderPushdownResult::Exact {
inner: Arc::new(self.clone()) as Arc<dyn FileSource>,
});
}

// Build new equivalence properties with the reversed ordering.
// This allows us to check if the reversed ordering satisfies the request
// by leveraging:
Expand Down Expand Up @@ -810,11 +818,6 @@ impl FileSource for ParquetSource {
Ok(SortOrderPushdownResult::Inexact {
inner: Arc::new(new_source) as Arc<dyn FileSource>,
})

// TODO Phase 2: Add support for other optimizations:
// - File reordering based on min/max statistics
// - Detection of exact ordering (return Exact to remove Sort operator)
// - Partial sort pushdown for prefix matches
}
}

Expand Down
Loading