Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
0b17209
[SPARK-55869][SQL] Add SupportsPushDownPredicateCapabilities for ca…
schenksj Mar 7, 2026
1817d75
[SPARK-55869][SQL] Add custom predicate pushdown for DSv2 tables
schenksj Mar 7, 2026
6365d91
[SPARK-55869][SQL] Add CustomOperatorParserExtension for infix operat…
schenksj Mar 7, 2026
c642850
[SPARK-55869][SQL] Update Javadoc, add config switch, regression test…
schenksj Mar 7, 2026
5ca2380
[SPARK-55869][SQL] Address code review findings across all layers
schenksj Mar 7, 2026
61e970e
[SPARK-55869][SQL] Add Tier 2 capability-gated predicates
schenksj Mar 7, 2026
b32ade9
[SPARK-55869][SQL] Add parse tree verification for nested operator re…
schenksj Mar 7, 2026
75f5ae7
[SPARK-55869][SQL] Add end-to-end tests with source-side filter consu…
schenksj Mar 7, 2026
a10f1f4
[SPARK-55869][SQL] Add comprehensive gap-coverage tests for extended …
schenksj Mar 7, 2026
4a90536
[SPARK-55869][SQL] Address code review findings for extended predicat…
schenksj Mar 7, 2026
5491fe2
[SPARK-55869][SQL] Add equals/hashCode/toString, fix StackOverflow, a…
schenksj Mar 7, 2026
a56ec68
[SPARK-55869][SQL] Fix V2ExpressionSQLBuilder recursion, extract help…
schenksj Mar 8, 2026
aaa3ae3
[SPARK-55869][SQL] Fix visitUnexpectedExpr recursion and add backti…
schenksj Mar 8, 2026
825e678
remove non-ascii character for scala style requirements
schenksj Mar 8, 2026
457f522
[SPARK-55869][SQL] Replace non-ASCII characters to pass scalastyle ch…
schenksj Mar 8, 2026
e3dd668
[SPARK-55869][SQL] Pass escape character in LIKE/ILIKE V2 predicates
schenksj Mar 8, 2026
ee2b0d2
Merge branch 'apache:master' into feature/extended-predicate-pushdown
schenksj Mar 8, 2026
5cbbd22
Merge branch 'apache:master' into feature/extended-predicate-pushdown
schenksj Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.DataType;

/**
 * Describes a custom predicate function that a data source supports for pushdown.
 *
 * <p>The canonical name must be dot-qualified to avoid collisions with Spark's built-in
 * predicate names (e.g. "com.mycompany.INDEXQUERY", not just "INDEXQUERY"). This follows
 * the same convention as
 * {@link org.apache.spark.sql.connector.catalog.functions.BoundFunction#canonicalName()}.
 *
 * <p>Parameter types are advisory: the analyzer uses them for implicit cast hints but
 * does not reject queries when types don't match exactly. The data source can reject
 * incompatible predicates at {@code pushPredicates()} time, and Spark will fall back
 * to post-scan evaluation.
 *
 * @since 4.1.0
 */
@Evolving
public class CustomPredicateDescriptor {
  private final String canonicalName;
  private final String sqlName;
  private final DataType[] parameterTypes;
  private final boolean isDeterministic;

  /**
   * Creates a new custom predicate descriptor.
   *
   * @param canonicalName Dot-qualified canonical name (e.g. "com.mycompany.INDEXQUERY").
   *                      Must contain at least one '.' to enforce namespace qualification.
   *                      This is the name used in the V2 Predicate wire format.
   * @param sqlName The short name used in SQL syntax (e.g. "indexquery"). This is what
   *                users write in SQL queries. Case-insensitive (stored upper-cased).
   * @param parameterTypes Expected parameter types for implicit cast hints.
   *                       Null entries mean "any type" for that position.
   *                       An empty array means any number of arguments of any type.
   * @param isDeterministic Whether the function is deterministic (affects optimizer decisions)
   * @throws IllegalArgumentException if canonicalName or sqlName is null, or if
   *                                  canonicalName is not dot-qualified
   */
  public CustomPredicateDescriptor(
      String canonicalName,
      String sqlName,
      DataType[] parameterTypes,
      boolean isDeterministic) {
    if (canonicalName == null) {
      throw new IllegalArgumentException("canonicalName must not be null");
    }
    if (sqlName == null) {
      throw new IllegalArgumentException("sqlName must not be null");
    }
    // Enforce namespace qualification so custom names can never shadow Spark's
    // built-in (unqualified) predicate names.
    if (canonicalName.indexOf('.') < 0) {
      throw new IllegalArgumentException(
          "Canonical name must be dot-qualified (e.g. 'com.mycompany.FUNC'), got: "
          + canonicalName);
    }
    this.canonicalName = canonicalName.toUpperCase(Locale.ROOT);
    this.sqlName = sqlName.toUpperCase(Locale.ROOT);
    this.parameterTypes = copyOrNull(parameterTypes);
    this.isDeterministic = isDeterministic;
  }

