[spark] Support parser of Spark call procedure command #2408
Purpose
Linked issue: close #2406
This PR introduces the parser and execution framework for Spark's CALL procedure command, allowing users to invoke stored procedures with SQL syntax such as `CALL sys.procedure_name(args)`. This provides a foundation for implementing various administrative and maintenance operations. All implementations are in Scala for better integration with Spark's ecosystem.
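For illustration, once the extensions are registered on a session (see the configuration section at the end), CALL statements can be issued through `spark.sql`. The procedure names and parameters below are hypothetical and only demonstrate the supported argument styles:

```scala
// Hypothetical invocations, assuming a SparkSession `spark` with the Fluss SQL
// extensions registered. Procedure names, parameters and the `name => value`
// named-argument syntax are illustrative only.
spark.sql("CALL sys.compact('my_db.my_table')")           // positional argument
spark.sql("CALL sys.compact(table => 'my_db.my_table')")  // named argument
spark.sql("CALL `sys`.`compact`('my_db.my_table', 4)")    // backtick-quoted identifiers
```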
Brief change log
Core Framework (Scala):
- `Procedure` trait in `fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala` (sketched below)
- `ProcedureParameter` trait and case class implementation for parameter definitions in `ProcedureParameter.scala`
- `BaseProcedure` abstract class providing common utilities in `BaseProcedure.scala`
- `ProcedureBuilder` trait for procedure instantiation in `ProcedureBuilder.scala`
- `ProcedureCatalog` trait for catalog integration in `catalog/ProcedureCatalog.scala`
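As a rough illustration of the shape of these traits, here is a hedged sketch modeled on comparable Spark procedure frameworks; the exact signatures are defined in the PR and may differ:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.types.{DataType, StructType}

// Hedged sketch: signatures are assumptions, not necessarily the exact API in this PR.
trait ProcedureParameter {
  def name: String          // parameter name usable in named-argument syntax
  def dataType: DataType    // Spark SQL type of the parameter
  def required: Boolean     // whether the argument may be omitted
}

trait Procedure {
  def parameters: Array[ProcedureParameter]      // declared input parameters
  def outputType: StructType                     // schema of the rows returned by the call
  def call(args: InternalRow): Seq[InternalRow]  // execute with the resolved arguments
  def description: String = getClass.getName
}

// Builders produce fresh procedure instances; the real trait may carry more context.
trait ProcedureBuilder {
  def build(): Procedure
}

// Catalogs that can serve procedures expose a lookup on top of TableCatalog.
trait ProcedureCatalog extends TableCatalog {
  def loadProcedure(ident: Identifier): Procedure
}
```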
Parser & SQL Extensions:
- `FlussSqlExtensions.g4` for CALL statement syntax
- `FlussSparkSqlParser` extending Spark's `ParserInterface`
- `FlussSqlExtensionsAstBuilder` to convert the ANTLR parse tree to logical plans
- `Origin` and `CurrentOrigin` handling for source position tracking
- Updates to `fluss-spark-common/pom.xml`
Logical & Physical Plans:
- `FlussCallStatement` (unresolved) and `FlussCallCommand` (resolved) logical plan nodes (sketched below)
- `FlussCallArgument`, `FlussPositionalArgument`, and `FlussNamedArgument` for argument representation
- `CallProcedureExec` physical plan node for execution
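A plausible shape for the unresolved statement and its argument nodes, as a sketch only: the class names come from the change log, but the exact fields are assumptions:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Sketch: argument wrappers produced by the AST builder.
sealed trait FlussCallArgument {
  def expr: Expression
}
case class FlussPositionalArgument(expr: Expression) extends FlussCallArgument
case class FlussNamedArgument(name: String, expr: Expression) extends FlussCallArgument

// Sketch: unresolved CALL statement carrying the multipart procedure name and the
// raw arguments; the analyzer rule later resolves it into FlussCallCommand.
case class FlussCallStatement(name: Seq[String], args: Seq[FlussCallArgument]) extends LeafNode {
  override def output: Seq[Attribute] = Nil
}
```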
Analysis & Execution:
- `FlussProcedureResolver` analyzer rule for procedure resolution and validation
- `FlussStrategy` planner strategy to inject `CallProcedureExec`
- `FlussSparkSessionExtensions` to register all custom components (see the registration sketch below)
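The wiring of these components presumably goes through Spark's `SparkSessionExtensions` injection points. A hedged sketch of what the registration could look like; the Fluss class constructors shown here are assumptions, only the class names come from the change log:

```scala
import org.apache.spark.sql.SparkSessionExtensions

// Sketch: registering the custom parser, analyzer rule and planner strategy.
class FlussSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Wrap Spark's parser so CALL statements are handled and everything else is delegated.
    extensions.injectParser((session, delegate) => new FlussSparkSqlParser(session, delegate))
    // Resolve FlussCallStatement into FlussCallCommand against the ProcedureCatalog.
    extensions.injectResolutionRule(session => FlussProcedureResolver(session))
    // Plan the resolved FlussCallCommand into the CallProcedureExec physical node.
    extensions.injectPlannerStrategy(_ => FlussStrategy)
  }
}
```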
Catalog Integration:
- `SparkCatalog` to implement `ProcedureCatalog`
- `FlussSparkTestBase` to enable SQL extensions in the test environment
Procedure Registry (Scala):
- `SparkProcedures` object in `fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala` for managing procedure builders (sketched below)
- `NoSuchProcedureException` class in `analysis/NoSuchProcedureException.scala` for error handling
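The registry could be as simple as a name-to-builder map; the following is a hedged sketch in which the builder type and the registered entry are assumptions, and only the object name comes from the PR:

```scala
// Sketch: a procedure registry keyed by procedure name.
object SparkProcedures {
  // Builders are stored as factories so every CALL gets a fresh procedure instance.
  private val builders: Map[String, () => ProcedureBuilder] = Map(
    "compact" -> (() => CompactProcedure.builder())  // hypothetical entry
  )

  def newBuilder(name: String): Option[ProcedureBuilder] =
    builders.get(name.toLowerCase).map(_.apply())

  def names: Set[String] = builders.keySet
}
```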
Example Implementation (Scala):
- `CompactProcedure` in `procedure/CompactProcedure.scala` as a sample procedure (skeleton implementation; sketched below)
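Given the framework above, a skeleton procedure could look roughly like this; the parameter names, output columns and builder shape are illustrative, not the PR's actual definition:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

// Hedged sketch of a skeleton procedure; only the class name comes from the PR.
class CompactProcedure extends Procedure {

  override val parameters: Array[ProcedureParameter] = Array(
    new ProcedureParameter {
      override def name: String = "table"
      override def dataType: DataType = StringType
      override def required: Boolean = true
    }
  )

  override val outputType: StructType =
    StructType(Seq(StructField("result", StringType, nullable = false)))

  override def call(args: InternalRow): Seq[InternalRow] = {
    // args holds the resolved arguments in parameter order, e.g. args.getString(0) for `table`.
    // The actual compaction logic is left as a skeleton in the PR.
    Seq.empty
  }
}

object CompactProcedure {
  // Hypothetical factory used by the registry sketched above.
  def builder(): ProcedureBuilder = new ProcedureBuilder {
    override def build(): Procedure = new CompactProcedure
  }
}
```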
Documentation & Tests (Scala):
- `PROCEDURES.md` documenting the new feature
- `CallStatementParserTest.scala` in `fluss-spark-ut/src/test/scala` with comprehensive parser tests
Tests
Unit Tests (ScalaTest):
- `CallStatementParserTest`: Tests parsing of CALL statements
  - `testCallWithBackticks`: Tests backtick-quoted identifiers
  - `testCallWithNamedArguments`: Tests named argument syntax
  - `testCallWithPositionalArguments`: Tests positional arguments with various data types
  - `testCallWithMixedArguments`: Tests mixed named and positional arguments
  - `testCallSimpleProcedure`: Tests a simple procedure call

All existing tests in the `fluss-spark-ut` module pass successfully. A hedged sketch of one such parser test is shown below.
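A parser test in this style typically feeds a CALL statement through the extended parser and inspects the resulting unresolved plan. The following is a sketch, not the PR's actual test code; it assumes a SparkSession `spark` provided by a test base (e.g. `FlussSparkTestBase`) with the extensions enabled, and uses the node shapes sketched earlier:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.scalatest.funsuite.AnyFunSuite

// Sketch of a parser test; `spark` is assumed to come from the test base class.
class CallStatementParserSketch extends AnyFunSuite {

  private lazy val parser = spark.sessionState.sqlParser

  test("parse CALL with mixed arguments") {
    val plan = parser.parsePlan("CALL sys.compact('my_db.my_table', buckets => 4)")

    val call = plan.asInstanceOf[FlussCallStatement]
    assert(call.name === Seq("sys", "compact"))
    assert(call.args === Seq(
      FlussPositionalArgument(Literal("my_db.my_table")),
      FlussNamedArgument("buckets", Literal(4))))
  }
}
```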
API and Format
New Public APIs (Scala):
- `Procedure` trait: Defines the contract for stored procedures
- `ProcedureParameter` trait: Defines procedure parameters with companion object factory methods
- `ProcedureCatalog` trait: Extends Spark's `TableCatalog` with procedure loading capability
Modified APIs:
- `SparkCatalog` now implements the `ProcedureCatalog` trait

No changes to storage format.
Documentation
New feature introduced: Spark CALL procedure command support
Documentation added:
- `fluss-spark/PROCEDURES.md`: Comprehensive guide on using the CALL procedure feature

Configuration required:
Users need to configure the Spark session with:
`spark.sql.extensions = org.apache.fluss.spark.extensions.FlussSparkSessionExtensions`
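For example, when building the session programmatically (a sketch; the setting can equally be supplied via `spark-defaults.conf` or `--conf`):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enabling the Fluss SQL extensions when constructing the session.
val spark = SparkSession.builder()
  .appName("fluss-call-procedure-example")
  .config("spark.sql.extensions",
    "org.apache.fluss.spark.extensions.FlussSparkSessionExtensions")
  .getOrCreate()

// CALL statements are now parsed by the Fluss extension parser.
spark.sql("CALL sys.compact('my_db.my_table')")  // hypothetical procedure
```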