10 changes: 10 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
"Guide (https://datafusion.apache.org/comet/contributor-guide/tracing.html)"

private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html)"

/** List of all configs that are used for generating documentation */
val allConfs = new ListBuffer[ConfigEntry[_]]

@@ -549,6 +552,13 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.debug.memory")
.category(CATEGORY_TESTING)
.doc(s"When enabled, log all native memory pool interactions. $DEBUGGING_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_EXTENDED_EXPLAIN_FORMAT_VERBOSE = "verbose"
val COMET_EXTENDED_EXPLAIN_FORMAT_FALLBACK = "fallback"

28 changes: 27 additions & 1 deletion docs/source/contributor-guide/debugging.md
@@ -127,7 +127,7 @@ To build Comet with this feature enabled:
make release COMET_FEATURES=backtrace
```

Start Comet with `RUST_BACKTRACE=1`
Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for `spark-submit` if running in local mode.

```console
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
@@ -188,3 +188,29 @@ This produces output like the following:

Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).
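
As a minimal sketch (assuming the standard log4rs YAML schema with a single console appender; the linked file shows a complete configuration):

```yaml
appenders:
  stdout:
    kind: console
    encoder:
      pattern: "{d} {l} {t} - {m}{n}"
root:
  level: info
  appenders:
    - stdout
```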

### Debugging Memory Reservations

Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
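
For example, launching a local `spark-shell` session as in the earlier backtrace example (the jar path and versions are illustrative):

```console
$SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true --conf spark.comet.debug.memory=true
```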

Example log output:

```
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
```

When backtraces are enabled (see the earlier section), the log output includes a backtrace for each failed allocation.
12 changes: 10 additions & 2 deletions native/core/src/execution/jni_api.rs
@@ -82,9 +82,10 @@ use crate::execution::spark_plan::SparkPlan;

use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};

use crate::execution::memory_pools::logging_pool::LoggingPool;
use crate::execution::spark_config::{
SparkConfig, COMET_DEBUG_ENABLED, COMET_EXPLAIN_NATIVE_ENABLED, COMET_MAX_TEMP_DIRECTORY_SIZE,
COMET_TRACING_ENABLED,
SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
};
use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
use datafusion_comet_proto::spark_operator::operator::OpStruct;
@@ -193,6 +194,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
let max_temp_directory_size =
spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);

with_trace("createPlan", tracing_enabled, || {
// Init JVM classes
@@ -229,6 +231,12 @@
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);

// Optionally wrap the pool so that every register/unregister/grow/shrink
// call is logged when spark.comet.debug.memory is enabled
let memory_pool = if logging_memory_pool {
Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool))
} else {
memory_pool
};

// Get local directories for storing spill files
let num_local_dirs = env.get_array_length(&local_dirs)?;
let mut local_dirs_vec = vec![];
112 changes: 112 additions & 0 deletions native/core/src/execution/memory_pools/logging_pool.rs
@@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::execution::memory_pool::{
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use log::{info, warn};
use std::sync::Arc;

/// A `MemoryPool` wrapper that delegates all calls to an inner pool and logs
/// each interaction, tagged with the owning Spark task attempt id.
#[derive(Debug)]
pub(crate) struct LoggingPool {
task_attempt_id: u64,
pool: Arc<dyn MemoryPool>,
}

impl LoggingPool {
pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
Self {
task_attempt_id,
pool,
}
}
}

impl MemoryPool for LoggingPool {
fn register(&self, consumer: &MemoryConsumer) {
info!(
"[Task {}] MemoryPool[{}].register()",
self.task_attempt_id,
consumer.name(),
);
self.pool.register(consumer)
}

fn unregister(&self, consumer: &MemoryConsumer) {
info!(
"[Task {}] MemoryPool[{}].unregister()",
self.task_attempt_id,
consumer.name(),
);
self.pool.unregister(consumer)
}

fn grow(&self, reservation: &MemoryReservation, additional: usize) {
info!(
"[Task {}] MemoryPool[{}].grow({})",
self.task_attempt_id,
reservation.consumer().name(),
additional
);
self.pool.grow(reservation, additional);
}

fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
info!(
"[Task {}] MemoryPool[{}].shrink({})",
self.task_attempt_id,
reservation.consumer().name(),
shrink
);
self.pool.shrink(reservation, shrink);
}

fn try_grow(
&self,
reservation: &MemoryReservation,
additional: usize,
) -> datafusion::common::Result<()> {
match self.pool.try_grow(reservation, additional) {
Ok(_) => {
info!(
"[Task {}] MemoryPool[{}].try_grow({}) returning Ok",
self.task_attempt_id,
reservation.consumer().name(),
additional
);
Ok(())
}
Err(e) => {
warn!(
"[Task {}] MemoryPool[{}].try_grow({}) returning Err: {e:?}",
self.task_attempt_id,
reservation.consumer().name(),
additional
);
Err(e)
}
}
}

fn reserved(&self) -> usize {
self.pool.reserved()
}

fn memory_limit(&self) -> MemoryLimit {
self.pool.memory_limit()
}
}
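
As a usage sketch (within the comet crate, since `LoggingPool` is `pub(crate)`): `GreedyMemoryPool` and `MemoryConsumer` come from DataFusion's public API, and `env_logger` is an assumed logging backend, not a dependency this diff adds:

```rust
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
use std::sync::Arc;

fn demo() -> datafusion::common::Result<()> {
    // `info!`/`warn!` need a logger backend to be visible; env_logger is assumed here.
    let _ = env_logger::try_init();

    // Wrap a 1 MiB greedy pool so every interaction for "task" 42 is logged.
    let inner: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
    let pool: Arc<dyn MemoryPool> = Arc::new(LoggingPool::new(42, inner));

    // register() is logged, then each try_grow/shrink is forwarded and logged.
    let mut reservation = MemoryConsumer::new("ExampleConsumer").register(&pool);
    reservation.try_grow(64 * 1024)?; // logs: try_grow(65536) returning Ok
    reservation.shrink(32 * 1024); // logs: shrink(32768)
    Ok(())
}
```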
1 change: 1 addition & 0 deletions native/core/src/execution/memory_pools/mod.rs
@@ -17,6 +17,7 @@

mod config;
mod fair_pool;
pub mod logging_pool;
mod task_shared;
mod unified_pool;

1 change: 1 addition & 0 deletions native/core/src/execution/spark_config.rs
@@ -21,6 +21,7 @@ pub(crate) const COMET_TRACING_ENABLED: &str = "spark.comet.tracing.enabled";
pub(crate) const COMET_DEBUG_ENABLED: &str = "spark.comet.debug.enabled";
pub(crate) const COMET_EXPLAIN_NATIVE_ENABLED: &str = "spark.comet.explain.native.enabled";
pub(crate) const COMET_MAX_TEMP_DIRECTORY_SIZE: &str = "spark.comet.maxTempDirectorySize";
pub(crate) const COMET_DEBUG_MEMORY: &str = "spark.comet.debug.memory";

pub(crate) trait SparkConfig {
fn get_bool(&self, name: &str) -> bool;