From 5f38e2e801aedfef85172707fce1c7eff4f1000d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 11 Feb 2026 08:26:45 -0700
Subject: [PATCH 1/2] chore: Add memory reservation debug logging

Add a new `spark.comet.debug.memory` config that wraps the native memory
pool in a LoggingPool decorator, logging all grow/shrink/try_grow calls
with task ID and consumer name. This helps diagnose memory reservation
issues in production environments.

Co-Authored-By: Claude Opus 4.6
---
 .../scala/org/apache/comet/CometConf.scala    |  10 ++
 docs/source/contributor-guide/debugging.md    |  28 ++++-
 native/core/src/execution/jni_api.rs          |  12 ++-
 .../execution/memory_pools/logging_pool.rs    | 102 ++++++++++++++++++
 native/core/src/execution/memory_pools/mod.rs |   1 +
 native/core/src/execution/spark_config.rs     |   1 +
 6 files changed, 151 insertions(+), 3 deletions(-)
 create mode 100644 native/core/src/execution/memory_pools/logging_pool.rs

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 522ccbc94c..5ab81ba8dd 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
   private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
     "Guide (https://datafusion.apache.org/comet/contributor-guide/tracing.html)"
 
+  private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
+    "Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"
+
   /** List of all configs that is used for generating documentation */
   val allConfs = new ListBuffer[ConfigEntry[_]]
 
@@ -549,6 +552,13 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)
 
+  val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
+    conf(s"$COMET_PREFIX.debug.memory")
+      .category(CATEGORY_TESTING)
+      .doc(s"When enabled, log all native memory pool interactions. $DEBUGGING_GUIDE.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
   val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"
 
diff --git a/docs/source/contributor-guide/debugging.md b/docs/source/contributor-guide/debugging.md
index db5bdfc593..1d2447d2e8 100644
--- a/docs/source/contributor-guide/debugging.md
+++ b/docs/source/contributor-guide/debugging.md
@@ -127,7 +127,7 @@ To build Comet with this feature enabled:
 make release COMET_FEATURES=backtrace
 ```
 
-Start Comet with `RUST_BACKTRACE=1`
+Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for `spark-submit` if running in local mode.
 
 ```console
 RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
@@ -188,3 +188,29 @@ This produces output like the following:
 
 Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file). For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
+
+### Debugging Memory Reservations
+
+Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
+
+Example log output:
+
+```
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
+[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
+[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
+```
+
+When backtraces are enabled (see the earlier section), they will be included in the log output for failed allocations.
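The `try_grow`/`shrink` calls in the log above are issued by DataFusion operators through the standard `MemoryPool` API. The following minimal sketch is an editorial illustration, not part of this patch: the `demo` function and the byte counts are invented, but the `MemoryConsumer`/`MemoryReservation` calls are DataFusion's public API, and each of them is routed through `LoggingPool` when `spark.comet.debug.memory=true`:

```rust
use std::sync::Arc;
use datafusion::common::Result;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};

// Sketch of the call sequence that produces log lines like those shown above.
fn demo(pool: Arc<dyn MemoryPool>) -> Result<()> {
    // Operators register a named consumer against the pool...
    let mut reservation = MemoryConsumer::new("ExternalSorter[6]").register(&pool);
    // ...then grow and shrink a reservation as their buffers change size.
    reservation.try_grow(256_232_960)?; // logged as "try_grow(256232960) returning Ok"
    reservation.shrink(150_464); // logged as "shrink(150464)"
    reservation.free(); // release the remainder back to the pool
    Ok(())
}
```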
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index b1e48828f7..ea2ebad870 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -82,9 +82,10 @@ use crate::execution::spark_plan::SparkPlan;
 use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};
+use crate::execution::memory_pools::logging_pool::LoggingPool;
 use crate::execution::spark_config::{
-    SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
-    COMET_TRACING_ENABLED,
+    SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
+    COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
 };
 use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
 use datafusion_comet_proto::spark_operator::operator::OpStruct;
@@ -193,6 +194,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
     let max_temp_directory_size =
         spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
+    let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);
 
     with_trace("createPlan", tracing_enabled, || {
         // Init JVM classes
@@ -229,6 +231,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         let memory_pool =
             create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);
 
+        let memory_pool = if logging_memory_pool {
+            Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool))
+        } else {
+            memory_pool
+        };
+
         // Get local directories for storing spill files
         let num_local_dirs = env.get_array_length(&local_dirs)?;
         let mut local_dirs_vec = vec![];
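The wrapping above is a plain decorator: `LoggingPool` implements the same `MemoryPool` trait as the pool it wraps, so the rest of the engine is unaffected whether logging is on or off. Below is a standalone sketch of the pattern, with DataFusion's `GreedyMemoryPool` standing in for whatever `create_memory_pool` returns (illustrative only, not part of this patch; the function name and the 1 GiB limit are invented):

```rust
use std::sync::Arc;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool};
use crate::execution::memory_pools::logging_pool::LoggingPool;

fn wrap_if_enabled(task_attempt_id: u64, logging_enabled: bool) -> Arc<dyn MemoryPool> {
    // The inner pool enforces the actual memory limit (1 GiB chosen arbitrarily).
    let inner: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024 * 1024));
    if logging_enabled {
        // Same trait object type either way, so callers cannot tell the difference.
        Arc::new(LoggingPool::new(task_attempt_id, inner))
    } else {
        inner
    }
}
```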
diff --git a/native/core/src/execution/memory_pools/logging_pool.rs b/native/core/src/execution/memory_pools/logging_pool.rs
new file mode 100644
index 0000000000..8cd2ca6e55
--- /dev/null
+++ b/native/core/src/execution/memory_pools/logging_pool.rs
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::execution::memory_pool::{
+    MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
+};
+use log::info;
+use std::sync::Arc;
+
+#[derive(Debug)]
+pub(crate) struct LoggingPool {
+    task_attempt_id: u64,
+    pool: Arc<dyn MemoryPool>,
+}
+
+impl LoggingPool {
+    pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
+        Self {
+            task_attempt_id,
+            pool,
+        }
+    }
+}
+
+impl MemoryPool for LoggingPool {
+    fn register(&self, consumer: &MemoryConsumer) {
+        self.pool.register(consumer)
+    }
+
+    fn unregister(&self, consumer: &MemoryConsumer) {
+        self.pool.unregister(consumer)
+    }
+
+    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
+        info!(
+            "[Task {}] MemoryPool[{}].grow({})",
+            self.task_attempt_id,
+            reservation.consumer().name(),
+            additional
+        );
+        self.pool.grow(reservation, additional);
+    }
+
+    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
+        info!(
+            "[Task {}] MemoryPool[{}].shrink({})",
+            self.task_attempt_id,
+            reservation.consumer().name(),
+            shrink
+        );
+        self.pool.shrink(reservation, shrink);
+    }
+
+    fn try_grow(
+        &self,
+        reservation: &MemoryReservation,
+        additional: usize,
+    ) -> datafusion::common::Result<()> {
+        match self.pool.try_grow(reservation, additional) {
+            Ok(_) => {
+                info!(
+                    "[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
+                    self.task_attempt_id,
+                    reservation.consumer().name(),
+                    additional
+                );
+                Ok(())
+            }
+            Err(e) => {
+                info!(
+                    "[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
+                    self.task_attempt_id,
+                    reservation.consumer().name(),
+                    additional
+                );
+                Err(e)
+            }
+        }
+    }
+
+    fn reserved(&self) -> usize {
+        self.pool.reserved()
+    }
+
+    fn memory_limit(&self) -> MemoryLimit {
+        self.pool.memory_limit()
+    }
+}
diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs
index fc6a81a5e5..d8b3473353 100644
--- a/native/core/src/execution/memory_pools/mod.rs
+++ b/native/core/src/execution/memory_pools/mod.rs
@@ -17,6 +17,7 @@
 
 mod config;
 mod fair_pool;
+pub mod logging_pool;
 mod task_shared;
 mod unified_pool;
 
diff --git a/native/core/src/execution/spark_config.rs b/native/core/src/execution/spark_config.rs
index 60ebb2ff8b..b257a5ba68 100644
--- a/native/core/src/execution/spark_config.rs
+++ b/native/core/src/execution/spark_config.rs
@@ -21,6 +21,7 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
 pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
 pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
 pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
+pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";
 
 pub(crate) trait SparkConfig {
     fn get_bool(&self, name: &str) -> bool;
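This patch does not include a unit test for the wrapper; a sketch of one (editorial, using DataFusion's `GreedyMemoryPool`; the test name and sizes are invented) would verify that every call delegates to the inner pool:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::execution::memory_pool::GreedyMemoryPool;

    #[test]
    fn delegates_to_inner_pool() {
        let inner: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
        let pool: Arc<dyn MemoryPool> = Arc::new(LoggingPool::new(42, Arc::clone(&inner)));

        // Reservations made through the wrapper must be visible in the inner pool.
        let mut reservation = MemoryConsumer::new("test").register(&pool);
        assert!(reservation.try_grow(512).is_ok());
        assert_eq!(inner.reserved(), 512);
        // Growing past the 1 KiB limit fails, and the failure is logged.
        assert!(reservation.try_grow(1024).is_err());
        reservation.free();
        assert_eq!(inner.reserved(), 0);
    }
}
```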
From 2646c8a4a762d0f27b503bde17ff9b365f2d8029 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 11 Feb 2026 08:32:09 -0700
Subject: [PATCH 2/2] chore: Log register/unregister calls and raise failed
 try_grow to warn level

---
 .../src/execution/memory_pools/logging_pool.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/native/core/src/execution/memory_pools/logging_pool.rs b/native/core/src/execution/memory_pools/logging_pool.rs
index 8cd2ca6e55..12a6b7c93f 100644
--- a/native/core/src/execution/memory_pools/logging_pool.rs
+++ b/native/core/src/execution/memory_pools/logging_pool.rs
@@ -18,7 +18,7 @@
 use datafusion::execution::memory_pool::{
     MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
 };
-use log::info;
+use log::{info, warn};
 use std::sync::Arc;
 
 #[derive(Debug)]
@@ -38,10 +38,20 @@ impl LoggingPool {
 
 impl MemoryPool for LoggingPool {
     fn register(&self, consumer: &MemoryConsumer) {
+        info!(
+            "[Task {}] MemoryPool[{}].register()",
+            self.task_attempt_id,
+            consumer.name(),
+        );
         self.pool.register(consumer)
     }
 
     fn unregister(&self, consumer: &MemoryConsumer) {
+        info!(
+            "[Task {}] MemoryPool[{}].unregister()",
+            self.task_attempt_id,
+            consumer.name(),
+        );
         self.pool.unregister(consumer)
     }
 
@@ -81,7 +91,7 @@ impl MemoryPool for LoggingPool {
                 Ok(())
             }
             Err(e) => {
-                info!(
+                warn!(
                     "[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
                     self.task_attempt_id,
                     reservation.consumer().name(),
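With this second commit applied, consumer lifecycle events also appear in the logs, and failed allocations are emitted at `warn` level so they stand out at default log levels. Output constructed from the format strings above (illustrative, not captured from a real run) would look like:

```
[Task 486] MemoryPool[ExternalSorter[6]].register()
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(262144) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].unregister()
```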