  /**
   * Convenience constructor using the last segment of canonicalName as the sql name.
   */
  public CustomPredicateDescriptor(
      String canonicalName,
      DataType[] parameterTypes,
      boolean isDeterministic) {
    this(canonicalName, lastNameSegment(canonicalName), parameterTypes, isDeterministic);
  }

  // Text after the final '.'. Null-safe so the delegated constructor reports the
  // null canonicalName with its own error message.
  private static String lastNameSegment(String qualified) {
    if (qualified == null) {
      return null;
    }
    return qualified.substring(qualified.lastIndexOf('.') + 1);
  }

  // Defensive copy; a null array is passed through unchanged.
  private static DataType[] copyOrNull(DataType[] types) {
    return types == null ? null : types.clone();
  }

  /** Dot-qualified canonical name, upper-cased. */
  public String canonicalName() {
    return canonicalName;
  }

  /** Short SQL-facing name, upper-cased. */
  public String sqlName() {
    return sqlName;
  }

  /** Advisory parameter types (returned as a defensive copy); may be null. */
  public DataType[] parameterTypes() {
    return copyOrNull(parameterTypes);
  }

  /** Whether the function is deterministic. */
  public boolean isDeterministic() {
    return isDeterministic;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    if (!(o instanceof CustomPredicateDescriptor that)) {
      return false;
    }
    return isDeterministic == that.isDeterministic
        && Objects.equals(canonicalName, that.canonicalName)
        && Objects.equals(sqlName, that.sqlName)
        && Arrays.equals(parameterTypes, that.parameterTypes);
  }

  @Override
  public int hashCode() {
    // Array content participates via Arrays.hashCode, consistent with equals().
    return 31 * Objects.hash(canonicalName, sqlName, isDeterministic)
        + Arrays.hashCode(parameterTypes);
  }

