Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.internal.config

/**
 * Describes how a config's value is resolved when the config is referenced from inside a SQL
 * view, UDF, or procedure.
 *
 * The policy decides between two sources for the value: the caller's active session, or the
 * value captured at the time the view/UDF/procedure was created. With [[PERSISTED]], when no
 * captured value exists, the Spark default for the config is used instead.
 *
 * This matters for configs that influence query semantics: with [[PERSISTED]], changing a
 * session-level config affects only the outer query, never the body of a view/UDF/procedure.
 * A config must opt in to [[SESSION]] for session-level changes to flow through.
 *
 * NOTE: the declaration order below is significant — Enumeration assigns ids in order.
 */
object ConfigBindingPolicy extends Enumeration {
  type ConfigBindingPolicy = Value

  /**
   * The value flows from the caller's active session into views/UDFs/procedures, so the whole
   * query tree observes a single, uniform setting.
   */
  val SESSION: Value = Value("SESSION")

  /**
   * The value captured at view/UDF/procedure creation time is used when present; otherwise the
   * Spark default for the config applies.
   */
  val PERSISTED: Value = Value("PERSISTED")

  /**
   * The config plays no role in view/UDF/procedure semantics. Should it nevertheless be read
   * during their resolution, the active session's value is returned — identical to [[SESSION]].
   */
  val NOT_APPLICABLE: Value = Value("NOT_APPLICABLE")
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ private[spark] class TypedConfigBuilder[T](
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
parent._public, parent._version)
parent._public, parent._version, parent._bindingPolicy)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -227,7 +227,8 @@ private[spark] class TypedConfigBuilder[T](
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, transformedDefault, converter,
stringConverter, parent._doc, parent._public, parent._version)
stringConverter, parent._doc, parent._public, parent._version,
parent._bindingPolicy)
parent._onCreate.foreach(_ (entry))
entry
}
Expand All @@ -237,7 +238,7 @@ private[spark] class TypedConfigBuilder[T](
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
parent._doc, parent._public, parent._version)
parent._doc, parent._public, parent._version, parent._bindingPolicy)
parent._onCreate.foreach(_ (entry))
entry
}
Expand All @@ -249,7 +250,7 @@ private[spark] class TypedConfigBuilder[T](
def createWithDefaultString(default: String): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
parent._doc, parent._public, parent._version)
parent._doc, parent._public, parent._version, parent._bindingPolicy)
parent._onCreate.foreach(_(entry))
entry
}
Expand All @@ -272,6 +273,27 @@ private[spark] case class ConfigBuilder(key: String) {
private[config] var _version = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
private[config] var _alternatives = List.empty[String]
private[config] var _bindingPolicy: Option[ConfigBindingPolicy.Value] = None

/**
 * Declares the binding policy governing how this config's value is resolved inside SQL views,
 * UDFs, and procedures.
 *
 * Available policies:
 *
 * - [[ConfigBindingPolicy.SESSION]]: the caller's session value is propagated into
 *   views/UDFs/procedures, keeping behavior uniform across the whole query.
 *
 * - [[ConfigBindingPolicy.PERSISTED]]: the value captured when the view/UDF/procedure was
 *   created is used; if none was captured, the Spark default for this config applies.
 *
 * - [[ConfigBindingPolicy.NOT_APPLICABLE]]: the config takes no part in view/UDF/procedure
 *   resolution; if read there anyway, it resolves like [[ConfigBindingPolicy.SESSION]].
 *
 * @param policy the binding policy to associate with the config entry being built
 * @return this builder, for call chaining
 */
def withBindingPolicy(policy: ConfigBindingPolicy.Value): ConfigBuilder = {
  this._bindingPolicy = Some(policy)
  this
}

def internal(): ConfigBuilder = {
_public = false
Expand Down Expand Up @@ -354,7 +376,7 @@ private[spark] case class ConfigBuilder(key: String) {

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
_public, _version, fallback)
_public, _version, _bindingPolicy, fallback)
_onCreate.foreach(_(entry))
entry
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ private[spark] abstract class ConfigEntry[T] (
val stringConverter: T => String,
val doc: String,
val isPublic: Boolean,
val version: String) {
val version: String,
val bindingPolicy: Option[ConfigBindingPolicy.Value] = None) {

import ConfigEntry._

Expand All @@ -106,7 +107,7 @@ private[spark] abstract class ConfigEntry[T] (

override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, " +
s"public=$isPublic, version=$version)"
s"public=$isPublic, version=$version, bindingPolicy=$bindingPolicy)"
}
}

Expand All @@ -120,7 +121,8 @@ private class ConfigEntryWithDefault[T] (
stringConverter: T => String,
doc: String,
isPublic: Boolean,
version: String)
version: String,
bindingPolicy: Option[ConfigBindingPolicy.Value] = None)
extends ConfigEntry(
key,
prependedKey,
Expand All @@ -130,7 +132,8 @@ private class ConfigEntryWithDefault[T] (
stringConverter,
doc,
isPublic,
version
version,
bindingPolicy
) {

override def defaultValue: Option[T] = Some(_defaultValue)
Expand All @@ -152,7 +155,8 @@ private class ConfigEntryWithDefaultFunction[T] (
stringConverter: T => String,
doc: String,
isPublic: Boolean,
version: String)
version: String,
bindingPolicy: Option[ConfigBindingPolicy.Value] = None)
extends ConfigEntry(
key,
prependedKey,
Expand All @@ -162,7 +166,8 @@ private class ConfigEntryWithDefaultFunction[T] (
stringConverter,
doc,
isPublic,
version
version,
bindingPolicy
) {

override def defaultValue: Option[T] = Some(_defaultFunction())
Expand All @@ -184,7 +189,8 @@ private class ConfigEntryWithDefaultString[T] (
stringConverter: T => String,
doc: String,
isPublic: Boolean,
version: String)
version: String,
bindingPolicy: Option[ConfigBindingPolicy.Value] = None)
extends ConfigEntry(
key,
prependedKey,
Expand All @@ -194,7 +200,8 @@ private class ConfigEntryWithDefaultString[T] (
stringConverter,
doc,
isPublic,
version
version,
bindingPolicy
) {

override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
Expand All @@ -220,7 +227,8 @@ private[spark] class OptionalConfigEntry[T](
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean,
version: String)
version: String,
bindingPolicy: Option[ConfigBindingPolicy.Value] = None)
extends ConfigEntry[Option[T]](
key,
prependedKey,
Expand All @@ -230,7 +238,8 @@ private[spark] class OptionalConfigEntry[T](
v => v.map(rawStringConverter).orNull,
doc,
isPublic,
version
version,
bindingPolicy
) {

override def defaultValueString: String = ConfigEntry.UNDEFINED
Expand All @@ -251,6 +260,7 @@ private[spark] class FallbackConfigEntry[T] (
doc: String,
isPublic: Boolean,
version: String,
bindingPolicy: Option[ConfigBindingPolicy.Value] = None,
val fallback: ConfigEntry[T])
extends ConfigEntry[T](
key,
Expand All @@ -261,7 +271,8 @@ private[spark] class FallbackConfigEntry[T] (
fallback.stringConverter,
doc,
isPublic,
version
version,
bindingPolicy
) {

override def defaultValueString: String = s"<value of ${fallback.key}>"
Expand All @@ -285,4 +296,6 @@ private[spark] object ConfigEntry {

/**
 * Looks up a previously registered config entry by its key.
 * NOTE(review): delegates to `knownConfigs.get`, so an unknown key presumably yields null —
 * confirm against the registry's map type.
 */
def findEntry(key: String): ConfigEntry[_] = {
  knownConfigs.get(key)
}

/**
 * Returns every registered config entry, as exposed by the internal registry's value
 * collection (likely a view backed by the registry — confirm before mutating/caching).
 */
def listAllEntries(): java.util.Collection[ConfigEntry[_]] = {
  knownConfigs.values()
}

}
1 change: 1 addition & 0 deletions dev/.rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ vote.tmpl
GangliaReporter.java
application_1578436911597_0052
config.properties
configs-without-binding-policy-exceptions
local-1596020211915
app-20200706201101-0003
application_1628109047826_1317105
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import scala.util.{Failure, Random, Success, Try}

import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.internal.config.ConfigBindingPolicy
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis.resolver.{
Expand Down Expand Up @@ -239,29 +240,23 @@ object AnalysisContext {
}

object Analyzer {
// List of configurations that should be passed on when resolving views and SQL UDF.
private val RETAINED_ANALYSIS_FLAGS = Seq(
"spark.sql.view.schemaEvolution.preserveUserComments",
// retainedHiveConfigs
// TODO: remove these Hive-related configs after the `RelationConversions` is moved to
// optimization phase.
"spark.sql.hive.convertMetastoreParquet",
"spark.sql.hive.convertMetastoreOrc",
"spark.sql.hive.convertInsertingPartitionedTable",
"spark.sql.hive.convertInsertingUnpartitionedTable",
"spark.sql.hive.convertMetastoreCtas",
// retainedLoggingConfigs
"spark.sql.planChangeLog.level",
"spark.sql.expressionTreeChangeLog.level"
)

// Configs with bindingPolicy SESSION or NOT_APPLICABLE are retained when resolving views and
// SQL UDFs, so that their values propagate from the active session rather than falling back to
// Spark defaults. Note: configs defined in lazily-loaded modules (e.g., sql/hive) will only
// be included if their holding Scala object has been initialized before this set is computed.
def retainResolutionConfigsForAnalysis(
newConf: SQLConf,
existingConf: SQLConf,
createSparkVersion: String = ""): Unit = {
val retainedConfigKeys = SQLConf.getConfigEntries().asScala
.filter(entry =>
entry.bindingPolicy.contains(ConfigBindingPolicy.SESSION) ||
entry.bindingPolicy.contains(ConfigBindingPolicy.NOT_APPLICABLE))
.map(_.key)
.toSet

val retainedConfigs = existingConf.getAllConfs.filter { case (key, _) =>
// Also apply catalog configs
RETAINED_ANALYSIS_FLAGS.contains(key) || key.startsWith("spark.sql.catalog.")
retainedConfigKeys.contains(key) || key.startsWith("spark.sql.catalog.")
}

retainedConfigs.foreach { case (k, v) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object SQLConf {
sqlConfEntries.get(key)
}

private[internal] def getConfigEntries(): util.Collection[ConfigEntry[_]] = {
private[sql] def getConfigEntries(): util.Collection[ConfigEntry[_]] = {
sqlConfEntries.values()
}

Expand Down Expand Up @@ -544,6 +544,7 @@ object SQLConf {
s"plan after a rule or batch is applied. The value can be " +
s"${VALID_LOG_LEVELS.mkString(", ")}.")
.version("3.1.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.enumConf(classOf[Level])
.createWithDefault(Level.TRACE)

Expand Down Expand Up @@ -577,6 +578,7 @@ object SQLConf {
"the resolved expression tree in the single-pass bottom-up Resolver. The value can be " +
s"${VALID_LOG_LEVELS.mkString(", ")}.")
.version("4.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.enumConf(classOf[Level])
.createWithDefault(Level.TRACE)

Expand Down Expand Up @@ -2258,6 +2260,7 @@ object SQLConf {
"when the underlying table schema evolves. When disabled, view comments will be " +
"overwritten with table comments on every schema sync.")
.version("4.2.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys
import org.apache.spark.internal.config.ConfigBindingPolicy
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.classic.SQLContext
import org.apache.spark.sql.execution.command.DDLUtils
Expand Down Expand Up @@ -126,6 +127,7 @@ private[spark] object HiveUtils extends Logging {
.doc("When set to true, the built-in Parquet reader and writer are used to process " +
"parquet tables created by using the HiveQL syntax, instead of Hive serde.")
.version("1.1.1")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

Expand All @@ -142,6 +144,7 @@ private[spark] object HiveUtils extends Logging {
.doc("When set to true, the built-in ORC reader and writer are used to process " +
"ORC tables created by using the HiveQL syntax, instead of Hive serde.")
.version("2.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

Expand All @@ -152,6 +155,7 @@ private[spark] object HiveUtils extends Logging {
"to process inserting into partitioned ORC/Parquet tables created by using the HiveSQL " +
"syntax.")
.version("3.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

Expand All @@ -162,6 +166,7 @@ private[spark] object HiveUtils extends Logging {
"to process inserting into unpartitioned ORC/Parquet tables created by using the HiveSQL " +
"syntax.")
.version("4.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

Expand All @@ -171,6 +176,7 @@ private[spark] object HiveUtils extends Logging {
"`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
"enabled respectively for Parquet and ORC formats")
.version("3.0.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(true)

Expand Down
Loading