Conversation

@XuQianJin-Stars commented Jan 18, 2026

Purpose

Linked issue: close #2406

This PR introduces the parser and execution framework for Spark's CALL procedure command, allowing users to invoke stored procedures using SQL syntax like CALL sys.procedure_name(args). This provides a foundation for implementing various administrative and maintenance operations.

All implementations are in Scala for better integration with Spark's ecosystem.
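For example, once the extensions described below are registered, a procedure can be invoked from spark-shell like this (sys.compact is the sample procedure added in this PR, and the table name is only a placeholder):

```scala
// 'fluss_db.orders' is a placeholder table name; sys.compact is the sample
// (skeleton) procedure shipped with this PR.
spark.sql("CALL sys.compact(table => 'fluss_db.orders')")  // named argument
spark.sql("CALL sys.compact('fluss_db.orders')")           // positional argument
```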

Brief change log

Core Framework (Scala):

  • Added Procedure trait in fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala (see the sketch after this list)
  • Added ProcedureParameter trait and case class implementation for parameter definitions in ProcedureParameter.scala
  • Added BaseProcedure abstract class providing common utilities in BaseProcedure.scala
  • Added ProcedureBuilder trait for procedure instantiation in ProcedureBuilder.scala
  • Added ProcedureCatalog trait for catalog integration in catalog/ProcedureCatalog.scala
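A rough sketch of how these contracts could fit together is below (BaseProcedure is omitted). The method names and signatures are illustrative assumptions, not the exact API in this PR.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.types.{DataType, StructType}

// Contract for a stored procedure (method names are assumptions).
trait Procedure {
  def parameters: Array[ProcedureParameter]   // declared input parameters
  def outputType: StructType                  // schema of the rows returned by call()
  def call(args: InternalRow): Array[InternalRow]
  def description: String = getClass.getSimpleName
}

// One declared parameter of a procedure.
trait ProcedureParameter {
  def name: String
  def dataType: DataType
  def required: Boolean
}

object ProcedureParameter {
  // Companion factory methods, as mentioned in the "API and Format" section.
  def required(name: String, dataType: DataType): ProcedureParameter =
    ProcedureParameterImpl(name, dataType, true)

  def optional(name: String, dataType: DataType): ProcedureParameter =
    ProcedureParameterImpl(name, dataType, false)
}

private case class ProcedureParameterImpl(name: String, dataType: DataType, required: Boolean)
  extends ProcedureParameter

// Builds a fresh procedure instance per invocation.
trait ProcedureBuilder {
  def build(): Procedure
}

// Catalogs that can resolve procedures by identifier (e.g. sys.compact).
trait ProcedureCatalog extends TableCatalog {
  def loadProcedure(ident: Identifier): Procedure
}
```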

Parser & SQL Extensions:

  • Created ANTLR grammar FlussSqlExtensions.g4 for CALL statement syntax
  • Implemented FlussSparkSqlParser extending Spark's ParserInterface (sketched below)
  • Implemented FlussSqlExtensionsAstBuilder to convert ANTLR parse tree to logical plans
  • Added custom Origin and CurrentOrigin handling for source position tracking
  • Added Maven ANTLR4 plugin configuration to fluss-spark-common/pom.xml
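The parser follows a simple interception pattern: CALL text is handled by the Fluss grammar, everything else falls through to Spark's own parser. In the sketch below the class name matches the PR, but the constructor, the helper methods, and the assumption of a Spark 3.3+ ParserInterface are mine; the actual CALL branch is the ANTLR-generated code plus FlussSqlExtensionsAstBuilder and is deliberately left unimplemented here.

```scala
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}

// Delegation sketch only; the real CALL branch is generated from FlussSqlExtensions.g4.
class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface {

  override def parsePlan(sqlText: String): LogicalPlan =
    if (isCallStatement(sqlText)) parseCallStatement(sqlText)
    else delegate.parsePlan(sqlText)

  private def isCallStatement(sqlText: String): Boolean =
    sqlText.trim.toUpperCase(java.util.Locale.ROOT).startsWith("CALL")

  private def parseCallStatement(sqlText: String): LogicalPlan =
    ??? // ANTLR lexer/parser + FlussSqlExtensionsAstBuilder would build a FlussCallStatement here

  // Everything below is a straight pass-through to Spark's parser.
  override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)
  override def parseTableIdentifier(sqlText: String): TableIdentifier = delegate.parseTableIdentifier(sqlText)
  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = delegate.parseFunctionIdentifier(sqlText)
  override def parseMultipartIdentifier(sqlText: String): Seq[String] = delegate.parseMultipartIdentifier(sqlText)
  override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)
  override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)
  override def parseQuery(sqlText: String): LogicalPlan = delegate.parseQuery(sqlText)
}
```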

Logical & Physical Plans:

  • Created FlussCallStatement (unresolved) and FlussCallCommand (resolved) logical plan nodes (see the sketch below)
  • Created FlussCallArgument, FlussPositionalArgument, and FlussNamedArgument for argument representation
  • Implemented CallProcedureExec physical plan node for execution
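A possible shape for the unresolved statement and its argument nodes; the field names are assumptions, and FlussCallCommand and CallProcedureExec are omitted for brevity.

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Argument nodes produced by the parser.
sealed trait FlussCallArgument {
  def expr: Expression
}
case class FlussPositionalArgument(expr: Expression) extends FlussCallArgument
case class FlussNamedArgument(name: String, expr: Expression) extends FlussCallArgument

// Unresolved CALL statement: just the multipart procedure name plus raw
// arguments; FlussProcedureResolver later turns this into FlussCallCommand.
case class FlussCallStatement(name: Seq[String], args: Seq[FlussCallArgument]) extends LeafNode {
  override def output: Seq[Attribute] = Nil
  override lazy val resolved: Boolean = false
}
```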

Analysis & Execution:

  • Implemented FlussProcedureResolver analyzer rule for procedure resolution and validation
  • Implemented FlussStrategy planner strategy to inject CallProcedureExec
  • Created FlussSparkSessionExtensions to register all custom components
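The extension entry point would wire these pieces together roughly as follows. The three injected components are the ones named above; the exact constructors (for example FlussProcedureResolver(spark) and FlussStrategy(spark)) are assumptions.

```scala
import org.apache.spark.sql.SparkSessionExtensions

class FlussSparkSessionExtensions extends (SparkSessionExtensions => Unit) {

  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Wrap Spark's parser so CALL statements are recognized.
    extensions.injectParser { case (_, delegate) => new FlussSparkSqlParser(delegate) }

    // Resolve FlussCallStatement into FlussCallCommand during analysis.
    extensions.injectResolutionRule(spark => FlussProcedureResolver(spark))

    // Plan FlussCallCommand into the CallProcedureExec physical node.
    extensions.injectPlannerStrategy(spark => FlussStrategy(spark))
  }
}
```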

Catalog Integration:

  • Modified SparkCatalog to implement ProcedureCatalog (the registry sketch below shows a possible lookup path)
  • Updated FlussSparkTestBase to enable SQL extensions in test environment

Procedure Registry (Scala):

  • Created SparkProcedures object in fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala for managing procedure builders
  • Added NoSuchProcedureException class in analysis/NoSuchProcedureException.scala for error handling
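A sketch of the registry and error path, including the lookup that SparkCatalog.loadProcedure could delegate to. It builds on the Procedure/ProcedureBuilder sketch above and the CompactProcedure skeleton below; the builder() factory, the map contents, and the exception message are assumptions.

```scala
import java.util.Locale
import org.apache.spark.sql.connector.catalog.Identifier

// Thrown when no procedure matches the requested identifier.
class NoSuchProcedureException(ident: Identifier)
  extends RuntimeException(
    s"Procedure ${(ident.namespace() :+ ident.name()).mkString(".")} does not exist")

// Registry mapping procedure names to builders.
object SparkProcedures {
  private val BUILDERS: Map[String, () => ProcedureBuilder] = Map(
    "compact" -> (() => CompactProcedure.builder())
  )

  def newBuilder(name: String): Option[ProcedureBuilder] =
    BUILDERS.get(name.toLowerCase(Locale.ROOT)).map(_.apply())
}

// Lookup path that SparkCatalog.loadProcedure could follow (shown standalone
// here; the real SparkCatalog also implements Spark's TableCatalog).
object ProcedureLookupSketch {
  def loadProcedure(ident: Identifier): Procedure =
    SparkProcedures.newBuilder(ident.name())
      .map(_.build())
      .getOrElse(throw new NoSuchProcedureException(ident))
}
```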

Example Implementation (Scala):

  • Implemented CompactProcedure in procedure/CompactProcedure.scala as a sample procedure (skeleton implementation)
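A skeleton in the spirit of that sample procedure, building on the trait sketch above. The parameter name, output schema, and builder factory are assumptions, and call() performs no real compaction yet.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

class CompactProcedure extends Procedure {

  override val parameters: Array[ProcedureParameter] = Array(
    ProcedureParameter.required("table", StringType)
  )

  override val outputType: StructType = StructType(Seq(
    StructField("result", BooleanType, nullable = false)
  ))

  override def call(args: InternalRow): Array[InternalRow] = {
    val table = args.getString(0)   // slot 0 holds the 'table' argument
    // TODO: trigger the actual compaction for `table`.
    Array(InternalRow(true))
  }
}

object CompactProcedure {
  def builder(): ProcedureBuilder = new ProcedureBuilder {
    override def build(): Procedure = new CompactProcedure
  }
}
```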

Documentation & Tests (Scala):

  • Added PROCEDURES.md documenting the new feature
  • Added CallStatementParserTest.scala in fluss-spark-ut/src/test/scala with comprehensive parser tests

Tests

Unit Tests (ScalaTest):

  • CallStatementParserTest: Tests parsing of CALL statements (a sketch of one such test appears below)
    • testCallWithBackticks: Tests backtick-quoted identifiers
    • testCallWithNamedArguments: Tests named argument syntax
    • testCallWithPositionalArguments: Tests positional arguments with various data types
    • testCallWithMixedArguments: Tests mixed named and positional arguments
    • testCallSimpleProcedure: Tests simple procedure call

All existing tests in the fluss-spark-ut module pass.
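A hypothetical shape for one of these parser tests, assuming ScalaTest's AnyFunSuite and the class names from this PR; the FlussSparkSqlParser constructor, the field names on FlussCallStatement, and the no-arg SparkSqlParser (Spark 3.1+) are guesses.

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.SparkSqlParser
import org.scalatest.funsuite.AnyFunSuite

class CallStatementParserSketch extends AnyFunSuite {

  test("CALL with a named argument parses into FlussCallStatement") {
    val parser = new FlussSparkSqlParser(new SparkSqlParser())
    val plan = parser.parsePlan("CALL sys.compact(table => 'db.tbl')")

    val call = plan.asInstanceOf[FlussCallStatement]
    assert(call.name === Seq("sys", "compact"))

    val arg = call.args.head.asInstanceOf[FlussNamedArgument]
    assert(arg.name === "table")
    assert(arg.expr === Literal("db.tbl"))
  }
}
```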

API and Format

New Public APIs (Scala):

  • Procedure trait: Defines contract for stored procedures
  • ProcedureParameter trait: Defines procedure parameters with companion object factory methods
  • ProcedureCatalog trait: Extends Spark's TableCatalog with procedure loading capability

Modified APIs:

  • SparkCatalog now implements ProcedureCatalog trait

No changes to storage format.

Documentation

New feature introduced: Spark CALL procedure command support

Documentation added:

  • fluss-spark/PROCEDURES.md: Comprehensive guide on using the CALL procedure feature
    • Syntax examples
    • Available procedures
    • Usage guidelines
    • Extension points for custom procedures

Configuration required:
Users need to configure the Spark session with:
spark.sql.extensions = org.apache.fluss.spark.extensions.FlussSparkSessionExtensions
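For example, in a Scala application (the app name and master are placeholders; only the spark.sql.extensions setting is required):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("fluss-procedures-demo")
  .master("local[*]")
  .config("spark.sql.extensions",
    "org.apache.fluss.spark.extensions.FlussSparkSessionExtensions")
  .getOrCreate()

spark.sql("CALL sys.compact(table => 'db.tbl')").show()
```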

This commit implements support for the CALL procedure command in Spark SQL, based on the architecture from apache/paimon PR apache#1785.

Key changes:
- Added Procedure interface and ProcedureParameter for defining stored procedures
- Implemented ProcedureCatalog interface to load procedures
- Created ANTLR grammar (FlussSqlExtensions.g4) for parsing CALL syntax
- Added FlussSqlExtensionsParser and FlussSqlExtensionsAstBuilder for SQL parsing
- Implemented FlussProcedureResolver for resolving procedure calls
- Created FlussStrategy and CallProcedureExec for execution
- Added FlussSparkSessionExtensions to register parser, resolver, and execution strategy
- Implemented CompactProcedure as a sample procedure
- Updated SparkCatalog to implement ProcedureCatalog interface
- Added SparkProcedures registry for managing procedures
- Added comprehensive documentation in PROCEDURES.md
- Added unit tests for procedure functionality

The implementation supports:
- Named arguments: CALL sys.compact(table => 'db.table')
- Positional arguments: CALL sys.compact('db.table')
- Parameter validation and type checking
- System namespace (sys) for built-in procedures
@XuQianJin-Stars force-pushed the feature/issue-2406-procedure branch from ff9735b to b9d21ed on January 18, 2026 at 10:31
@XuQianJin-Stars (Contributor, Author)

Hi @wuchong @YannByron, please help review when you have some time.
