From b9d21ed44f504da46d6673601cf08e67974f022e Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sun, 18 Jan 2026 13:31:26 +0800 Subject: [PATCH 1/2] [spark] Support parser of Spark call procedure command This commit implements support for CALL procedure command in Spark SQL, based on the architecture from apache/paimon PR #1785. Key changes: - Added Procedure interface and ProcedureParameter for defining stored procedures - Implemented ProcedureCatalog interface to load procedures - Created ANTLR grammar (FlussSqlExtensions.g4) for parsing CALL syntax - Added FlussSqlExtensionsParser and FlussSqlExtensionsAstBuilder for SQL parsing - Implemented FlussProcedureResolver for resolving procedure calls - Created FlussStrategy and CallProcedureExec for execution - Added FlussSparkSessionExtensions to register parser, resolver, and execution strategy - Implemented CompactProcedure as a sample procedure - Updated SparkCatalog to implement ProcedureCatalog interface - Added SparkProcedures registry for managing procedures - Added comprehensive documentation in PROCEDURES.md - Added unit tests for procedure functionality The implementation supports: - Named arguments: CALL sys.compact(table => 'db.table') - Positional arguments: CALL sys.compact('db.table') - Parameter validation and type checking - System namespace (sys) for built-in procedures --- fluss-spark/PROCEDURES.md | 96 +++++++++ fluss-spark/fluss-spark-common/pom.xml | 22 ++ .../FlussSqlExtensions.g4 | 195 ++++++++++++++++++ .../apache/fluss/spark/SparkProcedures.java | 53 +++++ .../analysis/NoSuchProcedureException.java | 33 +++ .../fluss/spark/catalog/ProcedureCatalog.java | 37 ++++ .../fluss/spark/procedure/BaseProcedure.java | 123 +++++++++++ .../spark/procedure/CompactProcedure.java | 101 +++++++++ .../fluss/spark/procedure/Procedure.java | 45 ++++ .../spark/procedure/ProcedureBuilder.java | 40 ++++ .../spark/procedure/ProcedureParameter.java | 56 +++++ .../procedure/ProcedureParameterImpl.java | 50 +++++ .../org/apache/fluss/spark/SparkCatalog.scala | 25 ++- .../analysis/FlussProcedureResolver.scala | 188 +++++++++++++++++ .../plans/logical/FlussCallCommand.scala | 52 +++++ .../spark/execution/CallProcedureExec.scala | 49 +++++ .../fluss/spark/execution/FlussStrategy.scala | 36 ++++ .../FlussSparkSessionExtensions.scala | 39 ++++ .../FlussSqlExtensionsAstBuilder.scala | 145 +++++++++++++ .../extensions/FlussSqlExtensionsParser.scala | 143 +++++++++++++ .../extensions/CallStatementParserTest.java | 178 ++++++++++++++++ .../fluss/spark/FlussSparkTestBase.scala | 2 + 22 files changed, 1706 insertions(+), 2 deletions(-) create mode 100644 fluss-spark/PROCEDURES.md create mode 100644 fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/FlussSqlExtensions.g4 create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java create mode 100644 
fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java
 create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java
 create mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/extensions/FlussSparkSessionExtensions.scala
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsAstBuilder.scala
 create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsParser.scala
 create mode 100644 fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java

diff --git a/fluss-spark/PROCEDURES.md b/fluss-spark/PROCEDURES.md
new file mode 100644
index 0000000000..ea01e79c6c
--- /dev/null
+++ b/fluss-spark/PROCEDURES.md
@@ -0,0 +1,96 @@
+# Fluss Spark Procedures
+
+This document describes the stored procedures available in Fluss for Spark.
+
+## Overview
+
+Fluss provides stored procedures to perform administrative and management operations through Spark SQL. All procedures are located in the `sys` namespace and can be invoked using the `CALL` statement.
+
+## Configuration
+
+To enable Fluss procedures in Spark, register the session extensions when the session is built. Note that `spark.sql.extensions` is a static configuration and has no effect if set on an already-running session:
+
+```scala
+val spark = SparkSession.builder()
+  .config("spark.sql.extensions", "org.apache.fluss.spark.extensions.FlussSparkSessionExtensions")
+  .getOrCreate()
+```
+
+Or in `spark-defaults.conf`:
+
+```properties
+spark.sql.extensions=org.apache.fluss.spark.extensions.FlussSparkSessionExtensions
+```
+
+## Syntax
+
+The general syntax for calling a procedure is:
+
+```sql
+CALL [catalog_name.]sys.procedure_name(
+    parameter_name => 'value',
+    another_parameter => 'value'
+)
+```
+
+### Argument Passing
+
+Procedures support two ways to pass arguments:
+
+1. **Named Arguments** (recommended):
+   ```sql
+   CALL catalog.sys.procedure_name(parameter => 'value')
+   ```
+
+2. **Positional Arguments**:
+   ```sql
+   CALL catalog.sys.procedure_name('value')
+   ```
+
+Note: You cannot mix named and positional arguments in a single procedure call.
+
+## Available Procedures
+
+### `compact`
+
+A sample procedure that triggers a compact action on a Fluss table. It takes a single required string parameter, `table`, in the form `db.table`. The current implementation only validates its arguments and returns a status message; wiring the call through the Fluss admin API is still a TODO. For example:
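+
+```sql
+CALL fluss_catalog.sys.compact(table => 'my_db.my_table');
+```
+
+### Defining Custom Procedures
+
+Custom procedures implement the `Procedure` interface and are looked up through the `SparkProcedures` registry. The following sketch shows the shape of the contract; `EchoProcedure` is a hypothetical example for illustration and is not shipped by this PR:
+
+```java
+import org.apache.fluss.spark.procedure.Procedure;
+import org.apache.fluss.spark.procedure.ProcedureParameter;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/** Hypothetical procedure that echoes its single argument back to the caller. */
+public class EchoProcedure implements Procedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("message", DataTypes.StringType)
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.StringType, true, Metadata.empty())
+                    });
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        // Arguments arrive in the declared parameter order after resolution.
+        UTF8String message = args.getUTF8String(0);
+        return new InternalRow[] {new GenericInternalRow(new Object[] {message})};
+    }
+
+    @Override
+    public String description() {
+        return "Echoes the given message.";
+    }
+}
+```
+
+To make such a procedure callable as `CALL sys.echo(...)`, it also needs an entry in `SparkProcedures.initProcedureBuilders()` behind a `ProcedureBuilder`; procedures that need catalog access should extend `BaseProcedure` instead, which provides a builder with the table catalog wired in.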
+
+## Error Handling
+
+Procedures will throw exceptions in the following cases:
+
+- **Missing Required Parameters**: If a required parameter is not provided
+- **Invalid Table Name**: If the specified table does not exist
+- **Type Mismatch**: If a parameter value cannot be converted to the expected type
+- **Permission Denied**: If the user does not have permission to perform the operation
+
+## Examples
+
+### Basic Usage
+
+```scala
+// Start Spark with Fluss extensions
+val spark = SparkSession.builder()
+  .config("spark.sql.extensions", "org.apache.fluss.spark.extensions.FlussSparkSessionExtensions")
+  .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog")
+  .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", "localhost:9092")
+  .getOrCreate()
+
+// Create a table
+spark.sql("""
+  CREATE TABLE fluss_catalog.my_db.my_table (
+    id INT,
+    name STRING,
+    age INT
+  ) USING fluss
+""")
+
+// Call the sample compact procedure (named-argument form)
+spark.sql("CALL fluss_catalog.sys.compact(table => 'my_db.my_table')").show()
+```
+
+## Implementation Notes
+
+- Procedures are executed synchronously and return results immediately
+- The `sys` namespace is reserved for system procedures
+- Custom procedures can be added by implementing the `Procedure` interface (see "Defining Custom Procedures" above)
+
+## See Also
+
+- [Fluss Spark Connector Documentation](../spark-connector.md)
+- [Fluss Admin API](../admin-api.md)
diff --git a/fluss-spark/fluss-spark-common/pom.xml b/fluss-spark/fluss-spark-common/pom.xml
index e285b8bbc2..95b7c3bfbe 100644
--- a/fluss-spark/fluss-spark-common/pom.xml
+++ b/fluss-spark/fluss-spark-common/pom.xml
@@ -41,10 +41,32 @@
             <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
             <version>${spark.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.antlr</groupId>
+            <artifactId>antlr4-runtime</artifactId>
+            <version>4.9.3</version>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.antlr</groupId>
+                <artifactId>antlr4-maven-plugin</artifactId>
+                <version>4.9.3</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>antlr4</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <visitor>true</visitor>
+                </configuration>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
diff --git a/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/FlussSqlExtensions.g4 b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/FlussSqlExtensions.g4
new file mode 100644
index 0000000000..a693427caa
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/FlussSqlExtensions.g4
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +grammar FlussSqlExtensions; + +@lexer::members { + public boolean isValidDecimal() { + int nextChar = _input.LA(1); + if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' || + nextChar == '_') { + return false; + } else { + return true; + } + } + + public boolean isHint() { + int nextChar = _input.LA(1); + if (nextChar == '+') { + return true; + } else { + return false; + } + } +} + +singleStatement + : statement ';'* EOF + ; + +statement + : CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call + ; + +callArgument + : expression #positionalArgument + | identifier '=>' expression #namedArgument + ; + +expression + : constant + ; + +constant + : number #numericLiteral + | booleanValue #booleanLiteral + | STRING+ #stringLiteral + | identifier STRING #typeConstructor + ; + +booleanValue + : TRUE | FALSE + ; + +number + : MINUS? EXPONENT_VALUE #exponentLiteral + | MINUS? DECIMAL_VALUE #decimalLiteral + | MINUS? INTEGER_VALUE #integerLiteral + | MINUS? BIGINT_LITERAL #bigIntLiteral + | MINUS? SMALLINT_LITERAL #smallIntLiteral + | MINUS? TINYINT_LITERAL #tinyIntLiteral + | MINUS? DOUBLE_LITERAL #doubleLiteral + | MINUS? FLOAT_LITERAL #floatLiteral + | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral + ; + +multipartIdentifier + : parts+=identifier ('.' parts+=identifier)* + ; + +identifier + : IDENTIFIER #unquotedIdentifier + | quotedIdentifier #quotedIdentifierAlternative + | nonReserved #unquotedIdentifier + ; + +quotedIdentifier + : BACKQUOTED_IDENTIFIER + ; + +nonReserved + : CALL | TRUE | FALSE + ; + +// Keywords +CALL: 'CALL'; +TRUE: 'TRUE'; +FALSE: 'FALSE'; + +// Operators +MINUS: '-'; + +// Literals +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : INTEGER_VALUE 'L' + ; + +SMALLINT_LITERAL + : INTEGER_VALUE 'S' + ; + +TINYINT_LITERAL + : INTEGER_VALUE 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +EXPONENT_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT {isValidDecimal()}? + ; + +DECIMAL_VALUE + : DECIMAL_DIGITS {isValidDecimal()}? + ; + +FLOAT_LITERAL + : DIGIT+ EXPONENT? 'F' + | DECIMAL_DIGITS EXPONENT? 'F' {isValidDecimal()}? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 'D' + | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}? + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}? + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +// Whitespace and comments +SIMPLE_COMMENT + : '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN) + ; + +BRACKETED_COMMENT + : '/*' {!isHint()}? (BRACKETED_COMMENT|.)*? '*/' -> channel(HIDDEN) + ; + +WS + : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for any characters we didn't match +UNRECOGNIZED + : . + ; diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java new file mode 100644 index 0000000000..237bf623a4 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark;
+
+import org.apache.fluss.spark.procedure.CompactProcedure;
+import org.apache.fluss.spark.procedure.Procedure;
+import org.apache.fluss.spark.procedure.ProcedureBuilder;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/** Registry of all built-in stored {@link Procedure}s. */
+public class SparkProcedures {
+
+    private static final Map<String, Supplier<ProcedureBuilder>> BUILDERS = initProcedureBuilders();
+
+    private SparkProcedures() {}
+
+    public static ProcedureBuilder newBuilder(String name) {
+        Supplier<ProcedureBuilder> builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT));
+        return builderSupplier != null ? builderSupplier.get() : null;
+    }
+
+    public static Set<String> names() {
+        return BUILDERS.keySet();
+    }
+
+    private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
+        Map<String, Supplier<ProcedureBuilder>> procedureBuilders = new HashMap<>();
+        procedureBuilders.put("compact", CompactProcedure::builder);
+        return Collections.unmodifiableMap(procedureBuilders);
+    }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java
new file mode 100644
index 0000000000..84d625f53b
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.analysis;
+
+import org.apache.spark.sql.connector.catalog.Identifier;
+
+/** Exception thrown when a procedure cannot be found.
*/
+public class NoSuchProcedureException extends Exception {
+
+    public NoSuchProcedureException(Identifier identifier) {
+        super("Procedure not found: " + identifier);
+    }
+
+    public NoSuchProcedureException(String message) {
+        super(message);
+    }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java
new file mode 100644
index 0000000000..a8db5a1016
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog;
+
+import org.apache.fluss.spark.analysis.NoSuchProcedureException;
+import org.apache.fluss.spark.procedure.Procedure;
+
+import org.apache.spark.sql.connector.catalog.Identifier;
+
+/** An interface that loads stored procedures called via CALL statements. */
+public interface ProcedureCatalog {
+
+    /**
+     * Loads a {@link Procedure stored procedure} by {@link Identifier identifier}.
+     *
+     * @param identifier A stored procedure identifier.
+     * @return The stored procedure for the given identifier.
+     * @throws NoSuchProcedureException Thrown if there is no matching stored procedure.
+     */
+    Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException;
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java
new file mode 100644
index 0000000000..1100712a22
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.fluss.spark.procedure; + +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.spark.SparkTable; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; + +/** A base class for procedure. */ +public abstract class BaseProcedure implements Procedure { + + private final TableCatalog tableCatalog; + + protected BaseProcedure(TableCatalog tableCatalog) { + this.tableCatalog = tableCatalog; + } + + protected Identifier toIdentifier(String identifierAsString, String argName) { + if (identifierAsString == null || identifierAsString.isEmpty()) { + throw new IllegalArgumentException( + "Cannot handle an empty identifier for argument " + argName); + } + + SparkSession spark = SparkSession.active(); + String[] multipartIdentifier = identifierAsString.split("\\."); + + if (multipartIdentifier.length == 1) { + String[] defaultNamespace = spark.sessionState().catalogManager().currentNamespace(); + return Identifier.of(defaultNamespace, multipartIdentifier[0]); + } else if (multipartIdentifier.length == 2) { + return Identifier.of(new String[] {multipartIdentifier[0]}, multipartIdentifier[1]); + } else { + throw new IllegalArgumentException( + "Invalid identifier format for argument " + + argName + + ": " + + identifierAsString); + } + } + + protected SparkTable loadSparkTable(Identifier ident) { + try { + Table table = tableCatalog.loadTable(ident); + if (!(table instanceof SparkTable)) { + throw new IllegalArgumentException( + ident + " is not a Fluss table: " + table.getClass().getName()); + } + return (SparkTable) table; + } catch (Exception e) { + String errMsg = + String.format( + "Couldn't load table '%s' in catalog '%s'", ident, tableCatalog.name()); + throw new RuntimeException(errMsg, e); + } + } + + protected Admin getAdmin(SparkTable table) { + if (table instanceof org.apache.fluss.spark.catalog.AbstractSparkTable) { + return ((org.apache.fluss.spark.catalog.AbstractSparkTable) table).admin(); + } + throw new IllegalArgumentException( + "Table is not an AbstractSparkTable: " + table.getClass().getName()); + } + + protected InternalRow newInternalRow(Object... 
values) {
+        return new GenericInternalRow(values);
+    }
+
+    protected TableCatalog tableCatalog() {
+        return tableCatalog;
+    }
+
+    protected TablePath toTablePath(Identifier ident) {
+        if (ident.namespace().length != 1) {
+            throw new IllegalArgumentException("Only single namespace is supported");
+        }
+        return TablePath.of(ident.namespace()[0], ident.name());
+    }
+
+    protected abstract static class Builder<T extends Procedure> implements ProcedureBuilder {
+        private TableCatalog tableCatalog;
+
+        @Override
+        public Builder<T> withTableCatalog(TableCatalog newTableCatalog) {
+            this.tableCatalog = newTableCatalog;
+            return this;
+        }
+
+        @Override
+        public T build() {
+            return doBuild();
+        }
+
+        protected abstract T doBuild();
+
+        protected TableCatalog tableCatalog() {
+            return tableCatalog;
+        }
+    }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java
new file mode 100644
index 0000000000..090b079e46
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure;
+
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.spark.SparkTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Compact procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.compact(table => 'db.table')
+ * </code></pre>
+ */
+public class CompactProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.StringType, true, Metadata.empty())
+                    });
+
+    protected CompactProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        SparkTable sparkTable = loadSparkTable(tableIdent);
+
+        try {
+            TablePath tablePath = toTablePath(tableIdent);
+            Admin admin = getAdmin(sparkTable);
+
+            // TODO: Implement actual compact logic via admin API
+            // For now, just return a success message
+            String message = String.format("Compact operation queued for table %s", tablePath);
+
+            return new InternalRow[] {
+                newInternalRow(org.apache.spark.unsafe.types.UTF8String.fromString(message))
+            };
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to compact table: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public String description() {
+        return "This procedure triggers a compact action on a Fluss table.";
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<CompactProcedure>() {
+            @Override
+            public CompactProcedure doBuild() {
+                return new CompactProcedure(tableCatalog());
+            }
+        };
+    }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java
new file mode 100644
index 0000000000..f8f7074171
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+/** An interface that defines an executable stored procedure. */
+public interface Procedure {
+
+    /** Returns the input parameters of the stored procedure. */
+    ProcedureParameter[] parameters();
+
+    /** Returns the type of rows produced by the stored procedure. */
+    StructType outputType();
+
+    /**
+     * Executes the stored procedure.
+     *
+     * @param args Input arguments.
+     * @return The result of executing the stored procedure with the given arguments.
+     */
+    InternalRow[] call(InternalRow args);
+
+    /** Returns the description of the stored procedure.
*/ + default String description() { + return this.getClass().toString(); + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java new file mode 100644 index 0000000000..4d5d4c8c41 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure; + +import org.apache.spark.sql.connector.catalog.TableCatalog; + +/** A builder of {@link Procedure} that builds a stored procedure. */ +public interface ProcedureBuilder { + + /** + * Returns a {@link ProcedureBuilder procedure builder} via given table catalog. + * + * @param tableCatalog The table catalog. + * @return The procedure builder with given catalog. + */ + ProcedureBuilder withTableCatalog(TableCatalog tableCatalog); + + /** + * Builds a {@link Procedure stored procedure}. + * + * @return The stored procedure. + */ + Procedure build(); +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java new file mode 100644 index 0000000000..b802671495 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure; + +import org.apache.spark.sql.types.DataType; + +/** An input parameter of a {@link Procedure stored procedure}. */ +public interface ProcedureParameter { + + /** + * Creates a required input parameter. + * + * @param name The name of input parameter. + * @param dataType The data type of input parameter. + * @return The constructed stored procedure parameter. 
+ */ + static ProcedureParameter required(String name, DataType dataType) { + return new ProcedureParameterImpl(name, dataType, true); + } + + /** + * Creates an optional input parameter. + * + * @param name The name of input parameter. + * @param dataType The data type of input parameter. + * @return The constructed optional stored procedure parameter. + */ + static ProcedureParameter optional(String name, DataType dataType) { + return new ProcedureParameterImpl(name, dataType, false); + } + + /** Returns the name of input parameter. */ + String name(); + + /** Returns the type of input parameter. */ + DataType dataType(); + + /** Returns whether input parameter is required. */ + boolean required(); +} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java new file mode 100644 index 0000000000..6dd2e2aecd --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure; + +import org.apache.spark.sql.types.DataType; + +/** Implementation of {@link ProcedureParameter}. 
*/ +class ProcedureParameterImpl implements ProcedureParameter { + + private final String name; + private final DataType dataType; + private final boolean required; + + ProcedureParameterImpl(String name, DataType dataType, boolean required) { + this.name = name; + this.dataType = dataType; + this.required = required; + } + + @Override + public String name() { + return name; + } + + @Override + public DataType dataType() { + return dataType; + } + + @Override + public boolean required() { + return required; + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala index 842ef9b395..4632a5038d 100644 --- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala @@ -19,7 +19,9 @@ package org.apache.fluss.spark import org.apache.fluss.exception.{DatabaseNotExistException, TableAlreadyExistException, TableNotExistException} import org.apache.fluss.metadata.TablePath -import org.apache.fluss.spark.catalog.{SupportsFlussNamespaces, WithFlussAdmin} +import org.apache.fluss.spark.analysis.NoSuchProcedureException +import org.apache.fluss.spark.catalog.{ProcedureCatalog, SupportsFlussNamespaces, WithFlussAdmin} +import org.apache.fluss.spark.procedure.{Procedure, ProcedureBuilder} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange} @@ -32,9 +34,14 @@ import java.util.concurrent.ExecutionException import scala.collection.JavaConverters._ -class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFlussAdmin { +class SparkCatalog + extends TableCatalog + with SupportsFlussNamespaces + with WithFlussAdmin + with ProcedureCatalog { private var catalogName: String = "fluss" + private val SYSTEM_NAMESPACE = "sys" override def listTables(namespace: Array[String]): Array[Identifier] = { doNamespaceOperator(namespace) { @@ -104,6 +111,20 @@ class SparkCatalog extends TableCatalog with SupportsFlussNamespaces with WithFl override def name(): String = catalogName + override def loadProcedure(identifier: Identifier): Procedure = { + if (isSystemNamespace(identifier.namespace)) { + val builder: ProcedureBuilder = SparkProcedures.newBuilder(identifier.name) + if (builder != null) { + return builder.withTableCatalog(this).build() + } + } + throw new NoSuchProcedureException(identifier) + } + + private def isSystemNamespace(namespace: Array[String]): Boolean = { + namespace.length == 1 && namespace(0).equalsIgnoreCase(SYSTEM_NAMESPACE) + } + private def toTablePath(ident: Identifier): TablePath = { assert(ident.namespace().length == 1, "Only single namespace is supported") TablePath.of(ident.namespace().head, ident.name) diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala new file mode 100644 index 0000000000..fb0f197259 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/analysis/FlussProcedureResolver.scala @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalyst.analysis + +import org.apache.fluss.spark.catalog.ProcedureCatalog +import org.apache.fluss.spark.catalyst.plans.logical._ +import org.apache.fluss.spark.procedure.ProcedureParameter + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier} + +import java.util.Locale + +/** Resolution rule for Fluss stored procedures. */ +case class FlussProcedureResolver(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case FlussCallStatement(nameParts, arguments) if nameParts.nonEmpty => + val (catalog, identifier) = resolveCatalogAndIdentifier(nameParts) + if (catalog == null || !catalog.isInstanceOf[ProcedureCatalog]) { + throw new RuntimeException(s"Catalog ${nameParts.head} is not a ProcedureCatalog") + } + + val procedureCatalog = catalog.asInstanceOf[ProcedureCatalog] + val procedure = procedureCatalog.loadProcedure(identifier) + val parameters = procedure.parameters + val normalizedParameters = normalizeParameters(parameters) + validateParameters(normalizedParameters) + val normalizedArguments = normalizeArguments(arguments) + FlussCallCommand( + procedure, + args = buildArgumentExpressions(normalizedParameters, normalizedArguments)) + + case call @ FlussCallCommand(procedure, arguments) if call.resolved => + val parameters = procedure.parameters + val newArguments = arguments.zipWithIndex.map { + case (argument, index) => + val parameter = parameters(index) + val parameterType = parameter.dataType + val argumentType = argument.dataType + if (parameterType != argumentType && !Cast.canUpCast(argumentType, parameterType)) { + throw new RuntimeException( + s"Cannot cast $argumentType to $parameterType of ${parameter.name}.") + } + if (parameterType != argumentType) { + Cast(argument, parameterType) + } else { + argument + } + } + + if (newArguments != arguments) { + call.copy(args = newArguments) + } else { + call + } + } + + private def resolveCatalogAndIdentifier(nameParts: Seq[String]): (CatalogPlugin, Identifier) = { + val catalogManager = sparkSession.sessionState.catalogManager + if (nameParts.length == 2) { + val catalogName = nameParts.head + val procedureName = nameParts(1) + val catalog = catalogManager.catalog(catalogName) + (catalog, Identifier.of(Array("sys"), procedureName)) + } else if (nameParts.length == 3) { + val catalogName = nameParts.head + val namespace = nameParts(1) + val procedureName = nameParts(2) + val catalog = catalogManager.catalog(catalogName) + (catalog, Identifier.of(Array(namespace), procedureName)) 
+ } else { + throw new RuntimeException(s"Invalid procedure name: ${nameParts.mkString(".")}") + } + } + + private def normalizeParameters(parameters: Seq[ProcedureParameter]): Seq[ProcedureParameter] = { + parameters.map { + parameter => + val normalizedName = parameter.name.toLowerCase(Locale.ROOT) + if (parameter.required) { + ProcedureParameter.required(normalizedName, parameter.dataType) + } else { + ProcedureParameter.optional(normalizedName, parameter.dataType) + } + } + } + + private def validateParameters(parameters: Seq[ProcedureParameter]): Unit = { + val duplicateParamNames = parameters.groupBy(_.name).collect { + case (name, matchingParams) if matchingParams.length > 1 => name + } + if (duplicateParamNames.nonEmpty) { + throw new RuntimeException( + s"Parameter names ${duplicateParamNames.mkString("[", ",", "]")} are duplicated.") + } + parameters.sliding(2).foreach { + case Seq(previousParam, currentParam) if !previousParam.required && currentParam.required => + throw new RuntimeException( + s"Optional parameters should be after required ones but $currentParam is after $previousParam.") + case _ => + } + } + + private def normalizeArguments(arguments: Seq[FlussCallArgument]): Seq[FlussCallArgument] = { + arguments.map { + case a @ FlussNamedArgument(name, _) => a.copy(name = name.toLowerCase(Locale.ROOT)) + case other => other + } + } + + private def buildArgumentExpressions( + parameters: Seq[ProcedureParameter], + arguments: Seq[FlussCallArgument]): Seq[Expression] = { + val nameToPositionMap = parameters.map(_.name).zipWithIndex.toMap + val nameToArgumentMap = buildNameToArgumentMap(parameters, arguments, nameToPositionMap) + val missingParamNames = parameters.filter(_.required).collect { + case parameter if !nameToArgumentMap.contains(parameter.name) => parameter.name + } + if (missingParamNames.nonEmpty) { + throw new RuntimeException( + s"Required parameters ${missingParamNames.mkString("[", ",", "]")} are missing.") + } + val argumentExpressions = new Array[Expression](parameters.size) + nameToArgumentMap.foreach { + case (name, argument) => argumentExpressions(nameToPositionMap(name)) = argument.expr + } + parameters.foreach { + case parameter if !parameter.required && !nameToArgumentMap.contains(parameter.name) => + argumentExpressions(nameToPositionMap(parameter.name)) = + Literal.create(null, parameter.dataType) + case _ => + } + argumentExpressions.toSeq + } + + private def buildNameToArgumentMap( + parameters: Seq[ProcedureParameter], + arguments: Seq[FlussCallArgument], + nameToPositionMap: Map[String, Int]): Map[String, FlussCallArgument] = { + val isNamedArgument = arguments.exists(_.isInstanceOf[FlussNamedArgument]) + val isPositionalArgument = arguments.exists(_.isInstanceOf[FlussPositionalArgument]) + + if (isNamedArgument && isPositionalArgument) { + throw new RuntimeException("Cannot mix named and positional arguments.") + } + + if (isNamedArgument) { + val namedArguments = arguments.asInstanceOf[Seq[FlussNamedArgument]] + val validationErrors = namedArguments.groupBy(_.name).collect { + case (name, procedureArguments) if procedureArguments.size > 1 => + s"Procedure argument $name is duplicated." + case (name, _) if !nameToPositionMap.contains(name) => s"Argument $name is unknown." 
+ } + if (validationErrors.nonEmpty) { + throw new RuntimeException(s"Invalid arguments: ${validationErrors.mkString(", ")}") + } + namedArguments.map(arg => arg.name -> arg).toMap + } else { + if (arguments.size > parameters.size) { + throw new RuntimeException("Too many arguments for procedure") + } + arguments.zipWithIndex.map { + case (argument, position) => + val param = parameters(position) + param.name -> argument + }.toMap + } + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala new file mode 100644 index 0000000000..f0ea0265d6 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalyst/plans/logical/FlussCallCommand.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.catalyst.plans.logical + +import org.apache.fluss.spark.procedure.Procedure + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.catalyst.plans.logical.LeafCommand +import org.apache.spark.sql.catalyst.util.truncatedString + +/** A CALL statement that needs to be resolved to a procedure. */ +case class FlussCallStatement(name: Seq[String], args: Seq[FlussCallArgument]) extends LeafCommand { + override def output: Seq[Attribute] = Seq.empty +} + +/** Base trait for CALL statement arguments. */ +sealed trait FlussCallArgument { + def expr: Expression +} + +/** A positional argument in a stored procedure call. */ +case class FlussPositionalArgument(expr: Expression) extends FlussCallArgument + +/** A named argument in a stored procedure call. */ +case class FlussNamedArgument(name: String, expr: Expression) extends FlussCallArgument + +/** A CALL command that has been resolved to a specific procedure. 
*/
+case class FlussCallCommand(procedure: Procedure, args: Seq[Expression]) extends LeafCommand {
+
+  override lazy val output: Seq[Attribute] =
+    procedure.outputType.map(
+      field => AttributeReference(field.name, field.dataType, field.nullable, field.metadata)())
+
+  override def simpleString(maxFields: Int): String = {
+    s"Call${truncatedString(output, "[", ", ", "]", maxFields)} ${procedure.description}"
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala
new file mode 100644
index 0000000000..1ed8949a21
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/CallProcedureExec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.execution
+
+import org.apache.fluss.spark.procedure.Procedure
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.SparkPlan
+
+/** Physical plan node for executing a stored procedure. */
+case class CallProcedureExec(output: Seq[Attribute], procedure: Procedure, args: Seq[Expression])
+  extends SparkPlan {
+
+  override def children: Seq[SparkPlan] = Seq.empty
+
+  override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = {
+    copy()
+  }
+
+  override protected def doExecute(): org.apache.spark.rdd.RDD[InternalRow] = {
+    // Procedure arguments are foldable expressions, so evaluate them eagerly
+    // on the driver without an input row.
+    val argumentValues = new Array[Any](args.length)
+    args.zipWithIndex.foreach {
+      case (arg, index) =>
+        argumentValues(index) = arg.eval(null)
+    }
+
+    val argRow = new GenericInternalRow(argumentValues)
+    val resultRows = procedure.call(argRow)
+
+    // Downstream operators (e.g. executeCollect) expect UnsafeRows, so project
+    // the generic rows returned by the procedure before handing them to Spark.
+    val projection = UnsafeProjection.create(output.map(_.dataType).toArray)
+    val unsafeRows: Seq[InternalRow] = resultRows.map(row => projection(row).copy())
+    sparkContext.parallelize(unsafeRows, 1)
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala
new file mode 100644
index 0000000000..e4d3d4117a
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/execution/FlussStrategy.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.execution + +import org.apache.fluss.spark.catalyst.plans.logical.FlussCallCommand + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} + +/** Execution strategy for Fluss procedure calls. */ +case class FlussStrategy(spark: SparkSession) extends SparkStrategy { + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case call: FlussCallCommand => + CallProcedureExec(call.output, call.procedure, call.args) :: Nil + case _ => Nil + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/extensions/FlussSparkSessionExtensions.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/extensions/FlussSparkSessionExtensions.scala new file mode 100644 index 0000000000..a041e92943 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/extensions/FlussSparkSessionExtensions.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.extensions + +import org.apache.fluss.spark.catalyst.analysis.FlussProcedureResolver +import org.apache.fluss.spark.execution.FlussStrategy + +import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.parser.extensions.FlussSparkSqlParser + +/** Spark session extensions for Fluss. 
*/
+class FlussSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
+
+  override def apply(extensions: SparkSessionExtensions): Unit = {
+    // parser extensions
+    extensions.injectParser { case (_, parser) => new FlussSparkSqlParser(parser) }
+
+    // analyzer extensions
+    extensions.injectResolutionRule(spark => FlussProcedureResolver(spark))
+
+    // planner extensions
+    extensions.injectPlannerStrategy(spark => FlussStrategy(spark))
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsAstBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsAstBuilder.scala
new file mode 100644
index 0000000000..f1327a5e49
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsAstBuilder.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.fluss.spark.catalyst.plans.logical._
+
+import org.antlr.v4.runtime._
+import org.antlr.v4.runtime.tree.{ParseTree, TerminalNode}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.extensions.FlussSqlExtensionsParser._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
+
+import scala.collection.JavaConverters._
+
+/**
+ * The AST Builder for Fluss SQL extensions.
+ *
+ * @param delegate
+ *   The delegate Spark SQL parser, used to parse argument expressions.
+ */
+class FlussSqlExtensionsAstBuilder(delegate: ParserInterface)
+  extends FlussSqlExtensionsBaseVisitor[AnyRef]
+  with Logging {
+
+  /** Builds the logical plan for a single extension statement. */
+  override def visitSingleStatement(ctx: SingleStatementContext): LogicalPlan = withOrigin(ctx) {
+    visit(ctx.statement).asInstanceOf[LogicalPlan]
+  }
+
+  /** Creates a [[FlussCallStatement]] for a stored procedure call. */
+  override def visitCall(ctx: CallContext): FlussCallStatement = withOrigin(ctx) {
+    val name =
+      ctx.multipartIdentifier.parts.asScala.map(part => cleanIdentifier(part.getText)).toSeq
+    val args = ctx.callArgument.asScala.map(typedVisit[FlussCallArgument]).toSeq
+    FlussCallStatement(name, args)
+  }
+
+  /** Creates a positional argument in a stored procedure call. */
+  override def visitPositionalArgument(ctx: PositionalArgumentContext): FlussCallArgument =
+    withOrigin(ctx) {
+      val expression = typedVisit[Expression](ctx.expression)
+      FlussPositionalArgument(expression)
+    }
+
+  /** Creates a named argument in a stored procedure call. 
+   */
+  override def visitNamedArgument(ctx: NamedArgumentContext): FlussCallArgument = withOrigin(ctx) {
+    val name = cleanIdentifier(ctx.identifier.getText)
+    val expression = typedVisit[Expression](ctx.expression)
+    FlussNamedArgument(name, expression)
+  }
+
+  /** Creates an [[Expression]] from a positional or named argument. */
+  override def visitExpression(ctx: ExpressionContext): Expression = {
+    val sqlString = reconstructSqlString(ctx)
+    delegate.parseExpression(sqlString)
+  }
+
+  /** Returns a multi-part identifier as Seq[String]. */
+  override def visitMultipartIdentifier(ctx: MultipartIdentifierContext): Seq[String] =
+    withOrigin(ctx) {
+      ctx.parts.asScala.map(part => cleanIdentifier(part.getText)).toSeq
+    }
+
+  /** Removes backticks from an identifier. */
+  private def cleanIdentifier(ident: String): String = {
+    if (ident.startsWith("`") && ident.endsWith("`")) {
+      ident.substring(1, ident.length - 1)
+    } else {
+      ident
+    }
+  }
+
+  private def reconstructSqlString(ctx: ParserRuleContext): String = {
+    ctx.children.asScala
+      .map {
+        case c: ParserRuleContext => reconstructSqlString(c)
+        case t: TerminalNode => t.getText
+      }
+      .mkString(" ")
+  }
+
+  private def typedVisit[T](ctx: ParseTree): T =
+    ctx.accept(this).asInstanceOf[T]
+
+  private def withOrigin[T](ctx: ParserRuleContext)(f: => T): T = {
+    val current = CurrentOrigin.get
+    CurrentOrigin.set(position(ctx.getStart))
+    try {
+      f
+    } finally {
+      CurrentOrigin.set(current)
+    }
+  }
+
+  private def position(token: Token): Origin = {
+    val opt = Option(token)
+    Origin(opt.map(_.getLine), opt.map(_.getCharPositionInLine))
+  }
+}
+
+// A file-local Origin/CurrentOrigin pair, used by withOrigin above to track parse positions.
+case class Origin(
+    line: Option[Int] = None,
+    startPosition: Option[Int] = None,
+    startIndex: Option[Int] = None,
+    stopIndex: Option[Int] = None,
+    sqlText: Option[String] = None,
+    objectType: Option[String] = None,
+    objectName: Option[String] = None)
+
+object CurrentOrigin {
+  private val value = new ThreadLocal[Origin]() {
+    override def initialValue: Origin = Origin()
+  }
+
+  def get: Origin = value.get()
+  def set(o: Origin): Unit = value.set(o)
+  def reset(): Unit = value.set(Origin())
+
+  def withOrigin[A](o: Origin)(f: => A): A = {
+    val previous = get
+    set(o)
+    val ret =
+      try f
+      finally { set(previous) }
+    ret
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsParser.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsParser.scala
new file mode 100644
index 0000000000..57d8d0f346
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/FlussSqlExtensionsParser.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.catalyst.parser.extensions + +import org.antlr.v4.runtime._ +import org.antlr.v4.runtime.atn.PredictionMode +import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.{DataType, StructType} + +/** + * Parser extension for Fluss SQL extensions. + * + * @param delegate + * The main Spark SQL parser. + */ +class FlussSparkSqlParser(delegate: ParserInterface) extends ParserInterface { + + private lazy val astBuilder = new FlussSqlExtensionsAstBuilder(delegate) + + override def parsePlan(sqlText: String): LogicalPlan = { + try { + parse(sqlText)(parser => astBuilder.visitSingleStatement(parser.singleStatement())) + } catch { + case _: ParseException | _: ParseCancellationException => + delegate.parsePlan(sqlText) + } + } + + override def parseQuery(sqlText: String): LogicalPlan = parsePlan(sqlText) + + override def parseExpression(sqlText: String): Expression = { + delegate.parseExpression(sqlText) + } + + override def parseTableIdentifier(sqlText: String): TableIdentifier = { + delegate.parseTableIdentifier(sqlText) + } + + override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = { + delegate.parseFunctionIdentifier(sqlText) + } + + override def parseMultipartIdentifier(sqlText: String): Seq[String] = { + delegate.parseMultipartIdentifier(sqlText) + } + + override def parseTableSchema(sqlText: String): StructType = { + delegate.parseTableSchema(sqlText) + } + + override def parseDataType(sqlText: String): DataType = { + delegate.parseDataType(sqlText) + } + + private def parse[T](sqlText: String)( + toResult: org.apache.spark.sql.catalyst.parser.extensions.FlussSqlExtensionsParser => T) + : T = { + val lexer = new FlussSqlExtensionsLexer( + new UpperCaseCharStream(CharStreams.fromString(sqlText))) + lexer.removeErrorListeners() + lexer.addErrorListener(FlussParseErrorListener) + + val tokenStream = new CommonTokenStream(lexer) + val parser = + new org.apache.spark.sql.catalyst.parser.extensions.FlussSqlExtensionsParser(tokenStream) + parser.removeErrorListeners() + parser.addErrorListener(FlussParseErrorListener) + + try { + try { + parser.getInterpreter.setPredictionMode(PredictionMode.SLL) + toResult(parser) + } catch { + case _: ParseCancellationException => + tokenStream.seek(0) + parser.reset() + parser.getInterpreter.setPredictionMode(PredictionMode.LL) + toResult(parser) + } + } catch { + case e: ParseException if e.command.isDefined => + throw e + case e: ParseException => + throw e.withCommand(sqlText) + case e: AnalysisException => + val position = org.apache.spark.sql.catalyst.trees.Origin(e.line, e.startPosition) + throw new ParseException(Option(sqlText), e.message, position, position) + } + } +} + +class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream { + override def consume(): Unit = wrapped.consume() + override def getSourceName: String = wrapped.getSourceName + override def index(): Int = wrapped.index() + override def mark(): Int = wrapped.mark() + override def release(marker: Int): Unit = wrapped.release(marker) + override def seek(where: Int): Unit = wrapped.seek(where) + override def size(): Int = wrapped.size() + + override 
def getText(interval: Interval): String = { + wrapped.getText(interval) + } + + override def LA(i: Int): Int = { + val la = wrapped.LA(i) + if (la == 0 || la == IntStream.EOF) la + else Character.toUpperCase(la) + } +} + +object FlussParseErrorListener extends BaseErrorListener { + override def syntaxError( + recognizer: Recognizer[_, _], + offendingSymbol: scala.Any, + line: Int, + charPositionInLine: Int, + msg: String, + e: RecognitionException): Unit = { + val position = org.apache.spark.sql.catalyst.trees.Origin(Some(line), Some(charPositionInLine)) + throw new ParseException(None, msg, position, position) + } +} diff --git a/fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java b/fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java new file mode 100644 index 0000000000..c7523816f6 --- /dev/null +++ b/fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.extensions; + +import org.apache.fluss.spark.catalyst.plans.logical.FlussCallArgument; +import org.apache.fluss.spark.catalyst.plans.logical.FlussCallStatement; +import org.apache.fluss.spark.catalyst.plans.logical.FlussNamedArgument; +import org.apache.fluss.spark.catalyst.plans.logical.FlussPositionalArgument; + +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.Literal$; +import org.apache.spark.sql.catalyst.parser.ParseException; +import org.apache.spark.sql.catalyst.parser.ParserInterface; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Arrays; + +import scala.Option; +import scala.collection.JavaConverters; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link FlussCallStatement} of {@link FlussSparkSessionExtensions}. */ +public class CallStatementParserTest { + + private SparkSession spark = null; + private ParserInterface parser = null; + + @BeforeEach + public void startSparkSession() { + // Stops and clears active session to avoid loading previous non-stopped session. 
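+        // Otherwise a session left over from another test could be reused and the
+        // Fluss extensions configured below would not be applied.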
+        Option<SparkSession> optionalSession =
+                SparkSession.getActiveSession().orElse(SparkSession::getDefaultSession);
+        if (!optionalSession.isEmpty()) {
+            optionalSession.get().stop();
+        }
+        SparkSession.clearActiveSession();
+        spark =
+                SparkSession.builder()
+                        .master("local[2]")
+                        .config("spark.sql.extensions", FlussSparkSessionExtensions.class.getName())
+                        .getOrCreate();
+        parser = spark.sessionState().sqlParser();
+    }
+
+    @AfterEach
+    public void stopSparkSession() {
+        if (spark != null) {
+            spark.stop();
+            spark = null;
+            parser = null;
+        }
+    }
+
+    @Test
+    public void testCallWithBackticks() throws ParseException {
+        FlussCallStatement call =
+                (FlussCallStatement) parser.parsePlan("CALL cat.`system`.`no_args_func`()");
+        assertThat(JavaConverters.seqAsJavaList(call.name()))
+                .isEqualTo(Arrays.asList("cat", "system", "no_args_func"));
+        assertThat(call.args().size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testCallWithNamedArguments() throws ParseException {
+        FlussCallStatement callStatement =
+                (FlussCallStatement)
+                        parser.parsePlan(
+                                "CALL catalog.system.named_args_func(arg1 => 1, arg2 => 'test', arg3 => true)");
+        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
+                .isEqualTo(Arrays.asList("catalog", "system", "named_args_func"));
+        assertThat(callStatement.args().size()).isEqualTo(3);
+        assertArgument(callStatement, 0, "arg1", 1, DataTypes.IntegerType);
+        assertArgument(callStatement, 1, "arg2", "test", DataTypes.StringType);
+        assertArgument(callStatement, 2, "arg3", true, DataTypes.BooleanType);
+    }
+
+    @Test
+    public void testCallWithPositionalArguments() throws ParseException {
+        FlussCallStatement callStatement =
+                (FlussCallStatement)
+                        parser.parsePlan(
+                                "CALL catalog.system.positional_args_func(1, '${spark.sql.extensions}', 2L, true, 3.0D, 4.0e1, 500e-1BD, TIMESTAMP '2017-02-03T10:37:30.00Z')");
+        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
+                .isEqualTo(Arrays.asList("catalog", "system", "positional_args_func"));
+        assertThat(callStatement.args().size()).isEqualTo(8);
+        assertArgument(callStatement, 0, 1, DataTypes.IntegerType);
+        assertArgument(
+                callStatement,
+                1,
+                FlussSparkSessionExtensions.class.getName(),
+                DataTypes.StringType);
+        assertArgument(callStatement, 2, 2L, DataTypes.LongType);
+        assertArgument(callStatement, 3, true, DataTypes.BooleanType);
+        assertArgument(callStatement, 4, 3.0D, DataTypes.DoubleType);
+        assertArgument(callStatement, 5, 4.0e1, DataTypes.DoubleType);
+        assertArgument(
+                callStatement, 6, new BigDecimal("500e-1"), DataTypes.createDecimalType(3, 1));
+        assertArgument(
+                callStatement,
+                7,
+                Timestamp.from(Instant.parse("2017-02-03T10:37:30.00Z")),
+                DataTypes.TimestampType);
+    }
+
+    @Test
+    public void testCallWithMixedArguments() throws ParseException {
+        FlussCallStatement callStatement =
+                (FlussCallStatement)
+                        parser.parsePlan("CALL catalog.system.mixed_args_func(arg1 => 1, 'test')");
+        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
+                .isEqualTo(Arrays.asList("catalog", "system", "mixed_args_func"));
+        assertThat(callStatement.args().size()).isEqualTo(2);
+        assertArgument(callStatement, 0, "arg1", 1, DataTypes.IntegerType);
+        assertArgument(callStatement, 1, "test", DataTypes.StringType);
+    }
+
+    @Test
+    public void testCallSimpleProcedure() throws ParseException {
+        FlussCallStatement callStatement =
+                (FlussCallStatement)
+                        parser.parsePlan("CALL system.simple_procedure(table => 'db.table')");
+        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
+                .isEqualTo(Arrays.asList("system", "simple_procedure"));
+        assertThat(callStatement.args().size()).isEqualTo(1);
+        assertArgument(callStatement, 0, "table", "db.table", DataTypes.StringType);
+    }
+
+    private void assertArgument(
+            FlussCallStatement call, int index, Object expectedValue, DataType expectedType) {
+        assertArgument(call, index, null, expectedValue, expectedType);
+    }
+
+    private void assertArgument(
+            FlussCallStatement callStatement,
+            int index,
+            String expectedName,
+            Object expectedValue,
+            DataType expectedType) {
+        if (expectedName == null) {
+            FlussCallArgument callArgument = callStatement.args().apply(index);
+            assertCast(callArgument, FlussPositionalArgument.class);
+        } else {
+            FlussNamedArgument namedArgument =
+                    assertCast(callStatement.args().apply(index), FlussNamedArgument.class);
+            assertThat(namedArgument.name()).isEqualTo(expectedName);
+        }
+        assertThat(callStatement.args().apply(index).expr())
+                .isEqualTo(Literal$.MODULE$.create(expectedValue, expectedType));
+    }
+
+    private <T> T assertCast(Object value, Class<T> expectedClass) {
+        assertThat(value).isInstanceOf(expectedClass);
+        return expectedClass.cast(value);
+    }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 2de158b7db..23b62f7824 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -25,6 +25,7 @@ import org.apache.fluss.config.{ConfigOptions, Configuration}
 import org.apache.fluss.metadata.{TableDescriptor, TablePath}
 import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
+import org.apache.fluss.spark.extensions.FlussSparkSessionExtensions
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
@@ -50,6 +51,7 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
       .set(s"spark.sql.catalog.$DEFAULT_CATALOG", classOf[SparkCatalog].getName)
       .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", bootstrapServers)
       .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+      .set("spark.sql.extensions", classOf[FlussSparkSessionExtensions].getName)
   }

 override protected def beforeAll(): Unit = {

From 0b2788cd7df6e96bec385d737f1dc8f4c24a9a1f Mon Sep 17 00:00:00 2001
From: forwardxu
Date: Mon, 19 Jan 2026 11:03:28 +0800
Subject: [PATCH 2/2] [spark] Support parser of Spark call procedure command

Rework the procedure classes and the parser test from Java to Scala.

---
 .../apache/fluss/spark/SparkProcedures.java   |  53 ------
 .../analysis/NoSuchProcedureException.java    |  33 ----
 .../fluss/spark/catalog/ProcedureCatalog.java |  37 ----
 .../fluss/spark/procedure/BaseProcedure.java  | 123 ------------
 .../spark/procedure/CompactProcedure.java     | 101 ----------
 .../fluss/spark/procedure/Procedure.java      |  45 -----
 .../spark/procedure/ProcedureBuilder.java     |  40 ----
 .../spark/procedure/ProcedureParameter.java   |  56 ------
 .../procedure/ProcedureParameterImpl.java     |  50 -----
 .../apache/fluss/spark/SparkProcedures.scala  |  40 ++++
 .../analysis/NoSuchProcedureException.scala   |  27 +++
 .../spark/catalog/ProcedureCatalog.scala      |  29 +++
 .../fluss/spark/procedure/BaseProcedure.scala | 104 ++++++++++
 .../spark/procedure/CompactProcedure.scala    |  76 ++++++++
 .../fluss/spark/procedure/Procedure.scala     |  32 ++++
 .../spark/procedure/ProcedureBuilder.scala    |  27 +++
 .../spark/procedure/ProcedureParameter.scala  |  51 +++++
.../extensions/CallStatementParserTest.java | 178 ------------------ .../extensions/CallStatementParserTest.scala | 153 +++++++++++++++ 19 files changed, 539 insertions(+), 716 deletions(-) delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java delete mode 100644 fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/analysis/NoSuchProcedureException.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/ProcedureCatalog.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/CompactProcedure.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala create mode 100644 fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala delete mode 100644 fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java create mode 100644 fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java deleted file mode 100644 index 237bf623a4..0000000000 --- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/SparkProcedures.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.spark;
-
-import org.apache.fluss.spark.procedure.CompactProcedure;
-import org.apache.fluss.spark.procedure.Procedure;
-import org.apache.fluss.spark.procedure.ProcedureBuilder;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Supplier;
-
-/** The {@link Procedure}s including all the stored procedures. */
-public class SparkProcedures {
-
-    private static final Map<String, Supplier<ProcedureBuilder>> BUILDERS = initProcedureBuilders();
-
-    private SparkProcedures() {}
-
-    public static ProcedureBuilder newBuilder(String name) {
-        Supplier<ProcedureBuilder> builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT));
-        return builderSupplier != null ? builderSupplier.get() : null;
-    }
-
-    public static Set<String> names() {
-        return BUILDERS.keySet();
-    }
-
-    private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
-        Map<String, Supplier<ProcedureBuilder>> procedureBuilders = new HashMap<>();
-        procedureBuilders.put("compact", CompactProcedure::builder);
-        return Collections.unmodifiableMap(procedureBuilders);
-    }
-}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java
deleted file mode 100644
index 84d625f53b..0000000000
--- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/analysis/NoSuchProcedureException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.spark.analysis;
-
-import org.apache.spark.sql.connector.catalog.Identifier;
-
-/** Exception thrown when a procedure cannot be found. */
-public class NoSuchProcedureException extends Exception {
-
-    public NoSuchProcedureException(Identifier identifier) {
-        super("Procedure not found: " + identifier);
-    }
-
-    public NoSuchProcedureException(String message) {
-        super(message);
-    }
-}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java
deleted file mode 100644
index a8db5a1016..0000000000
--- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/catalog/ProcedureCatalog.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.catalog; - -import org.apache.fluss.spark.analysis.NoSuchProcedureException; -import org.apache.fluss.spark.procedure.Procedure; - -import org.apache.spark.sql.connector.catalog.Identifier; - -/** An interface that loads stored procedures called via CALL statements. */ -public interface ProcedureCatalog { - - /** - * Loads a {@link Procedure stored procedure} by {@link Identifier identifier}. - * - * @param identifier A stored procedure identifier. - * @return The procedure's metadata of given identifier. - * @throws NoSuchProcedureException Thrown, if there is no matching procedure stored. - */ - Procedure loadProcedure(Identifier identifier) throws NoSuchProcedureException; -} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java deleted file mode 100644 index 1100712a22..0000000000 --- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/BaseProcedure.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.procedure; - -import org.apache.fluss.client.admin.Admin; -import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.spark.SparkTable; - -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.Table; -import org.apache.spark.sql.connector.catalog.TableCatalog; - -/** A base class for procedure. 
*/
-public abstract class BaseProcedure implements Procedure {
-
-    private final TableCatalog tableCatalog;
-
-    protected BaseProcedure(TableCatalog tableCatalog) {
-        this.tableCatalog = tableCatalog;
-    }
-
-    protected Identifier toIdentifier(String identifierAsString, String argName) {
-        if (identifierAsString == null || identifierAsString.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Cannot handle an empty identifier for argument " + argName);
-        }
-
-        SparkSession spark = SparkSession.active();
-        String[] multipartIdentifier = identifierAsString.split("\\.");
-
-        if (multipartIdentifier.length == 1) {
-            String[] defaultNamespace = spark.sessionState().catalogManager().currentNamespace();
-            return Identifier.of(defaultNamespace, multipartIdentifier[0]);
-        } else if (multipartIdentifier.length == 2) {
-            return Identifier.of(new String[] {multipartIdentifier[0]}, multipartIdentifier[1]);
-        } else {
-            throw new IllegalArgumentException(
-                    "Invalid identifier format for argument "
-                            + argName
-                            + ": "
-                            + identifierAsString);
-        }
-    }
-
-    protected SparkTable loadSparkTable(Identifier ident) {
-        try {
-            Table table = tableCatalog.loadTable(ident);
-            if (!(table instanceof SparkTable)) {
-                throw new IllegalArgumentException(
-                        ident + " is not a Fluss table: " + table.getClass().getName());
-            }
-            return (SparkTable) table;
-        } catch (Exception e) {
-            String errMsg =
-                    String.format(
-                            "Couldn't load table '%s' in catalog '%s'", ident, tableCatalog.name());
-            throw new RuntimeException(errMsg, e);
-        }
-    }
-
-    protected Admin getAdmin(SparkTable table) {
-        if (table instanceof org.apache.fluss.spark.catalog.AbstractSparkTable) {
-            return ((org.apache.fluss.spark.catalog.AbstractSparkTable) table).admin();
-        }
-        throw new IllegalArgumentException(
-                "Table is not an AbstractSparkTable: " + table.getClass().getName());
-    }
-
-    protected InternalRow newInternalRow(Object... values) {
-        return new GenericInternalRow(values);
-    }
-
-    protected TableCatalog tableCatalog() {
-        return tableCatalog;
-    }
-
-    protected TablePath toTablePath(Identifier ident) {
-        if (ident.namespace().length != 1) {
-            throw new IllegalArgumentException("Only single namespace is supported");
-        }
-        return TablePath.of(ident.namespace()[0], ident.name());
-    }
-
-    protected abstract static class Builder<T extends BaseProcedure> implements ProcedureBuilder {
-        private TableCatalog tableCatalog;
-
-        @Override
-        public Builder<T> withTableCatalog(TableCatalog newTableCatalog) {
-            this.tableCatalog = newTableCatalog;
-            return this;
-        }
-
-        @Override
-        public T build() {
-            return doBuild();
-        }
-
-        protected abstract T doBuild();
-
-        protected TableCatalog tableCatalog() {
-            return tableCatalog;
-        }
-    }
-}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java
deleted file mode 100644
index 090b079e46..0000000000
--- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/CompactProcedure.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.procedure; - -import org.apache.fluss.client.admin.Admin; -import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.spark.SparkTable; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.connector.catalog.Identifier; -import org.apache.spark.sql.connector.catalog.TableCatalog; -import org.apache.spark.sql.types.DataTypes; -import org.apache.spark.sql.types.Metadata; -import org.apache.spark.sql.types.StructField; -import org.apache.spark.sql.types.StructType; - -import static org.apache.spark.sql.types.DataTypes.StringType; - -/** - * Compact procedure. Usage: - * - *
<pre>
- *  CALL sys.compact(table => 'db.table')
- * </pre>
- */
-public class CompactProcedure extends BaseProcedure {
-
-    private static final ProcedureParameter[] PARAMETERS =
-            new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
-
-    private static final StructType OUTPUT_TYPE =
-            new StructType(
-                    new StructField[] {
-                        new StructField("result", DataTypes.StringType, true, Metadata.empty())
-                    });
-
-    protected CompactProcedure(TableCatalog tableCatalog) {
-        super(tableCatalog);
-    }
-
-    @Override
-    public ProcedureParameter[] parameters() {
-        return PARAMETERS;
-    }
-
-    @Override
-    public StructType outputType() {
-        return OUTPUT_TYPE;
-    }
-
-    @Override
-    public InternalRow[] call(InternalRow args) {
-        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
-        SparkTable sparkTable = loadSparkTable(tableIdent);
-
-        try {
-            TablePath tablePath = toTablePath(tableIdent);
-            Admin admin = getAdmin(sparkTable);
-
-            // TODO: Implement actual compact logic via admin API
-            // For now, just return a success message
-            String message = String.format("Compact operation queued for table %s", tablePath);
-
-            return new InternalRow[] {
-                newInternalRow(org.apache.spark.unsafe.types.UTF8String.fromString(message))
-            };
-        } catch (Exception e) {
-            throw new RuntimeException("Failed to compact table: " + e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public String description() {
-        return "This procedure triggers a compact action on a Fluss table.";
-    }
-
-    public static ProcedureBuilder builder() {
-        return new BaseProcedure.Builder<CompactProcedure>() {
-            @Override
-            public CompactProcedure doBuild() {
-                return new CompactProcedure(tableCatalog());
-            }
-        };
-    }
-}
diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java
deleted file mode 100644
index f8f7074171..0000000000
--- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/Procedure.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.spark.procedure;
-
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.types.StructType;
-
-/** An interface that defines an executable stored procedure. */
-public interface Procedure {
-
-    /** Returns the input parameters of stored procedure. */
-    ProcedureParameter[] parameters();
-
-    /** Returns the type of rows produced by stored procedure. */
-    StructType outputType();
-
-    /**
-     * Executes the given stored procedure.
-     *
-     * @param args Input arguments.
-     * @return The result of executing stored procedure with the given arguments.
-     */
-    InternalRow[] call(InternalRow args);
-
-    /** Returns the description of stored procedure.
*/ - default String description() { - return this.getClass().toString(); - } -} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java deleted file mode 100644 index 4d5d4c8c41..0000000000 --- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.procedure; - -import org.apache.spark.sql.connector.catalog.TableCatalog; - -/** A builder of {@link Procedure} that builds a stored procedure. */ -public interface ProcedureBuilder { - - /** - * Returns a {@link ProcedureBuilder procedure builder} via given table catalog. - * - * @param tableCatalog The table catalog. - * @return The procedure builder with given catalog. - */ - ProcedureBuilder withTableCatalog(TableCatalog tableCatalog); - - /** - * Builds a {@link Procedure stored procedure}. - * - * @return The stored procedure. - */ - Procedure build(); -} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java deleted file mode 100644 index b802671495..0000000000 --- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameter.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.procedure; - -import org.apache.spark.sql.types.DataType; - -/** An input parameter of a {@link Procedure stored procedure}. */ -public interface ProcedureParameter { - - /** - * Creates a required input parameter. - * - * @param name The name of input parameter. - * @param dataType The data type of input parameter. - * @return The constructed stored procedure parameter. 
- */ - static ProcedureParameter required(String name, DataType dataType) { - return new ProcedureParameterImpl(name, dataType, true); - } - - /** - * Creates an optional input parameter. - * - * @param name The name of input parameter. - * @param dataType The data type of input parameter. - * @return The constructed optional stored procedure parameter. - */ - static ProcedureParameter optional(String name, DataType dataType) { - return new ProcedureParameterImpl(name, dataType, false); - } - - /** Returns the name of input parameter. */ - String name(); - - /** Returns the type of input parameter. */ - DataType dataType(); - - /** Returns whether input parameter is required. */ - boolean required(); -} diff --git a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java b/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java deleted file mode 100644 index 6dd2e2aecd..0000000000 --- a/fluss-spark/fluss-spark-common/src/main/java/org/apache/fluss/spark/procedure/ProcedureParameterImpl.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.spark.procedure; - -import org.apache.spark.sql.types.DataType; - -/** Implementation of {@link ProcedureParameter}. */ -class ProcedureParameterImpl implements ProcedureParameter { - - private final String name; - private final DataType dataType; - private final boolean required; - - ProcedureParameterImpl(String name, DataType dataType, boolean required) { - this.name = name; - this.dataType = dataType; - this.required = required; - } - - @Override - public String name() { - return name; - } - - @Override - public DataType dataType() { - return dataType; - } - - @Override - public boolean required() { - return required; - } -} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala new file mode 100644 index 0000000000..12d75b2806 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkProcedures.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark + +import org.apache.fluss.spark.procedure.{CompactProcedure, ProcedureBuilder} + +import java.util.Locale + +object SparkProcedures { + + private val BUILDERS: Map[String, () => ProcedureBuilder] = initProcedureBuilders() + + def newBuilder(name: String): ProcedureBuilder = { + val builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT)) + builderSupplier.map(_()).orNull + } + + def names(): Set[String] = BUILDERS.keySet + + private def initProcedureBuilders(): Map[String, () => ProcedureBuilder] = { + Map( + "compact" -> (() => CompactProcedure.builder()) + ) + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/analysis/NoSuchProcedureException.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/analysis/NoSuchProcedureException.scala new file mode 100644 index 0000000000..984ebd8f1f --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/analysis/NoSuchProcedureException.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.analysis + +import org.apache.spark.sql.connector.catalog.Identifier + +class NoSuchProcedureException(message: String) extends Exception(message) { + + def this(identifier: Identifier) = { + this(s"Procedure not found: $identifier") + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/ProcedureCatalog.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/ProcedureCatalog.scala new file mode 100644 index 0000000000..471f978ca3 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/ProcedureCatalog.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.catalog
+
+import org.apache.fluss.spark.analysis.NoSuchProcedureException
+import org.apache.fluss.spark.procedure.Procedure
+
+import org.apache.spark.sql.connector.catalog.Identifier
+
+/** A catalog that loads stored procedures invoked via CALL statements. */
+trait ProcedureCatalog {
+
+  /** Loads a stored [[Procedure]] by identifier, or throws [[NoSuchProcedureException]]. */
+  @throws[NoSuchProcedureException]
+  def loadProcedure(identifier: Identifier): Procedure
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
new file mode 100644
index 0000000000..d9f9efc9f0
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/BaseProcedure.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.fluss.spark.procedure + +import org.apache.fluss.client.admin.Admin +import org.apache.fluss.metadata.TablePath +import org.apache.fluss.spark.SparkTable +import org.apache.fluss.spark.catalog.AbstractSparkTable + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog} + +abstract class BaseProcedure(tableCatalog: TableCatalog) extends Procedure { + + protected def toIdentifier(identifierAsString: String, argName: String): Identifier = { + if (identifierAsString == null || identifierAsString.isEmpty) { + throw new IllegalArgumentException(s"Cannot handle an empty identifier for argument $argName") + } + + val spark = SparkSession.active + val multipartIdentifier = identifierAsString.split("\\.") + + if (multipartIdentifier.length == 1) { + val defaultNamespace = spark.sessionState.catalogManager.currentNamespace + Identifier.of(defaultNamespace, multipartIdentifier(0)) + } else if (multipartIdentifier.length == 2) { + Identifier.of(Array(multipartIdentifier(0)), multipartIdentifier(1)) + } else { + throw new IllegalArgumentException( + s"Invalid identifier format for argument $argName: $identifierAsString") + } + } + + protected def loadSparkTable(ident: Identifier): SparkTable = { + try { + val table = tableCatalog.loadTable(ident) + table match { + case sparkTable: SparkTable => sparkTable + case _ => + throw new IllegalArgumentException( + s"$ident is not a Fluss table: ${table.getClass.getName}") + } + } catch { + case e: Exception => + val errMsg = s"Couldn't load table '$ident' in catalog '${tableCatalog.name()}'" + throw new RuntimeException(errMsg, e) + } + } + + protected def getAdmin(table: SparkTable): Admin = { + table match { + case abstractTable: AbstractSparkTable => abstractTable.admin + case _ => + throw new IllegalArgumentException( + s"Table is not an AbstractSparkTable: ${table.getClass.getName}") + } + } + + protected def newInternalRow(values: Any*): InternalRow = { + new GenericInternalRow(values.toArray) + } + + protected def toTablePath(ident: Identifier): TablePath = { + if (ident.namespace().length != 1) { + throw new IllegalArgumentException("Only single namespace is supported") + } + TablePath.of(ident.namespace()(0), ident.name()) + } +} + +object BaseProcedure { + + abstract class Builder[T <: BaseProcedure] extends ProcedureBuilder { + private var tableCatalog: TableCatalog = _ + + override def withTableCatalog(newTableCatalog: TableCatalog): Builder[T] = { + this.tableCatalog = newTableCatalog + this + } + + override def build(): T = doBuild() + + protected def doBuild(): T + + protected def getTableCatalog: TableCatalog = tableCatalog + } +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/CompactProcedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/CompactProcedure.scala new file mode 100644 index 0000000000..d70653ab42 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/CompactProcedure.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+class CompactProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) {
+
+  override def parameters(): Array[ProcedureParameter] = {
+    CompactProcedure.PARAMETERS
+  }
+
+  override def outputType(): StructType = {
+    CompactProcedure.OUTPUT_TYPE
+  }
+
+  override def call(args: InternalRow): Array[InternalRow] = {
+    val tableIdent = toIdentifier(args.getString(0), CompactProcedure.PARAMETERS(0).name())
+    val sparkTable = loadSparkTable(tableIdent)
+
+    try {
+      val tablePath = toTablePath(tableIdent)
+      val admin = getAdmin(sparkTable)
+
+      // TODO: implement the actual compact logic via the admin API (`admin` is
+      // obtained above for that purpose); for now, just return a success message.
+      val message = s"Compact operation queued for table $tablePath"
+
+      Array(newInternalRow(UTF8String.fromString(message)))
+    } catch {
+      case e: Exception =>
+        throw new RuntimeException(s"Failed to compact table: ${e.getMessage}", e)
+    }
+  }
+
+  override def description(): String = {
+    "This procedure triggers a compact action on a Fluss table."
+  }
+}
+
+object CompactProcedure {
+
+  private val PARAMETERS: Array[ProcedureParameter] = Array(
+    ProcedureParameter.required("table", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE: StructType = new StructType(
+    Array(
+      new StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)
+    )
+  )
+
+  def builder(): ProcedureBuilder = {
+    new BaseProcedure.Builder[CompactProcedure]() {
+      override protected def doBuild(): CompactProcedure = {
+        new CompactProcedure(getTableCatalog)
+      }
+    }
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala
new file mode 100644
index 0000000000..d2f40fe658
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/Procedure.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.fluss.spark.procedure + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.StructType + +trait Procedure { + + def parameters(): Array[ProcedureParameter] + + def outputType(): StructType + + def call(args: InternalRow): Array[InternalRow] + + def description(): String = getClass.toString +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala new file mode 100644 index 0000000000..cfa4ad823d --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureBuilder.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.spark.procedure + +import org.apache.spark.sql.connector.catalog.TableCatalog + +trait ProcedureBuilder { + + def withTableCatalog(tableCatalog: TableCatalog): ProcedureBuilder + + def build(): Procedure +} diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala new file mode 100644 index 0000000000..5125ecbed7 --- /dev/null +++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/procedure/ProcedureParameter.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.fluss.spark.procedure
+
+import org.apache.spark.sql.types.DataType
+
+trait ProcedureParameter {
+
+  def name(): String
+
+  def dataType(): DataType
+
+  def required(): Boolean
+}
+
+object ProcedureParameter {
+
+  def required(name: String, dataType: DataType): ProcedureParameter =
+    ProcedureParameterImpl(name, dataType, isRequired = true)
+
+  def optional(name: String, dataType: DataType): ProcedureParameter =
+    ProcedureParameterImpl(name, dataType, isRequired = false)
+}
+
+private case class ProcedureParameterImpl(
+    paramName: String,
+    paramDataType: DataType,
+    isRequired: Boolean)
+  extends ProcedureParameter {
+
+  override def name(): String = paramName
+
+  override def dataType(): DataType = paramDataType
+
+  override def required(): Boolean = isRequired
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java b/fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java
deleted file mode 100644
index c7523816f6..0000000000
--- a/fluss-spark/fluss-spark-ut/src/test/java/org/apache/fluss/spark/extensions/CallStatementParserTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.spark.extensions;
-
-import org.apache.fluss.spark.catalyst.plans.logical.FlussCallArgument;
-import org.apache.fluss.spark.catalyst.plans.logical.FlussCallStatement;
-import org.apache.fluss.spark.catalyst.plans.logical.FlussNamedArgument;
-import org.apache.fluss.spark.catalyst.plans.logical.FlussPositionalArgument;
-
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.expressions.Literal$;
-import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.catalyst.parser.ParserInterface;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DataTypes;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.math.BigDecimal;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.util.Arrays;
-
-import scala.Option;
-import scala.collection.JavaConverters;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link FlussCallStatement} of {@link FlussSparkSessionExtensions}. */
-public class CallStatementParserTest {
-
-    private SparkSession spark = null;
-    private ParserInterface parser = null;
-
-    @BeforeEach
-    public void startSparkSession() {
-        // Stops and clears active session to avoid loading previous non-stopped session.
-        Option<SparkSession> optionalSession =
-                SparkSession.getActiveSession().orElse(SparkSession::getDefaultSession);
-        if (!optionalSession.isEmpty()) {
-            optionalSession.get().stop();
-        }
-        SparkSession.clearActiveSession();
-        spark =
-                SparkSession.builder()
-                        .master("local[2]")
-                        .config("spark.sql.extensions", FlussSparkSessionExtensions.class.getName())
-                        .getOrCreate();
-        parser = spark.sessionState().sqlParser();
-    }
-
-    @AfterEach
-    public void stopSparkSession() {
-        if (spark != null) {
-            spark.stop();
-            spark = null;
-            parser = null;
-        }
-    }
-
-    @Test
-    public void testCallWithBackticks() throws ParseException {
-        FlussCallStatement call =
-                (FlussCallStatement) parser.parsePlan("CALL cat.`system`.`no_args_func`()");
-        assertThat(JavaConverters.seqAsJavaList(call.name()))
-                .isEqualTo(Arrays.asList("cat", "system", "no_args_func"));
-        assertThat(call.args().size()).isEqualTo(0);
-    }
-
-    @Test
-    public void testCallWithNamedArguments() throws ParseException {
-        FlussCallStatement callStatement =
-                (FlussCallStatement)
-                        parser.parsePlan(
-                                "CALL catalog.system.named_args_func(arg1 => 1, arg2 => 'test', arg3 => true)");
-        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
-                .isEqualTo(Arrays.asList("catalog", "system", "named_args_func"));
-        assertThat(callStatement.args().size()).isEqualTo(3);
-        assertArgument(callStatement, 0, "arg1", 1, DataTypes.IntegerType);
-        assertArgument(callStatement, 1, "arg2", "test", DataTypes.StringType);
-        assertArgument(callStatement, 2, "arg3", true, DataTypes.BooleanType);
-    }
-
-    @Test
-    public void testCallWithPositionalArguments() throws ParseException {
-        FlussCallStatement callStatement =
-                (FlussCallStatement)
-                        parser.parsePlan(
-                                "CALL catalog.system.positional_args_func(1, '${spark.sql.extensions}', 2L, true, 3.0D, 4.0e1, 500e-1BD, TIMESTAMP '2017-02-03T10:37:30.00Z')");
-        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
-                .isEqualTo(Arrays.asList("catalog", "system", "positional_args_func"));
-        assertThat(callStatement.args().size()).isEqualTo(8);
-        assertArgument(callStatement, 0, 1, DataTypes.IntegerType);
-        assertArgument(
-                callStatement,
-                1,
-                FlussSparkSessionExtensions.class.getName(),
-                DataTypes.StringType);
-        assertArgument(callStatement, 2, 2L, DataTypes.LongType);
-        assertArgument(callStatement, 3, true, DataTypes.BooleanType);
-        assertArgument(callStatement, 4, 3.0D, DataTypes.DoubleType);
-        assertArgument(callStatement, 5, 4.0e1, DataTypes.DoubleType);
-        assertArgument(
-                callStatement, 6, new BigDecimal("500e-1"), DataTypes.createDecimalType(3, 1));
-        assertArgument(
-                callStatement,
-                7,
-                Timestamp.from(Instant.parse("2017-02-03T10:37:30.00Z")),
-                DataTypes.TimestampType);
-    }
-
-    @Test
-    public void testCallWithMixedArguments() throws ParseException {
-        FlussCallStatement callStatement =
-                (FlussCallStatement)
-                        parser.parsePlan("CALL catalog.system.mixed_args_func(arg1 => 1, 'test')");
-        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
-                .isEqualTo(Arrays.asList("catalog", "system", "mixed_args_func"));
-        assertThat(callStatement.args().size()).isEqualTo(2);
-        assertArgument(callStatement, 0, "arg1", 1, DataTypes.IntegerType);
-        assertArgument(callStatement, 1, "test", DataTypes.StringType);
-    }
-
-    @Test
-    public void testCallSimpleProcedure() throws ParseException {
-        FlussCallStatement callStatement =
-                (FlussCallStatement)
-                        parser.parsePlan("CALL system.simple_procedure(table => 'db.table')");
-        assertThat(JavaConverters.seqAsJavaList(callStatement.name()))
-                .isEqualTo(Arrays.asList("system", "simple_procedure"));
-        assertThat(callStatement.args().size()).isEqualTo(1);
-        assertArgument(callStatement, 0, "table", "db.table", DataTypes.StringType);
-    }
-
-    private void assertArgument(
-            FlussCallStatement call, int index, Object expectedValue, DataType expectedType) {
-        assertArgument(call, index, null, expectedValue, expectedType);
-    }
-
-    private void assertArgument(
-            FlussCallStatement callStatement,
-            int index,
-            String expectedName,
-            Object expectedValue,
-            DataType expectedType) {
-        if (expectedName == null) {
-            FlussCallArgument callArgument = callStatement.args().apply(index);
-            assertCast(callArgument, FlussPositionalArgument.class);
-        } else {
-            FlussNamedArgument namedArgument =
-                    assertCast(callStatement.args().apply(index), FlussNamedArgument.class);
-            assertThat(namedArgument.name()).isEqualTo(expectedName);
-        }
-        assertThat(callStatement.args().apply(index).expr())
-                .isEqualTo(Literal$.MODULE$.create(expectedValue, expectedType));
-    }
-
-    private <T> T assertCast(Object value, Class<T> expectedClass) {
-        assertThat(value).isInstanceOf(expectedClass);
-        return expectedClass.cast(value);
-    }
-}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala
new file mode 100644
index 0000000000..9503e2b7bb
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/extensions/CallStatementParserTest.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.extensions
+
+import org.apache.fluss.spark.catalyst.plans.logical.{FlussCallArgument, FlussCallStatement, FlussNamedArgument, FlussPositionalArgument}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.types.{DataType, DataTypes}
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+
+import java.math.BigDecimal
+import java.sql.Timestamp
+import java.time.Instant
+
+class CallStatementParserTest extends FunSuite with BeforeAndAfterEach {
+
+  private var spark: SparkSession = _
+  private var parser: ParserInterface = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    val optionalSession = SparkSession.getActiveSession.orElse(SparkSession.getDefaultSession)
+    optionalSession.foreach(_.stop())
+    SparkSession.clearActiveSession()
+
+    spark = SparkSession
+      .builder()
+      .master("local[2]")
+      .config("spark.sql.extensions", classOf[FlussSparkSessionExtensions].getName)
+      .getOrCreate()
+
+    parser = spark.sessionState.sqlParser
+  }
+
+  override def afterEach(): Unit = {
+    if (spark != null) {
+      spark.stop()
+      spark = null
+      parser = null
+    }
+    super.afterEach()
+  }
+
+  test("testCallWithBackticks") {
+    val call =
+      parser.parsePlan("CALL cat.`system`.`no_args_func`()").asInstanceOf[FlussCallStatement]
+    assert(call.name.toList == List("cat", "system", "no_args_func"))
+    assert(call.args.size == 0)
+  }
+
+  test("testCallWithNamedArguments") {
+    val callStatement = parser
+      .parsePlan("CALL catalog.system.named_args_func(arg1 => 1, arg2 => 'test', arg3 => true)")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("catalog", "system", "named_args_func"))
+    assert(callStatement.args.size == 3)
+    assertArgument(callStatement, 0, Some("arg1"), 1, DataTypes.IntegerType)
+    assertArgument(callStatement, 1, Some("arg2"), "test", DataTypes.StringType)
+    assertArgument(callStatement, 2, Some("arg3"), true, DataTypes.BooleanType)
+  }
+
+  test("testCallWithPositionalArguments") {
+    val callStatement = parser
+      .parsePlan(
+        "CALL catalog.system.positional_args_func(1, '${spark.sql.extensions}', 2L, true, 3.0D, 4.0e1, 500e-1BD, TIMESTAMP '2017-02-03T10:37:30.00Z')")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("catalog", "system", "positional_args_func"))
+    assert(callStatement.args.size == 8)
+    assertArgument(callStatement, 0, None, 1, DataTypes.IntegerType)
+    assertArgument(
+      callStatement,
+      1,
+      None,
+      classOf[FlussSparkSessionExtensions].getName,
+      DataTypes.StringType)
+    assertArgument(callStatement, 2, None, 2L, DataTypes.LongType)
+    assertArgument(callStatement, 3, None, true, DataTypes.BooleanType)
+    assertArgument(callStatement, 4, None, 3.0, DataTypes.DoubleType)
+    assertArgument(callStatement, 5, None, 4.0e1, DataTypes.DoubleType)
+    assertArgument(
+      callStatement,
+      6,
+      None,
+      new BigDecimal("500e-1"),
+      DataTypes.createDecimalType(3, 1))
+    assertArgument(
+      callStatement,
+      7,
+      None,
+      Timestamp.from(Instant.parse("2017-02-03T10:37:30.00Z")),
+      DataTypes.TimestampType)
+  }
+
+  test("testCallWithMixedArguments") {
+    val callStatement = parser
+      .parsePlan("CALL catalog.system.mixed_args_func(arg1 => 1, 'test')")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("catalog", "system", "mixed_args_func"))
+    assert(callStatement.args.size == 2)
+    assertArgument(callStatement, 0, Some("arg1"), 1, DataTypes.IntegerType)
+    assertArgument(callStatement, 1, None, "test", DataTypes.StringType)
+  }
+
+  test("testCallSimpleProcedure") {
+    val callStatement = parser
+      .parsePlan("CALL system.simple_procedure(table => 'db.table')")
+      .asInstanceOf[FlussCallStatement]
+
+    assert(callStatement.name.toList == List("system", "simple_procedure"))
+    assert(callStatement.args.size == 1)
+    assertArgument(callStatement, 0, Some("table"), "db.table", DataTypes.StringType)
+  }
+
+  private def assertArgument(
+      callStatement: FlussCallStatement,
+      index: Int,
+      expectedName: Option[String],
+      expectedValue: Any,
+      expectedType: DataType): Unit = {
+
+    val callArgument = callStatement.args(index)
+
+    expectedName match {
+      case None =>
+        assert(callArgument.isInstanceOf[FlussPositionalArgument])
+      case Some(name) =>
+        val namedArgument = callArgument.asInstanceOf[FlussNamedArgument]
+        assert(namedArgument.name == name)
+    }
+
+    assert(callStatement.args(index).expr == Literal.create(expectedValue, expectedType))
+  }
+}
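
For orientation, here is a minimal sketch of what a further procedure built on the `Procedure`, `ProcedureParameter`, and `BaseProcedure.Builder` APIs from this patch could look like. The `EchoTableProcedure` below is hypothetical and not part of the patch; it assumes `BaseProcedure` exposes `toIdentifier`, `newInternalRow`, and `Builder.getTableCatalog` with the semantics that `CompactProcedure` relies on above:

```scala
package org.apache.fluss.spark.procedure

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

/** Hypothetical procedure that echoes the identifier of the table it is given. */
class EchoTableProcedure(tableCatalog: TableCatalog) extends BaseProcedure(tableCatalog) {

  // Single required "table" argument, mirroring CompactProcedure's parameter style.
  override def parameters(): Array[ProcedureParameter] = Array(
    ProcedureParameter.required("table", DataTypes.StringType))

  // One nullable string column named "result", as in CompactProcedure's output.
  override def outputType(): StructType = new StructType(
    Array(new StructField("result", DataTypes.StringType, nullable = true, Metadata.empty)))

  override def call(args: InternalRow): Array[InternalRow] = {
    // Resolve the "table" argument (positional or named) to a catalog identifier.
    val ident = toIdentifier(args.getString(0), parameters()(0).name())
    Array(newInternalRow(UTF8String.fromString(s"table: $ident")))
  }

  override def description(): String = "Echoes the resolved table identifier."
}

object EchoTableProcedure {
  def builder(): ProcedureBuilder = new BaseProcedure.Builder[EchoTableProcedure]() {
    override protected def doBuild(): EchoTableProcedure =
      new EchoTableProcedure(getTableCatalog)
  }
}
```

Registered under the `sys` namespace (for example through the `SparkProcedures` registry), such a procedure would be callable as `CALL catalog.sys.echo_table(table => 'db.t')`, and its parser round-trip could be asserted in `CallStatementParserTest` in the same way as the cases above.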