
fix: share unified memory pools across native execution contexts within a task#3924

Open
andygrove wants to merge 12 commits into apache:main from andygrove:task-shared-unified-pool

Conversation


@andygrove andygrove commented Apr 10, 2026

Which issue does this PR close?

Closes #3921

Rationale for this change

When Comet executes a shuffle, it creates two native execution contexts that run concurrently within the same Spark task (e.g. one for pre-shuffle operators and one for the shuffle writer). Previously, each context created its own memory pool with the full per-task memory limit, effectively allowing 2x the intended memory to be consumed. This is a performance bug — not a correctness issue — but it causes significantly higher memory usage than expected, leading to OOM errors that can only be worked around by over-provisioning memory.

What changes are included in this PR?

  • Make fair_unified and greedy_unified memory pools task-shared, so a single pool instance is reused across all native execution contexts within the same Spark task. This uses the same TASK_SHARED_MEMORY_POOLS mechanism that the on-heap greedy_task_shared and fair_spill_task_shared pools already use.
  • Fix a tracing bug where total_reserved_for_thread() and unregister_and_total() double-counted memory when multiple execution contexts shared the same pool Arc. Deduplicates by Arc data pointer before summing reserved().
  • Update tuning guide to document that both pool types are shared across execution contexts.

No new configuration options are added. The existing fair_unified (default) and greedy_unified pool names are unchanged. No functional change — queries produce the same results, but memory usage is now correctly bounded.
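The task-shared mechanism can be sketched as follows. This is a hypothetical, simplified illustration, not Comet's actual code: apart from the `TASK_SHARED_MEMORY_POOLS` name taken from the PR description, the trait, struct, and function names are invented. The idea is a process-wide map keyed by the Spark task attempt ID, so every native execution context created within the same task receives a clone of the same `Arc`'d pool instead of building its own pool at the full per-task limit.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, OnceLock};

// Stand-in for DataFusion's MemoryPool trait (simplified for the sketch).
trait MemoryPool: Send + Sync {}

// Hypothetical pool type; real code would use a fair or greedy unified pool.
struct GreedyPool {
    _limit: usize,
}
impl MemoryPool for GreedyPool {}

// One pool per Spark task attempt, shared by every native
// execution context created within that task.
static TASK_SHARED_MEMORY_POOLS: OnceLock<Mutex<HashMap<i64, Arc<dyn MemoryPool>>>> =
    OnceLock::new();

fn get_or_create_task_pool(task_attempt_id: i64, limit: usize) -> Arc<dyn MemoryPool> {
    let pools = TASK_SHARED_MEMORY_POOLS.get_or_init(|| Mutex::new(HashMap::new()));
    let mut map = pools.lock().unwrap();
    let entry = map
        .entry(task_attempt_id)
        // First context in the task creates the pool; later contexts reuse it.
        .or_insert_with(|| Arc::new(GreedyPool { _limit: limit }) as Arc<dyn MemoryPool>);
    Arc::clone(entry)
}

fn main() {
    // Two execution contexts in the same task get the same pool instance...
    let a = get_or_create_task_pool(42, 8 << 30); // 8 GiB limit
    let b = get_or_create_task_pool(42, 8 << 30);
    assert!(Arc::ptr_eq(&a, &b));
    // ...while a different task attempt gets its own pool.
    let c = get_or_create_task_pool(43, 8 << 30);
    assert!(!Arc::ptr_eq(&a, &c));
    println!("shared within task: {}", Arc::ptr_eq(&a, &b));
}
```

Because both contexts draw from one pool, their combined reservations stay within the single per-task limit, which is the bound the fix restores.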

How are these changes tested?

With this change, I was able to run TPC-H and TPC-DS at 1 TB scale with just 8 GB of off-heap memory. Previously, I saw OOM errors at 16 GB.

andygrove and others added 4 commits April 10, 2026 10:24
…ation

When Comet executes a shuffle, it creates two separate native plans (the
child plan and the shuffle writer plan) that run concurrently in a
pipelined fashion. Previously, each plan got its own memory pool at the
full per-task limit, effectively allowing 2x the intended memory to be
consumed.

The new `fair_unified_task_shared` pool type shares a single
CometFairMemoryPool across all native plans within the same Spark task.
This ensures the total memory stays within the per-task limit while
dynamically distributing memory among operators based on how many
register as memory consumers (e.g. if the child plan is a simple
scan+filter, the shuffle writer gets 100% of the pool).

This is now the default for off-heap mode.

Closes apache#3921

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
When using fair_unified_task_shared, multiple execution contexts on the
same thread share a single Arc<dyn MemoryPool>. The tracing code was
summing pool.reserved() for each registered context, double-counting
the shared pool and reporting 2x the actual memory reservation.

Deduplicate pools by Arc data pointer before summing so each underlying
pool is only counted once.
Make fair_unified_task_shared opt-in rather than the default to
simplify review. Update docs to reflect the new default.
@andygrove andygrove changed the title feat: add fair_unified_task_shared memory pool to fix 2x memory allocation [experimental] feat: add fair_unified_task_shared memory pool to fix 2x memory allocation Apr 11, 2026
Add context about how Comet creates two concurrent native plans per
Spark task during shuffle and why this matters for pool selection.
@andygrove andygrove marked this pull request as ready for review April 11, 2026 16:06
  "The type of memory pool to be used for Comet native execution when running Spark in " +
- "off-heap mode. Available pool types are `greedy_unified` and `fair_unified`. " +
+ "off-heap mode. Available pool types are `greedy_unified`, `fair_unified`, and " +
+ "`fair_unified_task_shared`. The `fair_unified_task_shared` pool is shared across " +
Contributor


I read the description twice; without an example, it is difficult to understand what exactly fair_unified_task_shared helps to achieve.

Member Author


Updated. Let me know if this is clearer now.


Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.

When Comet executes a shuffle, it creates two separate native plans within the same Spark task: the child plan
Contributor


Maybe it is not a plan, but rather a stage DAG?

A plan is per operator: scan -> shuffle -> write. A stage DAG is different; the plan above is split into:
Stage 1: scan + shuffle write
Stage 2: shuffle read + write

Member Author


It is really DataFusion execution contexts. I updated.

let mut seen = std::collections::HashSet::new();
pools
.values()
.filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize))
Contributor


same

- Clarify terminology: replace "native plans" with "execution contexts"
  to avoid confusion with Spark plan/stage concepts
- Rewrite fair_unified_task_shared config description with concrete
  example of the 2x memory problem during shuffle
- Use filter_map with then() for pool deduplication
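The pool deduplication discussed above can be sketched as a self-contained example. This is a hypothetical version, not the exact Comet code: `FixedPool`, `total_reserved`, and the `reserved()` signature are invented for illustration, and only the Arc-data-pointer trick mirrors the quoted snippet. When two execution contexts register the same shared pool, summing `reserved()` over every registration double-counts; deduplicating by data pointer counts each underlying pool once.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// Minimal stand-in for a memory pool that can report its reservations.
trait MemoryPool {
    fn reserved(&self) -> usize;
}

struct FixedPool(usize);
impl MemoryPool for FixedPool {
    fn reserved(&self) -> usize {
        self.0
    }
}

fn total_reserved(pools: &HashMap<u64, Arc<dyn MemoryPool>>) -> usize {
    let mut seen = HashSet::new();
    pools
        .values()
        // Cast away the vtable so only the data pointer identifies the pool;
        // HashSet::insert returns false for a pointer already seen, so each
        // underlying pool contributes to the sum exactly once.
        .filter(|p| seen.insert(Arc::as_ptr(p) as *const () as usize))
        .map(|p| p.reserved())
        .sum()
}

fn main() {
    let shared: Arc<dyn MemoryPool> = Arc::new(FixedPool(100));
    let mut pools: HashMap<u64, Arc<dyn MemoryPool>> = HashMap::new();
    // Two execution contexts registered the same underlying pool.
    pools.insert(1, Arc::clone(&shared));
    pools.insert(2, Arc::clone(&shared));
    // Without dedup this would report 200; with dedup it reports 100.
    println!("total reserved: {}", total_reserved(&pools));
}
```

The same effect can be written with `filter_map` and `bool::then` as the commit note suggests; the `filter` form is shown here because it matches the snippet quoted in the review thread.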
Add greedy_unified_task_shared pool type for completeness alongside
fair_unified_task_shared. Both _task_shared variants share a single
memory pool across all native execution contexts in the same Spark
task, preventing 2x memory consumption during shuffle.
@andygrove andygrove changed the title feat: add fair_unified_task_shared memory pool to fix 2x memory allocation feat: add task_shared variants of unified memory pools to fix 2x memory allocation Apr 13, 2026
Instead of adding new _task_shared pool variants, fix the existing
fair_unified and greedy_unified pools to share a single pool instance
across all native execution contexts within the same Spark task. This
fixes the bug where concurrent execution contexts (e.g. pre-shuffle
operators and shuffle writer) could each allocate up to the full
per-task memory limit independently.
@andygrove andygrove changed the title feat: add task_shared variants of unified memory pools to fix 2x memory allocation fix: share unified memory pools across native execution contexts within a task Apr 13, 2026
Contributor

@mbutrovich mbutrovich left a comment


Minor nits, thanks @andygrove!



Development

Successfully merging this pull request may close these issues.

Comet allocates 2x available memory in memory pools for executePlan
