
Conversation

@mbutrovich (Contributor) commented on Feb 8, 2026

Which issue does this PR close?

Closes #3442 (Add DPP support to native_datafusion/CometNativeScan)

Rationale for this change

Dynamic Partition Pruning (DPP) is essential for optimizing star schema queries. Previously, native_datafusion scans would fall back to Spark when DPP was present. This PR enables full DPP support by deferring partition serialization until execution time, after DPP subqueries are resolved.

Additionally, this approach reduces serialization overhead when scanning large Parquet datasets, as partition metadata is no longer replicated across all Spark partitions.

What changes are included in this PR?

Architecture:

  • CometNativeScanExec now uses lazy serializedPartitionData to defer serialization to execution time
  • CometNativeScan.convert() creates a placeholder with only a scan_id at planning time
  • serializePartitions() resolves DPP subqueries and serializes filtered partitions at execution
  • Uses originalPlan.partitionFilters instead of partitionFilters because AQE's PlanDynamicPruningFilters converts subqueries to literals via makeCopy
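
The deferred-serialization idea above can be sketched in plain Scala (no Spark dependencies; `NativeScanNode` and `serializationCount` are hypothetical stand-ins for `CometNativeScanExec` internals): the serialization work lives in a `lazy val`, so it runs on first access at execution time, after DPP subqueries are resolved, rather than at plan construction.

```scala
object LazyPartitionDataDemo {
  var serializationCount = 0

  // Hypothetical stand-in for CometNativeScanExec and its lazy
  // serializedPartitionData field.
  class NativeScanNode(partitions: Seq[String]) {
    lazy val serializedPartitionData: Array[Byte] = {
      serializationCount += 1 // body evaluated once, on first access
      partitions.mkString(",").getBytes("UTF-8")
    }
  }

  def main(args: Array[String]): Unit = {
    val node = new NativeScanNode(Seq("p=1", "p=2"))
    assert(serializationCount == 0) // planning time: nothing serialized yet
    node.serializedPartitionData
    node.serializedPartitionData
    assert(serializationCount == 1) // execution time: serialized exactly once
    println("serialized once, lazily")
  }
}
```

A side effect of this pattern, noted in the rationale, is that the planning-time node stays small (just a `scan_id` placeholder), so partition metadata is not replicated into every serialized copy of the plan.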

SubqueryBroadcast Transformation:

  • Transform SubqueryBroadcastExec children to use CometBroadcastExchangeExec wrapped in CometColumnarToRowExec for row-based output
  • This enables broadcast exchange reuse via canonicalization: the same CometBroadcastExchangeExec used in the join can be reused by the DPP subquery filter
  • Uses transformUp instead of transformUpWithSubqueries to preserve ReusedSubqueryExec object identity for scalar subqueries
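
Why `transformUp` preserves `ReusedSubqueryExec` identity can be illustrated with a toy tree (plain Scala, not Spark's `TreeNode` API; all names here are illustrative): a bottom-up transform that returns the same object when neither the rule nor any child rewrite fires leaves untouched subtrees referentially intact, whereas a variant that also descends into and rebuilds subquery expressions would allocate new nodes and break `eq`-based reuse checks.

```scala
object TransformUpIdentityDemo {
  sealed trait Node {
    // Bottom-up transform that returns the SAME object when nothing changes.
    def transformUp(rule: PartialFunction[Node, Node]): Node
  }
  case class Broadcast(child: Node) extends Node {
    def transformUp(rule: PartialFunction[Node, Node]): Node = {
      val newChild = child.transformUp(rule)
      // Only rebuild this node if the child actually changed.
      val self = if (newChild eq child) this else Broadcast(newChild)
      rule.applyOrElse(self, identity[Node])
    }
  }
  case class ReusedSubquery(id: Int) extends Node {
    def transformUp(rule: PartialFunction[Node, Node]): Node =
      rule.applyOrElse(this, identity[Node])
  }

  def main(args: Array[String]): Unit = {
    val plan = Broadcast(ReusedSubquery(1))
    // The rule matches nothing in this plan, so every node keeps its identity.
    val out = plan.transformUp {
      case Broadcast(ReusedSubquery(99)) => ReusedSubquery(0)
    }
    assert(out eq plan) // object identity preserved end to end
    println("identity preserved")
  }
}
```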

Configuration:

  • New flag spark.comet.scan.dpp.enabled (default: true) replaces spark.comet.dppFallback.enabled
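
As a hedged usage illustration (the flag name comes from this PR; an active SparkSession named `spark` is assumed):

```scala
// The flag defaults to true, so setting it is only needed to opt out of
// native DPP support (e.g. to force the previous fallback behavior):
spark.conf.set("spark.comet.scan.dpp.enabled", "false")
```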

Shims:

  • Added getDppFilteredFilePartitions() and getDppFilteredBucketedFilePartitions() to ShimCometScanExec for Spark 3.4/3.5/4.0
  • Added resolveSubqueryAdaptiveBroadcast() to ShimSubqueryBroadcast

Spark Diff Updates:

  • Updated DynamicPartitionPruningSuite.checkPartitionPruningPredicate to recognize CometColumnarToRowExec → CometBroadcastExchangeExec as a valid SubqueryBroadcast child structure

Other:

  • Removed custom equals/hashCode from CometNativeScanExec to prevent incorrect AQE exchange reuse between scans with different projections
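
The risk the removed `equals`/`hashCode` posed can be sketched in plain Scala (the class and field names are hypothetical, not Comet's actual implementation): if a custom `equals` compares only the underlying relation and ignores the projected output, two scans with different projections compare equal, and AQE's exchange-reuse machinery could substitute one for the other.

```scala
object EqualsReuseDemo {
  // Hypothetical scan node whose custom equals ignores the projection.
  class ScanWithCustomEquals(val table: String, val output: Seq[String]) {
    override def equals(other: Any): Boolean = other match {
      case s: ScanWithCustomEquals => s.table == table // projection ignored
      case _                       => false
    }
    override def hashCode(): Int = table.hashCode
  }

  def main(args: Array[String]): Unit = {
    val a = new ScanWithCustomEquals("fact", Seq("x"))
    val b = new ScanWithCustomEquals("fact", Seq("x", "y"))
    // Different projections, yet "equal": reuse rules comparing plans this
    // way could pick the wrong exchange. Falling back to default (reference)
    // equality avoids the conflation.
    assert(a == b)
    println("custom equals conflates scans with different projections")
  }
}
```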

How are these changes tested?

  • New CometExecSuite tests for DPP with native_datafusion scans, multiple partition columns, non-broadcast subqueries (SubqueryExec), and subquery reuse (ReusedSubqueryExec)
  • New test for DPP broadcast exchange reuse validation
  • New test for SubqueryBroadcast transformation preserving ReusedSubqueryExec references
  • New CometIcebergNativeSuite test for Iceberg DPP with non-broadcast joins
  • Updated Spark 3.5.8 diff so DPP validation handles both Comet's SubqueryBroadcast structure and CometNativeScan

@mbutrovich mbutrovich added this to the 0.14.0 milestone Feb 8, 2026
@mbutrovich mbutrovich marked this pull request as draft February 11, 2026 11:55
…Row -> CometNativeExec into SubqueryBroadcast -> CometColumnarToRow -> CometBroadcastExchange -> CometNativeExec

- This allows CometBroadcastExchange to be reused by both the SubqueryBroadcast path and the join path
- CometColumnarToRowExec is still needed because SubqueryBroadcastExec expects HashedRelation from doExecuteBroadcast()
@andygrove (Member) left a comment:

Awesome work @mbutrovich! LGTM pending CI. Let's get this merged and keep iterating/testing.