  @Override
  public String toString() {
    return "CustomPredicateDescriptor{"
        + "canonicalName='" + canonicalName + '\''
        + ", sqlName='" + sqlName + '\''
        + ", parameterTypes=" + Arrays.toString(parameterTypes)
        + ", isDeterministic=" + isDeterministic
        + '}';
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.annotation.Evolving;

/**
 * A mix-in interface for {@link Table} that declares custom predicate functions
 * available for this table. These functions are registered into the function
 * resolution scope during analysis when queries reference this table.
 * <p>
 * Custom predicate functions appear as regular functions in SQL and DataFrame APIs.
 * During pushdown they are translated to V2 predicates whose name is the descriptor's
 * canonical name (dot-qualified, see {@link CustomPredicateDescriptor}). The data
 * source receives these predicates via
 * {@link org.apache.spark.sql.connector.read.SupportsPushDownV2Filters#pushPredicates}.
 * <p>
 * If a custom predicate is not pushed down (e.g., the data source rejects it), the
 * query will fail with a clear error before execution, since custom predicates cannot
 * be evaluated by Spark.
 *
 * @since 4.1.0
 */
@Evolving
public interface SupportsCustomPredicates extends Table {

  /**
   * Returns custom predicate function descriptors for this table.
   * These are resolved during analysis and pushed down during optimization.
   *
   * @return non-null array of custom predicate descriptors; may be empty
   */
  CustomPredicateDescriptor[] customPredicates();
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,103 @@
* </li>
* </ol>
*
* <p>The following predicate names are <em>extended/opt-in</em> and are only translated when a
* data source declares support via
* {@link org.apache.spark.sql.connector.read.SupportsPushDownPredicateCapabilities}:
* <ol>
* <li>Name: <code>LIKE</code>
* <ul>
* <li>SQL semantic: <code>expr1 LIKE expr2</code> (full pattern matching)</li>
* <li>Children: <code>[expr, pattern, escapeChar]</code> where escapeChar is a
* single-character string literal (default <code>\</code>)</li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>RLIKE</code>
* <ul>
* <li>SQL semantic: <code>expr1 RLIKE expr2</code> (regex matching)</li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>ILIKE</code>
* <ul>
* <li>SQL semantic: <code>expr1 ILIKE expr2</code> (case-insensitive LIKE).
* Only translated when the pattern is a non-literal expression (e.g. a
* column reference). Literal patterns are constant-folded by the optimizer,
* which removes the <code>Lower()</code> wrappers needed for detection.</li>
* <li>Children: <code>[expr, pattern, escapeChar]</code> where escapeChar is a
* single-character string literal (default <code>\</code>)</li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>IS_NAN</code>
* <ul>
* <li>SQL semantic: <code>isnan(expr)</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>ARRAY_CONTAINS</code>
* <ul>
* <li>SQL semantic: <code>array_contains(expr1, expr2)</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>MAP_CONTAINS_KEY</code>
* <ul>
* <li>SQL semantic: <code>map_contains_key(expr1, expr2)</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>ARRAYS_OVERLAP</code>
* <ul>
* <li>SQL semantic: <code>arrays_overlap(arr1, arr2)</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>LIKE_ALL</code>
* <ul>
* <li>SQL semantic: <code>expr LIKE ALL (p1, p2, ...)</code></li>
* <li>Children: <code>[expr, pattern1, pattern2, ...]</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>LIKE_ANY</code>
* <ul>
* <li>SQL semantic: <code>expr LIKE ANY (p1, p2, ...)</code></li>
* <li>Children: <code>[expr, pattern1, pattern2, ...]</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>NOT_LIKE_ALL</code>
* <ul>
* <li>SQL semantic: <code>expr NOT LIKE ALL (p1, p2, ...)</code></li>
* <li>Children: <code>[expr, pattern1, pattern2, ...]</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* <li>Name: <code>NOT_LIKE_ANY</code>
* <ul>
* <li>SQL semantic: <code>expr NOT LIKE ANY (p1, p2, ...)</code></li>
* <li>Children: <code>[expr, pattern1, pattern2, ...]</code></li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* </ol>
*
* <p>Data sources may also declare <em>custom predicate functions</em> via
* {@link org.apache.spark.sql.connector.catalog.SupportsCustomPredicates}.
* Custom predicates use dot-qualified canonical names to avoid collisions
* with built-in predicate names. For example:
* <pre>{@code
* Predicate("com.mycompany.INDEXQUERY", [col_ref, literal])
* Predicate("com.mycompany.MY_SEARCH", [col1, col2])
* }</pre>
*
* <p>Custom predicate names must contain at least one '.' character.
* Spark's built-in names are unqualified upper-case tokens, so there is
* no collision risk. Data sources match on {@code predicate.name()}
* directly in their {@code pushPredicates()} implementation.
*
* @since 3.3.0
*/
@Evolving
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read;

import java.util.Set;

import org.apache.spark.annotation.Evolving;

/**
 * A mix-in interface for {@link ScanBuilder} that allows data sources to declare
 * extended predicate capabilities beyond the default set. When a ScanBuilder
 * implements this interface, the V2 expression translator will attempt to translate
 * additional builtin Catalyst expressions that match the declared capabilities.
 * <p>
 * This interface works in conjunction with {@link SupportsPushDownV2Filters}.
 * The capabilities declared here enable Spark to <em>translate</em> additional
 * expressions to V2 predicates. The data source may still reject individual
 * predicates at {@link SupportsPushDownV2Filters#pushPredicates} time.
 * <p>
 * Supported predicate names (upper-cased):
 * <ul>
 *   <li>{@code "LIKE"} - full LIKE pattern matching</li>
 *   <li>{@code "RLIKE"} - regex matching</li>
 *   <li>{@code "ILIKE"} - case-insensitive LIKE (non-literal patterns only)</li>
 *   <li>{@code "IS_NAN"} - NaN check for numeric types</li>
 *   <li>{@code "ARRAY_CONTAINS"} - element membership in arrays</li>
 *   <li>{@code "MAP_CONTAINS_KEY"} - key existence in maps</li>
 *   <li>{@code "ARRAYS_OVERLAP"} - array intersection check</li>
 *   <li>{@code "LIKE_ALL"} - match all patterns</li>
 *   <li>{@code "LIKE_ANY"} - match any pattern</li>
 *   <li>{@code "NOT_LIKE_ALL"} - match no patterns (negation of LIKE_ALL)</li>
 *   <li>{@code "NOT_LIKE_ANY"} - not match any (negation of LIKE_ANY)</li>
 * </ul>
 *
 * @since 4.1.0
 */
@Evolving
public interface SupportsPushDownPredicateCapabilities extends ScanBuilder {

  /**
   * Returns the set of additional predicate/expression names this data source
   * supports for pushdown, beyond the default always-translated set.
   * <p>
   * Names should be upper-cased. The data source may still reject individual
   * predicates at {@code pushPredicates()} time (e.g., if a particular RLIKE
   * pattern is too complex). This method merely enables Spark to attempt
   * translation.
   *
   * @return non-null set of upper-cased predicate/expression names; may be empty
   */
  Set<String> supportedPredicateNames();
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,20 @@ yield visitBinaryArithmetic(
"COT", "ASIN", "ASINH", "ACOS", "ACOSH", "ATAN", "ATANH", "ATAN2", "CBRT", "DEGREES",
"RADIANS", "SIGN", "WIDTH_BUCKET", "SUBSTRING", "UPPER", "LOWER", "TRANSLATE",
"DATE_ADD", "DATE_DIFF", "TRUNC", "AES_ENCRYPT", "AES_DECRYPT", "SHA1", "SHA2", "MD5",
"CRC32", "BIT_LENGTH", "CHAR_LENGTH", "CONCAT", "RPAD", "LPAD" ->
"CRC32", "BIT_LENGTH", "CHAR_LENGTH", "CONCAT", "RPAD", "LPAD",
// Extended capability-gated predicates (SPARK-55869)
"LIKE", "RLIKE", "ILIKE", "IS_NAN", "ARRAY_CONTAINS", "MAP_CONTAINS_KEY",
"ARRAYS_OVERLAP", "LIKE_ALL", "LIKE_ANY", "NOT_LIKE_ALL", "NOT_LIKE_ANY" ->
visitSQLFunction(name, e.children());
case "CASE_WHEN" -> visitCaseWhen(expressionsToStringArray(e.children()));
case "TRIM" -> visitTrim("BOTH", expressionsToStringArray(e.children()));
case "LTRIM" -> visitTrim("LEADING", expressionsToStringArray(e.children()));
case "RTRIM" -> visitTrim("TRAILING", expressionsToStringArray(e.children()));
case "OVERLAY" -> visitOverlay(expressionsToStringArray(e.children()));
// TODO supports other expressions
default -> visitUnexpectedExpr(expr);
// Custom predicates use dot-qualified canonical names (e.g. "COM.MYCO.FUNC").
// Render those as SQL function calls; fail on other unknown names.
default -> name.contains(".") ?
visitSQLFunction(name, e.children()) : visitUnexpectedExpr(expr);
};
} else if (expr instanceof Min min) {
return visitAggregateFunction("MIN", false, min.children());
Expand Down Expand Up @@ -327,9 +332,13 @@ protected String visitUserDefinedAggregateFunction(
Map.of("class", this.getClass().getSimpleName(), "funcName", funcName));
}

// Use expr.getClass().getSimpleName() instead of String.valueOf(expr) to avoid
// infinite recursion: ExpressionWithToString.toString() calls build() which can
// reach visitUnexpectedExpr again for unknown expressions.
protected String visitUnexpectedExpr(Expression expr) throws IllegalArgumentException {
throw new SparkIllegalArgumentException(
"_LEGACY_ERROR_TEMP_3207", Map.of("expr", String.valueOf(expr)));
"_LEGACY_ERROR_TEMP_3207",
Map.of("expr", expr.getClass().getSimpleName()));
}

protected String visitOverlay(String[] inputs) {
Expand Down
Loading