From 7feff85c90302e8679627400cbfd027d55676ce8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 9 Feb 2026 08:15:33 -0700 Subject: [PATCH] feat: pass spark.comet.datafusion.* configs through to DataFusion session Spark configs with the prefix spark.comet.datafusion.* are now passed through to DataFusion's SessionConfig. The prefix spark.comet. is stripped so that e.g. spark.comet.datafusion.sql_parser.parse_float_as_decimal becomes datafusion.sql_parser.parse_float_as_decimal in DataFusion. Co-Authored-By: Claude Opus 4.6 --- native/core/src/execution/jni_api.rs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index 146e0feb8e..a3ec08f36c 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -246,6 +246,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( local_dirs_vec, max_temp_directory_size, task_cpus as usize, + &spark_config, )?; let plan_creation_time = start.elapsed(); @@ -300,6 +301,7 @@ fn prepare_datafusion_session_context( local_dirs: Vec<String>, max_temp_directory_size: u64, task_cpus: usize, + spark_config: &HashMap<String, String>, ) -> CometResult<SessionContext> { let paths = local_dirs.into_iter().map(PathBuf::from).collect(); let disk_manager = DiskManagerBuilder::default() @@ -308,10 +310,7 @@ fn prepare_datafusion_session_context( let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager_builder(disk_manager); rt_config = rt_config.with_memory_pool(memory_pool); - // Get Datafusion configuration from Spark Execution context - // can be configured in Comet Spark JVM using Spark --conf parameters - // e.g: spark-shell --conf spark.datafusion.sql_parser.parse_float_as_decimal=true - let session_config = SessionConfig::new() + let mut session_config = SessionConfig::new() .with_target_partitions(task_cpus) // This DataFusion context is within the scope of an executing Spark Task. 
We want to set // its internal parallelism to the number of CPUs allocated to Spark Tasks. This can be @@ -328,6 +327,17 @@ fn prepare_datafusion_session_context( &ScalarValue::Float64(Some(1.1)), ); + // Pass through DataFusion configs from Spark. + // e.g: spark-shell --conf spark.comet.datafusion.sql_parser.parse_float_as_decimal=true + // becomes datafusion.sql_parser.parse_float_as_decimal=true + const SPARK_COMET_DF_PREFIX: &str = "spark.comet.datafusion."; + for (key, value) in spark_config { + if let Some(df_key) = key.strip_prefix(SPARK_COMET_DF_PREFIX) { + let df_key = format!("datafusion.{df_key}"); + session_config = session_config.set_str(&df_key, value); + } + } + let runtime = rt_config.build()?; let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));