Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
local_dirs_vec,
max_temp_directory_size,
task_cpus as usize,
&spark_config,
)?;

let plan_creation_time = start.elapsed();
Expand Down Expand Up @@ -300,6 +301,7 @@ fn prepare_datafusion_session_context(
local_dirs: Vec<String>,
max_temp_directory_size: u64,
task_cpus: usize,
spark_config: &HashMap<String, String>,
) -> CometResult<SessionContext> {
let paths = local_dirs.into_iter().map(PathBuf::from).collect();
let disk_manager = DiskManagerBuilder::default()
Expand All @@ -308,10 +310,7 @@ fn prepare_datafusion_session_context(
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager);
rt_config = rt_config.with_memory_pool(memory_pool);

// Read DataFusion configuration from the Spark execution context.
// These options can be set from the Comet Spark JVM via Spark `--conf` parameters,
// e.g.: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true
let session_config = SessionConfig::new()
let mut session_config = SessionConfig::new()
.with_target_partitions(task_cpus)
// This DataFusion context is within the scope of an executing Spark Task. We want to set
// its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be
Expand All @@ -328,6 +327,17 @@ fn prepare_datafusion_session_context(
&ScalarValue::Float64(Some(1.1)),
);

// Pass through DataFusion configs from Spark.
// e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true
// becomes datafusion.sql_parser.parse_float_as_decimal=true
const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion.";
for (key, value) in spark_config {
if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) {
let df_key = format!("datafusion.{df_key}");
session_config = session_config.set_str(&df_key, value);
}
}

let runtime = rt_config.build()?;

let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
Expand Down
Loading