[SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2 #54669
Open
schenksj wants to merge 18 commits into apache:master from
Conversation
…ability-gated predicate pushdown
### What changes were proposed in this pull request?
Introduces `SupportsPushDownPredicateCapabilities`, a new mix-in interface for
`ScanBuilder` that allows DataSource V2 implementations to declare support for
extended predicate types beyond the default always-translated set.
When a ScanBuilder implements this interface, the V2 expression translator will
attempt to translate additional builtin Catalyst expressions (LIKE, RLIKE, ILIKE,
IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY) that match the declared capabilities.
Changes:
- New `SupportsPushDownPredicateCapabilities` interface in `connector.read`
- `V2ExpressionBuilder` gains `extraCapabilities` parameter with 6 new
capability-gated match cases (ILIKE before LIKE, MAP_CONTAINS_KEY before
ARRAY_CONTAINS for correct pattern precedence)
- `PushDownUtils.pushFilters()` queries capabilities once per scan builder
and threads them through translation
- `DataSourceV2Strategy.translateFilterV2WithMapping()` and
`translateLeafNodeFilterV2()` accept and pass capabilities
- `Predicate.java` Javadoc updated with extended opt-in predicate names
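As a rough illustration of the capability gate described above, the sketch below models the opt-in with minimal stand-in types. The interface and method names here are guesses at the PR's shape (only `SupportsPushDownPredicateCapabilities` is named in the PR text), so treat this as a sketch, not the actual connector API:

```java
import java.util.Set;

// Minimal stand-in types so the sketch is self-contained; the real PR adds
// SupportsPushDownPredicateCapabilities under connector.read.
interface ScanBuilder {}

interface SupportsPushDownPredicateCapabilities extends ScanBuilder {
    // Predicate names (e.g. "RLIKE") the source can evaluate; the translator
    // consults this set before emitting any capability-gated predicate.
    Set<String> pushDownPredicateCapabilities();  // hypothetical method name
}

public class CapabilityDemo {
    // A hypothetical scan builder that opts in to two extended predicates.
    static class RegexCapableScanBuilder implements SupportsPushDownPredicateCapabilities {
        @Override
        public Set<String> pushDownPredicateCapabilities() {
            return Set.of("RLIKE", "ARRAY_CONTAINS");
        }
    }

    // Mirrors the gate the PR adds to V2ExpressionBuilder: translate only if declared.
    static boolean mayTranslate(ScanBuilder b, String predicateName) {
        return b instanceof SupportsPushDownPredicateCapabilities s
            && s.pushDownPredicateCapabilities().contains(predicateName);
    }

    public static void main(String[] args) {
        ScanBuilder b = new RegexCapableScanBuilder();
        System.out.println(mayTranslate(b, "RLIKE"));  // declared capability
        System.out.println(mayTranslate(b, "LIKE"));   // not declared: keep old behavior
    }
}
```

Sources that do not implement the interface never enter `mayTranslate`'s true branch, which is how existing behavior is preserved.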
### Why are the changes needed?
DataSource V2 predicate pushdown is limited to a fixed set of expressions
hardcoded in `V2ExpressionBuilder`. Data sources that need predicates like
RLIKE, LIKE, or ARRAY_CONTAINS must resort to fragile workarounds (thread-local
state, logical plan interception) to push these common predicates.
### Does this PR introduce _any_ user-facing change?
Yes. Data source authors can now implement `SupportsPushDownPredicateCapabilities`
to opt in to translation of additional builtin predicates. No changes for
end users unless a data source adopts the new interface.
### How was this patch tested?
- 8 new unit tests in `DataSourceV2StrategySuite` covering each capability-gated
predicate (LIKE, RLIKE, IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY, ILIKE),
pattern precedence, compound expressions, and non-interference with defaults
- 1 new integration test in `DataSourceV2Suite` with a test ScanBuilder
implementing `SupportsPushDownPredicateCapabilities`
Introduces SupportsCustomPredicates, a mix-in for DSv2 Table that lets
data sources declare custom predicate functions (e.g. spatial search,
full-text indexing) that Spark resolves in SQL and pushes down as V2
Predicates.
New components:
- CustomPredicateDescriptor: describes a custom predicate with a
dot-qualified canonical name, SQL name, parameter types, and
determinism flag.
- SupportsCustomPredicates: Table mix-in that declares descriptors.
- CustomPredicateExpression: Catalyst expression for resolved custom
predicates. Cannot be evaluated by Spark; must be pushed to the
data source.
- ResolveCustomPredicates: Analyzer rule (Resolution batch) that
resolves UnresolvedFunction calls against table-declared descriptors.
- EnsureCustomPredicatesPushed: Post-optimizer rule that fails the
query if any custom predicate remains unpushed.
Also modifies:
- LookupFunctions: skip validation for functions matching custom
predicates from DSv2 tables, so ResolveCustomPredicates can handle
them in the Resolution batch.
- V2ExpressionBuilder: translate CustomPredicateExpression to V2
Predicate using the descriptor's canonical name.
- V2ExpressionSQLBuilder: render unknown GeneralScalarExpression names
as SQL function calls instead of throwing, supporting custom
predicate display.
- SparkOptimizer: register EnsureCustomPredicatesPushed after
V2ScanRelationPushDown.
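A self-contained sketch of what a descriptor with the validation rules later described in this PR (dot-qualified canonical name, null checks, defensive copies, derived sqlName) might look like. The class shape, lowercasing of the derived name, and constructor signatures are assumptions, not the PR's exact code:

```java
// Illustrative descriptor; names follow the PR text, details are guesses.
public final class CustomPredicateDescriptor {
    private final String canonicalName;   // dot-qualified, e.g. "com.test.MY_SEARCH"
    private final String sqlName;         // name users write in SQL, e.g. "my_search"
    private final String[] parameterTypes;
    private final boolean deterministic;

    public CustomPredicateDescriptor(String canonicalName, String sqlName,
                                     String[] parameterTypes, boolean deterministic) {
        if (canonicalName == null) throw new IllegalArgumentException("canonicalName is null");
        if (sqlName == null) throw new IllegalArgumentException("sqlName is null");
        if (!canonicalName.contains(".")) {
            // The PR replaces an assert with a real exception for this check.
            throw new IllegalArgumentException("canonicalName must be dot-qualified: " + canonicalName);
        }
        this.canonicalName = canonicalName;
        this.sqlName = sqlName;
        // Defensive copy so later mutation of the caller's array cannot leak in.
        this.parameterTypes = parameterTypes == null ? null : parameterTypes.clone();
        this.deterministic = deterministic;
    }

    // Convenience constructor: derive sqlName from the last segment of canonicalName.
    public CustomPredicateDescriptor(String canonicalName, String[] parameterTypes,
                                     boolean deterministic) {
        this(canonicalName,
             canonicalName == null
                 ? null  // let the main constructor raise the null error, avoiding an NPE here
                 : canonicalName.substring(canonicalName.lastIndexOf('.') + 1).toLowerCase(),
             parameterTypes, deterministic);
    }

    public String canonicalName() { return canonicalName; }
    public String sqlName() { return sqlName; }
    public String[] parameterTypes() {  // defensive copy on read as well
        return parameterTypes == null ? null : parameterTypes.clone();
    }
    public boolean isDeterministic() { return deterministic; }

    public static void main(String[] args) {
        CustomPredicateDescriptor d =
            new CustomPredicateDescriptor("com.test.MY_SEARCH", null, true);
        System.out.println(d.sqlName());
        try {
            new CustomPredicateDescriptor("nodots", "f", null, true);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```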
…or syntax
Adds a helper base class that lets data source authors define custom
infix operators (e.g. `col INDEXQUERY 'param'`) by rewriting them to
function-call syntax before parsing. The rewritten calls flow through
Layer 2's SupportsCustomPredicates for resolution and pushdown.
New components:
- CustomOperatorParserExtension: abstract ParserInterface wrapper
that rewrites `expr OP expr` to `func(expr, expr)` in SQL text.
Masks string literals to avoid rewriting inside quotes. Supports
case-insensitive operator matching.
Also fixes:
- V2ExpressionSQLBuilder: render unknown GeneralScalarExpression
names as SQL function calls instead of falling through to
visitUnexpectedExpr (which caused StackOverflowError via
recursive toString).
- LookupFunctions: skip validation for functions matching custom
predicates declared by DSv2 tables referenced in the plan, so
ResolveCustomPredicates can handle them in the Resolution batch.
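The rewrite-plus-masking idea can be sketched in a few lines. This is illustrative code, not the PR's `CustomOperatorParserExtension`: it masks single-quoted literals behind a UUID-based placeholder, rewrites `left OP right` to `func(left, right)` case-insensitively, and then restores the literals so an operator word inside quotes is never rewritten:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class InfixRewriteDemo {
    // UUID-based prefix so placeholders cannot collide with user SQL text.
    private static final String PREFIX =
        "PH" + UUID.randomUUID().toString().replace("-", "") + "x";

    static String rewrite(String sql, String op, String func) {
        // 1. Mask string literals so operators inside quotes are never rewritten.
        List<String> literals = new ArrayList<>();
        Matcher lit = Pattern.compile("'[^']*'").matcher(sql);
        StringBuffer masked = new StringBuffer();
        while (lit.find()) {
            literals.add(lit.group());
            lit.appendReplacement(masked, PREFIX + (literals.size() - 1) + "x");
        }
        lit.appendTail(masked);

        // 2. Rewrite `left OP right` to `func(left, right)`, case-insensitively.
        Pattern infix = Pattern.compile(
            "(\\w+)\\s+" + Pattern.quote(op) + "\\s+(\\w+)", Pattern.CASE_INSENSITIVE);
        String out = infix.matcher(masked).replaceAll(func + "($1, $2)");

        // 3. Restore the masked literals.
        for (int i = 0; i < literals.size(); i++) {
            out = out.replace(PREFIX + i + "x", literals.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // The operator word inside the quoted literal survives untouched.
        String sql = "SELECT * FROM t WHERE col INDEXQUERY 'a INDEXQUERY b'";
        System.out.println(rewrite(sql, "INDEXQUERY", "indexquery"));
    }
}
```

The rewritten SQL is then handed to the normal parser, where the function call resolves through the custom-predicate machinery.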
…s, and PR text
- Update Predicate.java Javadoc with extended predicate names and
custom predicate conventions (dot-qualified canonical names)
- Add spark.sql.dataSource.extendedPredicatePushdown.enabled config
switch (default true) gating all three layers
- Wire config into PushDownUtils, ResolveCustomPredicates, and
LookupFunctions
- Fix V2ExpressionSQLBuilder StackOverflowError for unknown predicate
names by falling back to visitSQLFunction instead of visitUnexpectedExpr
- Add SPARK-55869-PR.md with pull request description
- All regression suites passing: DataSourceV2Suite, JDBCV2Suite,
AnalysisSuite, V2PredicateSuite, SparkSessionExtensionSuite, and others
Production code fixes:
- Replace assert with IllegalArgumentException in
CustomPredicateDescriptor for dot-qualified name validation
- Add null checks for canonicalName and sqlName parameters
- Defensive-copy parameterTypes array in constructor and getter
- Eliminate double lookup in ResolveCustomPredicates by restructuring
pattern match to bind descriptor directly
- Document single-table limitation of collectCustomPredicates
- Use UUID-based placeholder prefix in CustomOperatorParserExtension
to prevent collision with user SQL containing placeholder-like text
- Add @return nullability docs to SupportsCustomPredicates and
SupportsPushDownPredicateCapabilities interfaces
New tests:
- Config disabled prevents custom predicate resolution
- Custom predicates return correct data (checkAnswer verification)
- Rejected custom predicate fails with clear error via
EnsureCustomPredicatesPushed
- Placeholder collision resistance in parser extension
- Complex WHERE clauses with multiple operator rewrites
Layer 1 Tier 2 predicates:
- ARRAYS_OVERLAP: array intersection check
- LIKE_ALL / NOT_LIKE_ALL: match all/no patterns
- LIKE_ANY / NOT_LIKE_ANY: match any/not-any patterns
- Patterns translated as V2 LiteralValue children alongside the
expression column reference
New tests:
- 5 capability-gated translation tests for Tier 2 predicates
…writing
Extend the nested AND/OR parser extension test to verify the parsed
logical plan tree structure, not just string containment. The test
confirms that:
(col INDEXQUERY 'value' AND col1 XYZQUERY 'value2')
OR (col2 INDEXQUERY 'value' AND
(col3 XYZQUERY 'value3' OR col4 INDEXQUERY 'value4'))
parses into Or(And(indexquery, xyzquery), And(indexquery, Or(xyzquery,
indexquery))) with correct UnresolvedFunction name parts at each node.
schenksj (Author)
@cloud-fan @huaxingao -- Would one of you be the right person to work with for a review?
…mption
Add integration tests demonstrating all three layers with DSv2 data
sources that actually consume pushed predicates for filtering, not just
accept and ignore them.
Layer 1 (capability-gated):
- New StringDataSourceV2WithPredicateCapabilities with string schema
- StringBatchWithV2Filter extracts RLIKE pattern from pushed predicate
and applies regex filtering in the partition reader
- Tests: RLIKE pushed and consumed, RLIKE alongside CASE expression
Layer 2 (custom predicates):
- CustomPredicateBatch replaces AdvancedBatchWithV2Filter for custom
predicate tests; consumes COM.TEST.MY_SEARCH to filter out i=0 rows
in planInputPartitions
- Added checkAnswer verification to all custom predicate tests
- Tests: my_search alone (returns 1-9), with standard predicates,
with CASE argument
Layer 3 (parser extension):
- End-to-end test: infix SQL parsed through CustomOperatorParserExtension,
rewritten SQL executed against Layer 2 data source, predicate pushed
and consumed, checkAnswer verified
- CASE expression coexisting with infix operator rewrite
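The "consume, don't just accept" distinction above can be shown with a toy reader. The types here are simplified stand-ins, not Spark's connector classes: the reader pulls the regex out of a pushed RLIKE-style predicate and filters rows at the source:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ConsumeRlikeDemo {
    // Simplified stand-in for a pushed V2 predicate: name plus flattened children.
    record PushedPredicate(String name, String column, String literal) {}

    static List<String> readPartition(List<String> rows, PushedPredicate pred) {
        if (pred != null && pred.name().equals("RLIKE")) {
            // Spark's RLIKE is a partial (substring) regex match, hence find().
            Pattern re = Pattern.compile(pred.literal());
            return rows.stream().filter(r -> re.matcher(r).find()).collect(Collectors.toList());
        }
        // Predicate not understood: return everything and let Spark re-filter.
        return rows;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("alpha", "beta", "alphabet");
        PushedPredicate p = new PushedPredicate("RLIKE", "value", "^alpha");
        System.out.println(readPartition(rows, p));
    }
}
```

A real reader would do this inside its partition reader after reporting the predicate as fully pushed, so Spark skips its own post-filter.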
…predicate pushdown
Add 8 new tests covering previously untested edge cases:
- CustomPredicateDescriptor validation: null canonicalName, null sqlName,
and non-dot-qualified name all throw IllegalArgumentException
- CustomPredicateDescriptor convenience constructor: derives sqlName from
last segment of canonicalName
- Config disabled for Layer 1: RLIKE is NOT translated/pushed when
spark.sql.dataSource.extendedPredicatePushdown.enabled is false,
even when the data source declares the capability
- Without capability, RLIKE not pushed: data source without
SupportsPushDownPredicateCapabilities does not receive RLIKE predicates
- ILIKE E2E: capability-gated ILIKE predicate is pushed and consumed
(uses column-column reference to avoid constant folding)
- parseQuery delegation: CustomOperatorParserExtension.parseQuery also
rewrites infix operators, not just parsePlan
schenksj (Author)
@cloud-fan @huaxingao -- The ticket auto-closed when I added some extra test cases, so I'm re-tagging in case that is necessary.
…e pushdown
- V2ExpressionSQLBuilder: only render dot-qualified names as SQL function
calls; fall through to visitUnexpectedExpr for unknown unqualified names
to preserve existing error behavior
- ResolveCustomPredicates: collect descriptors from all DSv2 relations
(not just the first) to support multi-table queries with custom predicates
- LookupFunctions: add early-return exists() check to skip full plan
traversal when no DSv2 relations are present
- CustomOperatorParserExtension: add negative lookbehind for arithmetic
and comparison operators to prevent incorrect left-operand matching
in complex expressions (e.g. x + y OP val)
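One way the lookbehind guard can work, shown with a hypothetical regex (not the PR's exact pattern): without the guard, `x + y INDEXQUERY 'v'` would rewrite to `x + indexquery(y, 'v')`, silently changing which expression is searched. A bounded negative lookbehind rejects a left operand that is itself the right side of an arithmetic or comparison expression:

```java
import java.util.regex.Pattern;

public class LookbehindDemo {
    // (?<![-+*/%=<>]\s{0,5}) rejects a match whose word is preceded by an
    // operator and up to five spaces; Java allows bounded-length lookbehind.
    private static final Pattern OP = Pattern.compile(
        "(?<![-+*/%=<>]\\s{0,5})\\b(\\w+)\\s+INDEXQUERY\\s+('[^']*')",
        Pattern.CASE_INSENSITIVE);

    static String rewrite(String sql) {
        return OP.matcher(sql).replaceAll("indexquery($1, $2)");
    }

    public static void main(String[] args) {
        System.out.println(rewrite("WHERE col INDEXQUERY 'v'"));   // rewritten
        System.out.println(rewrite("WHERE x + y INDEXQUERY 'v'")); // left alone
    }
}
```

Leaving the ambiguous case unrewritten is the conservative choice: the query then fails at parse or resolution rather than pushing the wrong operand.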
- CustomPredicateDescriptor: guard against NPE in convenience constructor
when canonicalName is null
- PushDownUtils: fix formatting of if-condition
- DataSourceV2Suite: extract rewriteInfixSql() helper to eliminate
duplicated capturingExt boilerplate in Layer 3 tests
- Update PR description with accurate test counts (112/82)
…dd missing tests
- Add equals(), hashCode(), toString() to CustomPredicateDescriptor
- Fix StackOverflowError in V2ExpressionSQLBuilder default case where
visitUnexpectedExpr called String.valueOf(expr) which triggered
toString() -> build() -> visitUnexpectedExpr infinite recursion;
revert to visitSQLFunction as catch-all for unknown predicate names
- Fix AnalysisException constructor in ResolveCustomPredicates to use
SparkException.internalError() for ambiguous cross-table predicates
- Remove unused Filter import from EnsureCustomPredicatesPushed
- Add intent comments to dense business logic in V2ExpressionBuilder
(null pattern filtering) and ResolveCustomPredicates (Filter-only
resolution, single-part name matching, implicit cast logic)
- Add 5 new tests: descriptor equals/hashCode/toString, NOT(RLIKE)
translation, custom predicate in SELECT context error, NOT(RLIKE)
E2E correctness, pushed predicate argument inspection
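The recursion fixed here is a general hazard worth seeing in isolation. The sketch below is a distilled model, not Spark's classes: `toString()` delegates to a builder, and if the builder's error path stringifies the instance, the two call each other until `StackOverflowError`; naming the class instead breaks the cycle:

```java
public class RecursionDemo {
    static class Expr {
        final String name;
        Expr(String name) { this.name = name; }
        // toString delegates to the builder, as ExpressionWithToString does.
        @Override public String toString() { return build(this); }
    }

    // Buggy error path: formatting the message re-enters toString(), so
    // toString -> build -> toString recurses until StackOverflowError.
    static String buildBuggy(Expr e) {
        throw new IllegalArgumentException("Unexpected expr: " + String.valueOf(e));
    }

    // Fixed error path: name the class instead of stringifying the instance.
    static String build(Expr e) {
        throw new IllegalArgumentException("Unexpected expr: " + e.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        try {
            new Expr("CUSTOM_PRED").toString();
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
        // Wiring toString() to buildBuggy instead reproduces the crash the PR fixes.
    }
}
```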
…er, add comments and tests
- Fix V2ExpressionSQLBuilder default case: add extended predicate names
(LIKE, RLIKE, ILIKE, IS_NAN, ARRAY_CONTAINS, MAP_CONTAINS_KEY,
ARRAYS_OVERLAP, LIKE_ALL/ANY, NOT_LIKE_ALL/ANY) to the known switch
list; use dot-qualified check for custom predicates in default case;
use `name` instead of `String.valueOf(expr)` in error to prevent
infinite recursion (toString → build → visitUnexpectedExpr → ...)
- Add equals(), hashCode(), toString() to CustomPredicateDescriptor
- Use SparkException.internalError() in CustomPredicateExpression.eval()
and ResolveCustomPredicates for consistency
- Extract translateMultiPatternLike helper to deduplicate LikeAll,
NotLikeAll, LikeAny, NotLikeAny translation logic
- Document ILIKE non-literal pattern limitation in Predicate.java
- Add Javadoc to translateFilterV2 explaining intentional omission of
extraCapabilities, and to collectCustomPredicateNames explaining
fixed-point convergence
- Add intent comments to V2ExpressionBuilder (custom predicate
translation, null pattern filtering), ResolveCustomPredicates
(Filter-only resolution, single-part name matching, implicit casts)
- Remove unused Filter import from EnsureCustomPredicatesPushed
- Add 6 new tests: descriptor equals/hashCode/toString, NOT(RLIKE)
V2 translation, custom predicate in SELECT context, NOT(RLIKE) E2E,
pushed predicate argument inspection, ambiguous cross-table predicate
…k masking
Fix StackOverflowError in V2ExpressionSQLBuilder.visitUnexpectedExpr by using expr.getClass().getSimpleName() instead of String.valueOf(expr), which triggered infinite recursion through ExpressionWithToString.toString(). Simplify the default switch case back to visitUnexpectedExpr(expr) now that the root cause is resolved.
Add backtick-quoted identifier masking to CustomOperatorParserExtension so custom operators inside backtick-quoted names are not rewritten. Add a cross-reference to StringUtils.stripComment explaining why we mask instead of strip.
…ecks
Replace Unicode em-dashes and arrows in test comments with ASCII equivalents to satisfy the nonascii scalastyle rule.
Remove the default-escape-only restriction from LIKE and ILIKE translation. The escape character is now passed as a third LiteralValue argument so data sources can apply correct escaping semantics for any escape character, not just the default backslash. Update Predicate Javadoc to document the [expr, pattern, escapeChar] children format and add unit tests for custom escape characters.
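The escape character matters because the data source, not Spark, now applies the LIKE semantics. As an illustration (helper code for this writeup, not part of the PR), here is one way a source could turn a `[expr, pattern, escapeChar]` predicate into a regex, honoring an arbitrary escape character:

```java
import java.util.regex.Pattern;

public class LikeEscapeDemo {
    static Pattern likeToRegex(String pattern, char escape) {
        StringBuilder re = new StringBuilder("^");
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c == escape && i + 1 < pattern.length()) {
                // Escaped character matches literally, including % and _.
                re.append(Pattern.quote(String.valueOf(pattern.charAt(++i))));
            } else if (c == '%') {
                re.append(".*");   // LIKE %: any sequence of characters
            } else if (c == '_') {
                re.append('.');    // LIKE _: exactly one character
            } else {
                re.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(re.append('$').toString());
    }

    public static void main(String[] args) {
        // `#%%` with escape '#' means: a literal percent sign, then anything.
        Pattern p = likeToRegex("#%%", '#');
        System.out.println(p.matcher("%discount").matches());
        System.out.println(p.matcher("discount").matches());
    }
}
```

With the old default-escape-only restriction, a pattern using `#` as the escape character could not be pushed at all; passing the escape character as a third child removes that gap.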
[SPARK-55869][SQL] Extended Predicate Pushdown for DataSource V2
What changes were proposed in this pull request?
This PR extends Spark's DataSource V2 predicate pushdown framework with three layers of new functionality, all gated behind a single config switch (spark.sql.dataSource.extendedPredicatePushdown.enabled, default true).

Layer 1: Capability-Gated Builtin Predicate Translation

Data sources can now opt in to receiving additional builtin predicates by implementing SupportsPushDownPredicateCapabilities on their ScanBuilder. The interface declares a set of predicate names the source can handle:
- LIKE -- full pattern matching (expr1 LIKE expr2)
- RLIKE -- regex matching (expr1 RLIKE expr2)
- ILIKE -- case-insensitive LIKE (expr1 ILIKE expr2)
- IS_NAN -- NaN check (isnan(expr))
- ARRAY_CONTAINS -- array element check (array_contains(expr1, expr2))
- MAP_CONTAINS_KEY -- map key check (map_contains_key(expr1, expr2))
- ARRAYS_OVERLAP -- array intersection check (arrays_overlap(arr1, arr2))
- LIKE_ALL -- match all patterns (expr LIKE ALL (p1, p2, ...))
- LIKE_ANY -- match any pattern (expr LIKE ANY (p1, p2, ...))
- NOT_LIKE_ALL -- negated match-all (expr NOT LIKE ALL (p1, p2, ...))
- NOT_LIKE_ANY -- negated match-any (expr NOT LIKE ANY (p1, p2, ...))

V2ExpressionBuilder checks the declared capabilities before translating these expressions, so sources that don't declare support continue to see existing behavior.

Layer 2: Custom Predicate Functions via SupportsCustomPredicates

Tables can declare custom predicate functions by implementing SupportsCustomPredicates, which returns an array of CustomPredicateDescriptor objects. Each descriptor specifies:
- canonicalName() -- dot-qualified name (e.g. com.mycompany.MY_SEARCH) used in the V2 Predicate
- sqlName() -- the unqualified name users write in SQL (e.g. my_search)
- parameterTypes() -- optional expected parameter types (enables automatic casting)
- isDeterministic() -- whether the predicate is deterministic

Users write standard SQL function-call syntax: SELECT * FROM t WHERE my_search(col, 'param'). The analyzer resolves these against the table's descriptors and produces CustomPredicateExpression nodes, which V2ExpressionBuilder translates into V2 Predicate objects using the dot-qualified canonical name.

A post-optimizer rule (EnsureCustomPredicatesPushed) fails the query if any custom predicate remains unpushed, since Spark cannot evaluate them locally.

Layer 3: Custom Infix Operator Syntax via Parser Extensions

For data sources that want infix operator syntax (e.g. col INDEXQUERY 'param'), an abstract CustomOperatorParserExtension base class is provided. It rewrites infix expressions to function calls before the standard parser runs. Data source authors extend CustomOperatorParserExtension, implement customOperators, and register via SparkSessionExtensions.injectParser.

How was this patch tested?

Unit tests in DataSourceV2StrategySuite:
- CustomOperatorParserExtension (infix rewriting, case-insensitivity, string literal preservation, placeholder collision resistance, complex WHERE clauses, parseQuery delegation, etc.)
- CustomPredicateDescriptor validation (null canonicalName, null sqlName, missing dot, convenience constructor)

Integration tests in DataSourceV2Suite:
- end-to-end results verified with checkAnswer
- rejected custom predicates fail with a clear error via EnsureCustomPredicatesPushed

Regression suites -- all passing:
- DataSourceV2Suite (112 tests)
- DataSourceV2StrategySuite (82 tests)
- DataSourceV2FunctionSuite (44 tests)
- V2PredicateSuite (18 tests)
- SparkSessionExtensionSuite (31 tests)
- AnalysisSuite (82 tests)
- JDBCV2Suite (79 tests)

Style checks: scalastyle (catalyst + core) and checkstyle (catalyst) all pass.
Was this patch authored or co-authored using generative AI tooling?
Yes.
New Files
- SupportsPushDownPredicateCapabilities.java -- ScanBuilder mix-in declaring supported predicate names (Layer 1)
- SupportsCustomPredicates.java -- Table mix-in declaring custom predicate descriptors (Layer 2)
- CustomPredicateDescriptor.java -- describes a custom predicate (canonical name, SQL name, parameter types, determinism flag)
- CustomPredicateExpression.scala -- Catalyst expression for resolved custom predicates
- ResolveCustomPredicates.scala -- analyzer rule (Resolution batch) resolving function calls against table-declared descriptors
- EnsureCustomPredicatesPushed.scala -- post-optimizer rule that fails the query if a custom predicate remains unpushed
- CustomOperatorParserExtension.scala -- abstract parser wrapper rewriting custom infix operators to function calls (Layer 3)

Modified Files
- Analyzer.scala -- adds ResolveCustomPredicates to the Resolution batch; modifies LookupFunctions to skip custom predicate names
- V2ExpressionBuilder.scala -- capability-gated builtin translation and CustomPredicateExpression translation
- PushDownUtils.scala -- queries SupportsPushDownPredicateCapabilities and passes extra capabilities to translation
- DataSourceV2Strategy.scala -- threads capabilities through translateFilterV2WithMapping
- V2ExpressionSQLBuilder.java -- fixes the toString() recursion (StackOverflowError) for unknown predicate names
- Predicate.java -- Javadoc updated with extended and custom predicate naming conventions
- SQLConf.scala -- new spark.sql.dataSource.extendedPredicatePushdown.enabled config
- SparkOptimizer.scala -- registers the EnsureCustomPredicatesPushed post-optimization rule
- DataSourceV2StrategySuite.scala -- new unit tests
- DataSourceV2Suite.scala -- new integration tests

Configuration
- spark.sql.dataSource.extendedPredicatePushdown.enabled (default: true)