[SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 #54669

Open
schenksj wants to merge 18 commits into apache:master from schenksj:feature/extended-predicate-pushdown

Conversation

@schenksj schenksj commented Mar 7, 2026

[SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2

What changes were proposed in this pull request?

This PR extends Spark's DataSource V2 predicate pushdown framework with three layers of new functionality, all gated behind a single config switch (spark.sql.dataSource.extendedPredicatePushdown.enabled, default true).

Layer 1: Capability-Gated Builtin Predicate Translation

Data sources can now opt in to receiving additional builtin predicates by implementing SupportsPushDownPredicateCapabilities on their ScanBuilder. The interface declares a set of predicate names the source can handle:

  • LIKE — full pattern matching (expr1 LIKE expr2)
  • RLIKE — regex matching (expr1 RLIKE expr2)
  • ILIKE — case-insensitive LIKE (expr1 ILIKE expr2)
  • IS_NAN — NaN check (isnan(expr))
  • ARRAY_CONTAINS — array element check (array_contains(expr1, expr2))
  • MAP_CONTAINS_KEY — map key check (map_contains_key(expr1, expr2))
  • ARRAYS_OVERLAP — array intersection check (arrays_overlap(arr1, arr2))
  • LIKE_ALL — match all patterns (expr LIKE ALL (p1, p2, ...))
  • LIKE_ANY — match any pattern (expr LIKE ANY (p1, p2, ...))
  • NOT_LIKE_ALL — negated match-all (expr NOT LIKE ALL (p1, p2, ...))
  • NOT_LIKE_ANY — negated match-any (expr NOT LIKE ANY (p1, p2, ...))

V2ExpressionBuilder checks the declared capabilities before translating these expressions, so sources that don't declare support continue to see existing behavior.
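As a rough illustration of the opt-in mechanism, the sketch below uses a hypothetical stand-in interface rather than the real Spark `ScanBuilder` mix-in; the method name and return type are assumptions based on the PR description, not the actual signature.

```java
import java.util.Set;

// Hypothetical stand-in for the PR's SupportsPushDownPredicateCapabilities
// mix-in. The real interface lives on ScanBuilder and its exact signature
// may differ; this only demonstrates the capability-declaration idea.
interface PredicateCapabilitiesSketch {
    Set<String> pushDownPredicateCapabilities();
}

public class CapabilityDemo implements PredicateCapabilitiesSketch {
    @Override
    public Set<String> pushDownPredicateCapabilities() {
        // Only predicates named here would be translated by
        // V2ExpressionBuilder; anything else keeps the default behavior.
        return Set.of("RLIKE", "ILIKE", "ARRAY_CONTAINS");
    }

    public static void main(String[] args) {
        CapabilityDemo builder = new CapabilityDemo();
        Set<String> caps = builder.pushDownPredicateCapabilities();
        System.out.println(caps.contains("RLIKE"));    // declared
        System.out.println(caps.contains("LIKE_ALL")); // not declared
    }
}
```

A source that omits a name from the set simply never sees that predicate pushed, which is what keeps the feature backward compatible.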

Layer 2: Custom Predicate Functions via SupportsCustomPredicates

Tables can declare custom predicate functions by implementing SupportsCustomPredicates, which returns an array of CustomPredicateDescriptor objects. Each descriptor specifies:

  • canonicalName() — dot-qualified name (e.g. com.mycompany.MY_SEARCH) used in the V2 Predicate
  • sqlName() — the unqualified name users write in SQL (e.g. my_search)
  • parameterTypes() — optional expected parameter types (enables automatic casting)
  • isDeterministic() — whether the predicate is deterministic

Users write standard SQL function-call syntax: SELECT * FROM t WHERE my_search(col, 'param'). The analyzer resolves these against the table's descriptors and produces CustomPredicateExpression nodes, which V2ExpressionBuilder translates into V2 Predicate objects using the dot-qualified canonical name.

A post-optimizer rule (EnsureCustomPredicatesPushed) fails the query if any custom predicate remains unpushed, since Spark cannot evaluate them locally.
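To make the descriptor contract concrete, here is a self-contained sketch of what a `CustomPredicateDescriptor`-like class could look like. The accessor names follow the PR description; the validation details (null checks, dot-qualified requirement, convenience constructor deriving `sqlName` from the last segment) are inferred from the PR's test list, and the lower-casing of the derived name is an assumption.

```java
import java.util.Objects;

// Hypothetical sketch of the PR's CustomPredicateDescriptor; not the
// actual Spark class. Validation behavior is inferred from the PR's
// described tests.
public class DescriptorSketch {
    private final String canonicalName;
    private final String sqlName;
    private final boolean deterministic;

    DescriptorSketch(String canonicalName, String sqlName, boolean deterministic) {
        Objects.requireNonNull(canonicalName, "canonicalName must not be null");
        Objects.requireNonNull(sqlName, "sqlName must not be null");
        if (!canonicalName.contains(".")) {
            throw new IllegalArgumentException(
                "canonicalName must be dot-qualified: " + canonicalName);
        }
        this.canonicalName = canonicalName;
        this.sqlName = sqlName;
        this.deterministic = deterministic;
    }

    // Convenience constructor: derive the SQL name from the last segment of
    // the canonical name. The null check runs before substring() to avoid
    // the NPE the PR mentions guarding against.
    DescriptorSketch(String canonicalName) {
        this(Objects.requireNonNull(canonicalName, "canonicalName must not be null"),
             canonicalName.substring(canonicalName.lastIndexOf('.') + 1).toLowerCase(),
             true);
    }

    String canonicalName() { return canonicalName; }
    String sqlName() { return sqlName; }
    boolean isDeterministic() { return deterministic; }

    public static void main(String[] args) {
        DescriptorSketch d = new DescriptorSketch("com.mycompany.MY_SEARCH");
        System.out.println(d.sqlName());
        try {
            new DescriptorSketch("NOT_QUALIFIED", "nq", true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```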

Layer 3: Custom Infix Operator Syntax via Parser Extensions

For data sources that want infix operator syntax (e.g. col INDEXQUERY 'param'), an abstract CustomOperatorParserExtension base class is provided. It rewrites infix expressions to function calls before the standard parser runs:

col INDEXQUERY 'param'  →  INDEXQUERY(col, 'param')

Data source authors extend CustomOperatorParserExtension, implement customOperators, and register via SparkSessionExtensions.injectParser.
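The rewrite step can be sketched as follows. This is a deliberately simplified stand-alone version of the idea, not the PR's `CustomOperatorParserExtension`: it masks single-quoted string literals with a UUID-based placeholder (as the PR describes) so operators inside quotes are never rewritten, but its operand regex is far cruder than a real implementation would need.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified sketch of infix-to-function rewriting with string-literal
// masking. The real CustomOperatorParserExtension handles operand
// boundaries, backtick-quoted identifiers, and more.
public class InfixRewriteSketch {
    public static String rewrite(String sql, String op) {
        String prefix = "PH_" + UUID.randomUUID().toString().replace("-", "") + "_";
        List<String> literals = new ArrayList<>();

        // 1. Mask single-quoted string literals with collision-resistant
        //    placeholders so the operator regex never fires inside quotes.
        Matcher lit = Pattern.compile("'[^']*'").matcher(sql);
        StringBuilder masked = new StringBuilder();
        while (lit.find()) {
            literals.add(lit.group());
            lit.appendReplacement(masked, prefix + (literals.size() - 1));
        }
        lit.appendTail(masked);

        // 2. Rewrite `lhs OP rhs` to `OP(lhs, rhs)`, case-insensitively.
        String rewritten = masked.toString().replaceAll(
            "(?i)(\\S+)\\s+" + op + "\\s+(\\S+)", op.toUpperCase() + "($1, $2)");

        // 3. Restore the masked literals.
        for (int i = 0; i < literals.size(); i++) {
            rewritten = rewritten.replace(prefix + i, literals.get(i));
        }
        return rewritten;
    }

    public static void main(String[] args) {
        System.out.println(
            rewrite("SELECT * FROM t WHERE col INDEXQUERY 'param'", "INDEXQUERY"));
    }
}
```

The rewritten function call is then resolved through Layer 2's `SupportsCustomPredicates` machinery like any other custom predicate.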

How was this patch tested?

  • Unit tests in DataSourceV2StrategySuite:

    • 10 tests for CustomOperatorParserExtension (infix rewriting, case-insensitivity, string literal preservation, placeholder collision resistance, complex WHERE clauses, parseQuery delegation, etc.)
    • 11 tests for capability-gated predicate translation (LIKE, RLIKE, ILIKE, IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY, ARRAYS_OVERLAP, LIKE_ALL, LIKE_ANY, NOT_LIKE_ALL, NOT_LIKE_ANY, plus MAP_CONTAINS_KEY precedence)
    • 4 tests for CustomPredicateDescriptor validation (null canonicalName, null sqlName, missing dot, convenience constructor)
    • Existing V2 predicate translation tests continue to pass
  • Integration tests in DataSourceV2Suite:

    • Custom predicate pushdown end-to-end (function resolved, translated, pushed to scan)
    • Custom predicates alongside standard predicates with data verification (checkAnswer)
    • Config disabled prevents custom predicate resolution (Layer 2 and Layer 1)
    • Without capability declaration, extended predicates are not pushed
    • Rejected custom predicate fails with clear error via EnsureCustomPredicatesPushed
    • Capability-gated predicate pushdown (RLIKE, ILIKE pushed only when declared)
    • CASE expression tests for all three layers
    • Layer 3 parser extension end-to-end with Layer 2
  • Regression suites -- all passing:

    • DataSourceV2Suite (112 tests)
    • DataSourceV2StrategySuite (82 tests)
    • DataSourceV2FunctionSuite (44 tests)
    • V2PredicateSuite (18 tests)
    • SparkSessionExtensionSuite (31 tests)
    • AnalysisSuite (82 tests)
    • JDBCV2Suite (79 tests)
  • Style checks: scalastyle (catalyst + core) and checkstyle (catalyst) all pass.

Was this patch authored or co-authored using generative AI tooling?

Yes.

New Files

| File | Module | Description |
| --- | --- | --- |
| SupportsPushDownPredicateCapabilities.java | catalyst | Interface on ScanBuilder declaring supported predicate names (Layer 1) |
| SupportsCustomPredicates.java | catalyst | Interface on Table declaring custom predicate descriptors (Layer 2) |
| CustomPredicateDescriptor.java | catalyst | Descriptor for a custom predicate function (Layer 2) |
| CustomPredicateExpression.scala | catalyst | Catalyst expression node for resolved custom predicates (Layer 2) |
| ResolveCustomPredicates.scala | catalyst | Analyzer rule resolving function calls against table descriptors (Layer 2) |
| EnsureCustomPredicatesPushed.scala | catalyst | Post-optimizer rule ensuring custom predicates are pushed (Layer 2) |
| CustomOperatorParserExtension.scala | catalyst | Abstract parser wrapper for infix operator rewriting (Layer 3) |

Modified Files

| File | Description |
| --- | --- |
| Analyzer.scala | Added ResolveCustomPredicates to the Resolution batch; modified LookupFunctions to skip custom predicate names |
| V2ExpressionBuilder.scala | Added capability-gated translation for LIKE/RLIKE/ILIKE/IS_NAN/ARRAY_CONTAINS/MAP_CONTAINS_KEY/ARRAYS_OVERLAP/LIKE_ALL/LIKE_ANY/NOT_LIKE_ALL/NOT_LIKE_ANY; added CustomPredicateExpression translation |
| PushDownUtils.scala | Queries SupportsPushDownPredicateCapabilities and passes extra capabilities to translation |
| DataSourceV2Strategy.scala | Accepts extra capabilities in translateFilterV2WithMapping |
| V2ExpressionSQLBuilder.java | Fixed StackOverflowError for unknown predicate names in toString() |
| Predicate.java | Updated Javadoc with new predicate names and custom predicate conventions |
| SQLConf.scala | Added spark.sql.dataSource.extendedPredicatePushdown.enabled config |
| SparkOptimizer.scala | Added EnsureCustomPredicatesPushed post-optimization rule |
| DataSourceV2StrategySuite.scala | Added parser extension unit tests |
| DataSourceV2Suite.scala | Added custom predicate and capability-gated predicate integration tests |

Configuration

| Config Key | Default | Description |
| --- | --- | --- |
| spark.sql.dataSource.extendedPredicatePushdown.enabled | true | Master switch for all extended predicate pushdown features (Layers 1-3) |

schenksj added 5 commits March 6, 2026 20:51
…ability-gated predicate pushdown

  ### What changes were proposed in this pull request?

  Introduces `SupportsPushDownPredicateCapabilities`, a new mix-in interface for
  `ScanBuilder` that allows DataSource V2 implementations to declare support for
  extended predicate types beyond the default always-translated set.

  When a ScanBuilder implements this interface, the V2 expression translator will
  attempt to translate additional builtin Catalyst expressions (LIKE, RLIKE, ILIKE,
  IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY) that match the declared capabilities.

  Changes:
  - New `SupportsPushDownPredicateCapabilities` interface in `connector.read`
  - `V2ExpressionBuilder` gains `extraCapabilities` parameter with 6 new
    capability-gated match cases (ILIKE before LIKE, MAP_CONTAINS_KEY before
    ARRAY_CONTAINS for correct pattern precedence)
  - `PushDownUtils.pushFilters()` queries capabilities once per scan builder
    and threads them through translation
  - `DataSourceV2Strategy.translateFilterV2WithMapping()` and
    `translateLeafNodeFilterV2()` accept and pass capabilities
  - `Predicate.java` Javadoc updated with extended opt-in predicate names

  ### Why are the changes needed?

  DataSource V2 predicate pushdown is limited to a fixed set of expressions
  hardcoded in `V2ExpressionBuilder`. Data sources that need predicates like
  RLIKE, LIKE, or ARRAY_CONTAINS must resort to fragile workarounds (thread-local
  state, logical plan interception) to push these common predicates.

  ### Does this PR introduce _any_ user-facing change?

  Yes. Data source authors can now implement `SupportsPushDownPredicateCapabilities`
  to opt in to translation of additional builtin predicates. No changes for
  end users unless a data source adopts the new interface.

  ### How was this patch tested?

  - 8 new unit tests in `DataSourceV2StrategySuite` covering each capability-gated
    predicate (LIKE, RLIKE, IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY, ILIKE),
    pattern precedence, compound expressions, and non-interference with defaults
  - 1 new integration test in `DataSourceV2Suite` with a test ScanBuilder
    implementing `SupportsPushDownPredicateCapabilities`
  Introduces SupportsCustomPredicates, a mix-in for DSv2 Table that lets
  data sources declare custom predicate functions (e.g. spatial search,
  full-text indexing) that Spark resolves in SQL and pushes down as V2
  Predicates.

  New components:
  - CustomPredicateDescriptor: describes a custom predicate with a
    dot-qualified canonical name, SQL name, parameter types, and
    determinism flag.
  - SupportsCustomPredicates: Table mix-in that declares descriptors.
  - CustomPredicateExpression: Catalyst expression for resolved custom
    predicates. Cannot be evaluated by Spark; must be pushed to the
    data source.
  - ResolveCustomPredicates: Analyzer rule (Resolution batch) that
    resolves UnresolvedFunction calls against table-declared descriptors.
  - EnsureCustomPredicatesPushed: Post-optimizer rule that fails the
    query if any custom predicate remains unpushed.

  Also modifies:
  - LookupFunctions: skip validation for functions matching custom
    predicates from DSv2 tables, so ResolveCustomPredicates can handle
    them in the Resolution batch.
  - V2ExpressionBuilder: translate CustomPredicateExpression to V2
    Predicate using the descriptor's canonical name.
  - V2ExpressionSQLBuilder: render unknown GeneralScalarExpression names
    as SQL function calls instead of throwing, supporting custom
    predicate display.
  - SparkOptimizer: register EnsureCustomPredicatesPushed after
    V2ScanRelationPushDown.
…or syntax

  Adds a helper base class that lets data source authors define custom
  infix operators (e.g. `col INDEXQUERY 'param'`) by rewriting them to
  function-call syntax before parsing. The rewritten calls flow through
  Layer 2's SupportsCustomPredicates for resolution and pushdown.

  New components:
  - CustomOperatorParserExtension: abstract ParserInterface wrapper
    that rewrites `expr OP expr` to `func(expr, expr)` in SQL text.
    Masks string literals to avoid rewriting inside quotes. Supports
    case-insensitive operator matching.

  Also fixes:
  - V2ExpressionSQLBuilder: render unknown GeneralScalarExpression
    names as SQL function calls instead of falling through to
    visitUnexpectedExpr (which caused StackOverflowError via
    recursive toString).
  - LookupFunctions: skip validation for functions matching custom
    predicates declared by DSv2 tables referenced in the plan, so
    ResolveCustomPredicates can handle them in the Resolution batch.
…s, and PR text

  - Update Predicate.java Javadoc with extended predicate names and
    custom predicate conventions (dot-qualified canonical names)
  - Add spark.sql.dataSource.extendedPredicatePushdown.enabled config
    switch (default true) gating all three layers
  - Wire config into PushDownUtils, ResolveCustomPredicates, and
    LookupFunctions
  - Fix V2ExpressionSQLBuilder StackOverflowError for unknown predicate
    names by falling back to visitSQLFunction instead of visitUnexpectedExpr
  - Add SPARK-55869-PR.md with pull request description
  - All regression suites passing: DataSourceV2Suite, JDBCV2Suite,
    AnalysisSuite, V2PredicateSuite, SparkSessionExtensionSuite, and others
  Production code fixes:
  - Replace assert with IllegalArgumentException in
    CustomPredicateDescriptor for dot-qualified name validation
  - Add null checks for canonicalName and sqlName parameters
  - Defensive-copy parameterTypes array in constructor and getter
  - Eliminate double lookup in ResolveCustomPredicates by restructuring
    pattern match to bind descriptor directly
  - Document single-table limitation of collectCustomPredicates
  - Use UUID-based placeholder prefix in CustomOperatorParserExtension
    to prevent collision with user SQL containing placeholder-like text
  - Add @return nullability docs to SupportsCustomPredicates and
    SupportsPushDownPredicateCapabilities interfaces

  New tests:
  - Config disabled prevents custom predicate resolution
  - Custom predicates return correct data (checkAnswer verification)
  - Rejected custom predicate fails with clear error via
    EnsureCustomPredicatesPushed
  - Placeholder collision resistance in parser extension
  - Complex WHERE clauses with multiple operator rewrites
@schenksj schenksj changed the title [WIP] [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 7, 2026
@schenksj schenksj changed the title [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 7, 2026
  Layer 1 Tier 2 predicates:
  - ARRAYS_OVERLAP: array intersection check
  - LIKE_ALL / NOT_LIKE_ALL: match all/no patterns
  - LIKE_ANY / NOT_LIKE_ANY: match any/not-any patterns
  - Patterns translated as V2 LiteralValue children alongside the
    expression column reference

  New tests:
  - 5 capability-gated translation tests for Tier 2 predicates
@schenksj schenksj changed the title [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 7, 2026
…writing

  Extend the nested AND/OR parser extension test to verify the parsed
  logical plan tree structure, not just string containment. The test
  confirms that:
    (col INDEXQUERY 'value' AND col1 XYZQUERY 'value2')
    OR (col2 INDEXQUERY 'value' AND
        (col3 XYZQUERY 'value3' OR col4 INDEXQUERY 'value4'))
  parses into Or(And(indexquery, xyzquery), And(indexquery, Or(xyzquery,
  indexquery))) with correct UnresolvedFunction name parts at each node.

schenksj commented Mar 7, 2026

@cloud-fan @huaxingao -- Would one of you be the right person to work with for a review?

…mption

  Add integration tests demonstrating all three layers with DSv2 data
  sources that actually consume pushed predicates for filtering, not just
  accept and ignore them.

  Layer 1 (capability-gated):
  - New StringDataSourceV2WithPredicateCapabilities with string schema
  - StringBatchWithV2Filter extracts RLIKE pattern from pushed predicate
    and applies regex filtering in the partition reader
  - Tests: RLIKE pushed and consumed, RLIKE alongside CASE expression

  Layer 2 (custom predicates):
  - CustomPredicateBatch replaces AdvancedBatchWithV2Filter for custom
    predicate tests; consumes COM.TEST.MY_SEARCH to filter out i=0 rows
    in planInputPartitions
  - Added checkAnswer verification to all custom predicate tests
  - Tests: my_search alone (returns 1-9), with standard predicates,
    with CASE argument

  Layer 3 (parser extension):
  - End-to-end test: infix SQL parsed through CustomOperatorParserExtension,
    rewritten SQL executed against Layer 2 data source, predicate pushed
    and consumed, checkAnswer verified
  - CASE expression coexisting with infix operator rewrite
@schenksj schenksj closed this Mar 7, 2026
…predicate pushdown

  Add 8 new tests covering previously untested edge cases:

  - CustomPredicateDescriptor validation: null canonicalName, null sqlName,
    and non-dot-qualified name all throw IllegalArgumentException
  - CustomPredicateDescriptor convenience constructor: derives sqlName from
    last segment of canonicalName
  - Config disabled for Layer 1: RLIKE is NOT translated/pushed when
    spark.sql.dataSource.extendedPredicatePushdown.enabled is false,
    even when the data source declares the capability
  - Without capability, RLIKE not pushed: data source without
    SupportsPushDownPredicateCapabilities does not receive RLIKE predicates
  - ILIKE E2E: capability-gated ILIKE predicate is pushed and consumed
    (uses column-column reference to avoid constant folding)
  - parseQuery delegation: CustomOperatorParserExtension.parseQuery also
    rewrites infix operators, not just parsePlan
@schenksj schenksj reopened this Mar 7, 2026

schenksj commented Mar 7, 2026

@cloud-fan @huaxingao -- The ticket auto-closed when I added some extra test cases, so I'm re-tagging in case that's necessary.

@schenksj schenksj changed the title [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 7, 2026
schenksj added 4 commits March 7, 2026 14:40
…e pushdown

  - V2ExpressionSQLBuilder: only render dot-qualified names as SQL function
    calls; fall through to visitUnexpectedExpr for unknown unqualified names
    to preserve existing error behavior
  - ResolveCustomPredicates: collect descriptors from all DSv2 relations
    (not just the first) to support multi-table queries with custom predicates
  - LookupFunctions: add early-return exists() check to skip full plan
    traversal when no DSv2 relations are present
  - CustomOperatorParserExtension: add negative lookbehind for arithmetic
    and comparison operators to prevent incorrect left-operand matching
    in complex expressions (e.g. x + y OP val)
  - CustomPredicateDescriptor: guard against NPE in convenience constructor
    when canonicalName is null
  - PushDownUtils: fix formatting of if-condition
  - DataSourceV2Suite: extract rewriteInfixSql() helper to eliminate
    duplicated capturingExt boilerplate in Layer 3 tests
  - Update PR description with accurate test counts (112/82)
…dd missing tests

  - Add equals(), hashCode(), toString() to CustomPredicateDescriptor
  - Fix StackOverflowError in V2ExpressionSQLBuilder default case where
    visitUnexpectedExpr called String.valueOf(expr) which triggered
    toString() -> build() -> visitUnexpectedExpr infinite recursion;
    revert to visitSQLFunction as catch-all for unknown predicate names
  - Fix AnalysisException constructor in ResolveCustomPredicates to use
    SparkException.internalError() for ambiguous cross-table predicates
  - Remove unused Filter import from EnsureCustomPredicatesPushed
  - Add intent comments to dense business logic in V2ExpressionBuilder
    (null pattern filtering) and ResolveCustomPredicates (Filter-only
    resolution, single-part name matching, implicit cast logic)
  - Add 5 new tests: descriptor equals/hashCode/toString, NOT(RLIKE)
    translation, custom predicate in SELECT context error, NOT(RLIKE)
    E2E correctness, pushed predicate argument inspection
…er, add comments and tests

  - Fix V2ExpressionSQLBuilder default case: add extended predicate names
    (LIKE, RLIKE, ILIKE, IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY,
    ARRAYS_OVERLAP, LIKE_ALL/ANY, NOT_LIKE_ALL/ANY) to the known switch
    list; use dot-qualified check for custom predicates in default case;
    use `name` instead of `String.valueOf(expr)` in error to prevent
    infinite recursion (toString → build → visitUnexpectedExpr → ...)
  - Add equals(), hashCode(), toString() to CustomPredicateDescriptor
  - Use SparkException.internalError() in CustomPredicateExpression.eval()
    and ResolveCustomPredicates for consistency
  - Extract translateMultiPatternLike helper to deduplicate LikeAll,
    NotLikeAll, LikeAny, NotLikeAny translation logic
  - Document ILIKE non-literal pattern limitation in Predicate.java
  - Add Javadoc to translateFilterV2 explaining intentional omission of
    extraCapabilities, and to collectCustomPredicateNames explaining
    fixed-point convergence
  - Add intent comments to V2ExpressionBuilder (custom predicate
    translation, null pattern filtering), ResolveCustomPredicates
    (Filter-only resolution, single-part name matching, implicit casts)
  - Remove unused Filter import from EnsureCustomPredicatesPushed
  - Add 6 new tests: descriptor equals/hashCode/toString, NOT(RLIKE)
    V2 translation, custom predicate in SELECT context, NOT(RLIKE) E2E,
    pushed predicate argument inspection, ambiguous cross-table predicate
…k masking

  Fix StackOverflowError in V2ExpressionSQLBuilder.visitUnexpectedExpr by
  using expr.getClass().getSimpleName() instead of String.valueOf(expr),
  which triggered infinite recursion through ExpressionWithToString.toString().
  Simplify the default switch case back to visitUnexpectedExpr(expr) now that
  the root cause is resolved.

  Add backtick-quoted identifier masking to CustomOperatorParserExtension
  so custom operators inside backtick-quoted names are not rewritten.
  Add cross-reference to StringUtils.stripComment explaining why we mask
  instead of strip.

  Let me know if you need anything else.
@schenksj schenksj changed the title [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026
@schenksj schenksj changed the title [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026
@schenksj schenksj changed the title [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026
@schenksj schenksj changed the title [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026
…ecks

  Replace Unicode em-dashes and arrows in test comments with ASCII
  equivalents to satisfy the nonascii scalastyle rule.
@schenksj schenksj changed the title [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026
  Remove the default-escape-only restriction from LIKE and ILIKE
  translation. The escape character is now passed as a third LiteralValue
  argument so data sources can apply correct escaping semantics for any
  escape character, not just the default backslash.

  Update Predicate Javadoc to document the [expr, pattern, escapeChar]
  children format and add unit tests for custom escape characters.
@schenksj schenksj changed the title [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026
@schenksj schenksj changed the title [WIP][SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 [SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 Mar 8, 2026