fix: share unified memory pools across native execution contexts within a task #3924
Open
andygrove wants to merge 12 commits into apache:main from
Conversation
…ation When Comet executes a shuffle, it creates two separate native plans (the child plan and the shuffle writer plan) that run concurrently in a pipelined fashion. Previously, each plan got its own memory pool at the full per-task limit, effectively allowing 2x the intended memory to be consumed. The new `fair_unified_task_shared` pool type shares a single CometFairMemoryPool across all native plans within the same Spark task. This ensures the total memory stays within the per-task limit while dynamically distributing memory among operators based on how many register as memory consumers (e.g. if the child plan is a simple scan+filter, the shuffle writer gets 100% of the pool). This is now the default for off-heap mode. Closes apache#3921 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
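The fair distribution described in this commit message can be illustrated with a minimal sketch. The function name `per_consumer_limit` is hypothetical and the logic is a simplification of how a fair pool caps each registered consumer; it is not Comet's actual implementation:

```rust
// Hypothetical illustration of a fair pool's per-consumer cap: a fixed
// budget is divided evenly among registered memory consumers, so if only
// the shuffle writer registers, it may use the whole pool.
fn per_consumer_limit(pool_size: u64, num_consumers: u64) -> u64 {
    if num_consumers == 0 {
        pool_size // nothing registered yet; no cap to enforce
    } else {
        pool_size / num_consumers
    }
}

fn main() {
    // 8 GiB task pool, only the shuffle writer registered as a consumer:
    assert_eq!(per_consumer_limit(8 << 30, 1), 8 << 30); // gets 100%
    // Two consumers (a child-plan operator plus the shuffle writer):
    assert_eq!(per_consumer_limit(8 << 30, 2), 4 << 30); // half each
    println!("ok");
}
```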
When using fair_unified_task_shared, multiple execution contexts on the same thread share a single Arc<dyn MemoryPool>. The tracing code was summing pool.reserved() for each registered context, double-counting the shared pool and reporting 2x the actual memory reservation. Deduplicate pools by Arc data pointer before summing so each underlying pool is only counted once.
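The deduplication described here can be sketched in plain Rust. The `MemoryPool` trait below is a minimal stand-in for DataFusion's trait (only `reserved()` is modeled), and `total_reserved` is an illustrative helper, not Comet's actual function:

```rust
use std::collections::HashSet;
use std::sync::Arc;

// Minimal stand-in for DataFusion's MemoryPool trait; only the
// `reserved` accessor matters for this sketch.
trait MemoryPool {
    fn reserved(&self) -> usize;
}

struct FixedPool(usize);
impl MemoryPool for FixedPool {
    fn reserved(&self) -> usize {
        self.0
    }
}

// Sum `reserved()` across pools, counting each underlying pool once even
// when several execution contexts hold clones of the same `Arc`.
fn total_reserved(pools: &[Arc<dyn MemoryPool>]) -> usize {
    let mut seen = HashSet::new();
    pools
        .iter()
        // `Arc::as_ptr` identifies the underlying allocation; the cast to
        // `*const ()` discards the trait-object vtable metadata.
        .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize))
        .map(|p| p.reserved())
        .sum()
}

fn main() {
    let shared: Arc<dyn MemoryPool> = Arc::new(FixedPool(100));
    // Two contexts share one pool; a third context has its own.
    let pools = vec![
        shared.clone(),
        shared.clone(),
        Arc::new(FixedPool(50)) as Arc<dyn MemoryPool>,
    ];
    // Naive summing would report 250; deduplicated summing reports 150.
    println!("{}", total_reserved(&pools));
}
```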
Make fair_unified_task_shared opt-in rather than the default to simplify review. Update docs to reflect the new default.
Add context about how Comet creates two concurrent native plans per Spark task during shuffle and why this matters for pool selection.
comphead
reviewed
Apr 13, 2026
| "The type of memory pool to be used for Comet native execution when running Spark in " + | ||
| "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " + | ||
| "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " + | ||
| "`fair_unified_task_shared`. The `fair_unified_task_shared` pool is shared across " + |
Contributor
read the description twice, it is difficult to understand without example what exactly fair_unified_task_shared helps to achieve
Member
Author
Updated. Let me know if this is clearer now.
comphead
reviewed
Apr 13, 2026
Comet provides multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.
When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan
Contributor
maybe it is not a plan, and rather a stage DAG?
plan is per operator: scan-> shuffle -> write
stage DAG is different, the plan above split into
Stage1: scan + shuffle write
Stage2: shuffle read + write
Member
Author
It is really DataFusion execution contexts. I updated.
comphead
reviewed
Apr 13, 2026
comphead
reviewed
Apr 13, 2026
native/core/src/execution/jni_api.rs
Outdated
let mut seen = std::collections::HashSet::new();
pools
    .values()
    .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize))
- Clarify terminology: replace "native plans" with "execution contexts" to avoid confusion with Spark plan/stage concepts - Rewrite fair_unified_task_shared config description with concrete example of the 2x memory problem during shuffle - Use filter_map with then() for pool deduplication
Add greedy_unified_task_shared pool type for completeness alongside fair_unified_task_shared. Both _task_shared variants share a single memory pool across all native execution contexts in the same Spark task, preventing 2x memory consumption during shuffle.
Instead of adding new _task_shared pool variants, fix the existing fair_unified and greedy_unified pools to share a single pool instance across all native execution contexts within the same Spark task. This fixes the bug where concurrent execution contexts (e.g. pre-shuffle operators and shuffle writer) could each allocate up to the full per-task memory limit independently.
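A minimal sketch of the task-shared mechanism this commit describes, assuming pools are kept in a process-wide map keyed by the Spark task attempt id. The names (`Pool`, `pool_for_task`, `registry`) are illustrative, not Comet's actual API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Illustrative pool type; the real pool would be a `dyn MemoryPool`.
struct Pool {
    limit: usize,
}

// Process-wide registry of per-task pools, lazily initialized.
fn registry() -> &'static Mutex<HashMap<i64, Arc<Pool>>> {
    static POOLS: OnceLock<Mutex<HashMap<i64, Arc<Pool>>>> = OnceLock::new();
    POOLS.get_or_init(|| Mutex::new(HashMap::new()))
}

// Return the task's shared pool, creating it on first use. Every
// execution context created for the same task attempt id receives a
// clone of the same `Arc`, so the per-task limit is enforced once.
fn pool_for_task(task_attempt_id: i64, limit: usize) -> Arc<Pool> {
    let mut map = registry().lock().unwrap();
    Arc::clone(
        map.entry(task_attempt_id)
            .or_insert_with(|| Arc::new(Pool { limit })),
    )
}

fn main() {
    // Two execution contexts in task 42 get the same pool instance:
    let a = pool_for_task(42, 8 << 30);
    let b = pool_for_task(42, 8 << 30);
    assert!(Arc::ptr_eq(&a, &b));
    println!("shared limit: {}", a.limit);
}
```

The real mechanism would also need to remove the entry when the task finishes, so pools do not leak across task attempts.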
mbutrovich
reviewed
Apr 13, 2026
mbutrovich
reviewed
Apr 13, 2026
mbutrovich
approved these changes
Apr 13, 2026
Contributor
mbutrovich
left a comment
Minor nits, thanks @andygrove!
Which issue does this PR close?
Closes #3921
Rationale for this change
When Comet executes a shuffle, it creates two native execution contexts that run concurrently within the same Spark task (e.g. one for the pre-shuffle operators and one for the shuffle writer). Previously, each context created its own memory pool with the full per-task memory limit, effectively allowing 2x the intended memory to be consumed. This is a performance bug rather than a correctness issue, but it causes significantly higher memory usage than expected, leading to OOM errors that can only be worked around by over-provisioning memory.
What changes are included in this PR?
This PR makes the `fair_unified` and `greedy_unified` memory pools task-shared, so a single pool instance is reused across all native execution contexts within the same Spark task. This uses the same `TASK_SHARED_MEMORY_POOLS` mechanism that the on-heap `greedy_task_shared` and `fair_spill_task_shared` pools already use.

It also fixes a tracing bug where `total_reserved_for_thread()` and `unregister_and_total()` double-counted memory when multiple execution contexts shared the same pool `Arc`. These functions now deduplicate by `Arc` data pointer before summing `reserved()`.

No new configuration options are added. The existing `fair_unified` (default) and `greedy_unified` pool names are unchanged. There is no functional change: queries produce the same results, but memory usage is now correctly bounded.

How are these changes tested?
With this change I was able to run TPC-H and TPC-DS @ 1TB with just 8GB off-heap memory. Previously I was seeing OOM at 16 GB.