From c6af76f353e8ab3c7930473481585a0a12ac051f Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Thu, 5 Mar 2026 14:55:21 +0000 Subject: [PATCH 1/2] v1 --- .../catalog/TransactionalCatalogPlugin.java | 39 ++++++ .../catalog/transactions/Transaction.java | 77 +++++++++++ .../catalog/transactions/TransactionInfo.java | 31 +++++ .../transactions/TransactionUtils.scala | 55 ++++++++ .../transactions/TransactionInfoImpl.scala | 20 +++ .../transactions/TransactionUtilsSuite.scala | 124 ++++++++++++++++++ 6 files changed, 346 insertions(+) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TransactionalCatalogPlugin.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/TransactionInfo.java create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtils.scala create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/transactions/TransactionInfoImpl.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtilsSuite.scala diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TransactionalCatalogPlugin.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TransactionalCatalogPlugin.java new file mode 100644 index 0000000000000..34a4fc68e9649 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TransactionalCatalogPlugin.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +import org.apache.spark.sql.connector.catalog.transactions.Transaction; +import org.apache.spark.sql.connector.catalog.transactions.TransactionInfo; + +/** + * A {@link CatalogPlugin} that supports transactions. + *
<p>
+ * Catalogs that implement this interface opt in to transactional query execution. A catalog + * implementing this interface is responsible for starting transactions. + * + * @since 4.2.0 + */ +public interface TransactionalCatalogPlugin extends CatalogPlugin { + + /** + * Begins a new transaction and returns a {@link Transaction} representing it. + * + * @param info metadata about the transaction being started. + */ + Transaction beginTransaction(TransactionInfo info); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java new file mode 100644 index 0000000000000..0bce4cd643a8a --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog.transactions; + +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin; + +import java.io.Closeable; + +/** + * Represents a transaction. + *
<p>
+ * Spark begins a transaction with {@link TransactionalCatalogPlugin#beginTransaction} and + * executes read/write operations against the transaction's catalog. On success, Spark + * calls {@link #commit()}; on failure, Spark calls {@link #abort()}. In both cases Spark + * subsequently calls {@link #close()} to release resources. + * + * @since 4.2.0 + */ +public interface Transaction extends Closeable { + + /** + * Returns the catalog associated with this transaction. This catalog is responsible for tracking + * read/write operations that occur within the boundaries of a transaction. This allows + * connectors to perform conflict resolution at commit time. + */ + CatalogPlugin catalog(); + + /** + * Commits the transaction. All writes performed under it become visible to other readers. + *
<p>
+ * The connector is responsible for detecting and resolving conflicting commits or throwing + * an exception if resolution is not possible. + *
<p>
+ * This method must be called exactly once. Spark calls {@link #close()} immediately after + * this method returns, so implementations should not release resources inside + * {@code commit()} itself. + */ + void commit(); + + /** + * Aborts the transaction, discarding any staged changes. + *
<p>
+ * This method must be idempotent. If the transaction has already been committed or aborted, + * invoking it must have no effect. + *
<p>
+ * Spark calls {@link #close()} immediately after this method returns. + */ + void abort(); + + /** + * Releases any resources held by this transaction. + *
<p>
+ * Spark always calls this method after {@link #commit()} or {@link #abort()}, regardless of + * whether those methods succeed or not. + *
<p>
+ * This method must be idempotent. If the transaction has already been closed, + * invoking it must have no effect. + */ + @Override + void close(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/TransactionInfo.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/TransactionInfo.java new file mode 100644 index 0000000000000..42b42a4864255 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/TransactionInfo.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog.transactions; + +/** + * Metadata about a transaction. + * + * @since 4.2.0 + */ +public interface TransactionInfo { + + /** + * Returns a unique identifier for this transaction. 
+ */ + String id(); +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtils.scala new file mode 100644 index 0000000000000..d160aafdea34e --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtils.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.transactions + +import java.util.UUID + +import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin +import org.apache.spark.sql.connector.catalog.transactions.{Transaction, TransactionInfoImpl} +import org.apache.spark.util.Utils + +object TransactionUtils { + def commit(transaction: Transaction): Unit = { + Utils.tryWithSafeFinally { + transaction.commit() + } { + transaction.close() + } + } + + def abort(transaction: Transaction): Unit = { + Utils.tryWithSafeFinally { + transaction.abort() + } { + transaction.close() + } + } + + def beginTransaction(catalog: TransactionalCatalogPlugin): Transaction = { + val info = TransactionInfoImpl(id = UUID.randomUUID.toString) + val transaction = catalog.beginTransaction(info) + if (transaction.catalog.name != catalog.name) { + abort(transaction) + throw new IllegalStateException( + s"""Transaction catalog name (${transaction.catalog.name}) + |must match original catalog name (${catalog.name}). + |""".stripMargin) + } + transaction + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/transactions/TransactionInfoImpl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/transactions/TransactionInfoImpl.scala new file mode 100644 index 0000000000000..4cb53da0a59e2 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/transactions/TransactionInfoImpl.scala @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog.transactions + +case class TransactionInfoImpl(id: String) extends TransactionInfo diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtilsSuite.scala new file mode 100644 index 0000000000000..d409316e667b1 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtilsSuite.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.transactions + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.connector.catalog.{CatalogPlugin, TransactionalCatalogPlugin} +import org.apache.spark.sql.connector.catalog.transactions.{Transaction, TransactionInfo} +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class TransactionUtilsSuite extends SparkFunSuite { + val testCatalogName = "test_catalog" + + // --- Helpers --------------------------------------------------------------- + private def mockCatalog(catalogName: String): CatalogPlugin = new CatalogPlugin { + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = () + override def name(): String = catalogName + } + + private val emptyFunction = () => () + private class TestTransaction( + catalogName: String, + onCommit: () => Unit = emptyFunction, + onAbort: () => Unit = emptyFunction, + onClose: () => Unit = emptyFunction) extends Transaction { + var committed = false + var aborted = false + var closed = false + + override def catalog(): CatalogPlugin = mockCatalog(catalogName) + override def commit(): Unit = { committed = true; onCommit() } + override def abort(): Unit = { aborted = true; onAbort() } + override def close(): Unit = { closed = true; onClose() } + } + + private def mockTransactionalCatalog( + catalogName: String, + txnCatalogName: String = null): TransactionalCatalogPlugin = { + val resolvedTxnCatalogName = Option(txnCatalogName).getOrElse(catalogName) + new TransactionalCatalogPlugin { + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = () + override def name(): String = catalogName + override def beginTransaction(info: TransactionInfo): Transaction = + new TestTransaction(resolvedTxnCatalogName) + } + } + + // --- Commit ---------------------------------------------------------------- + test("commit: calls commit then close") { + val txn = new TestTransaction(testCatalogName) + 
TransactionUtils.commit(txn) + assert(txn.committed) + assert(txn.closed) + } + + test("commit: close is called even if commit fails") { + val txn = new TestTransaction( + testCatalogName, onCommit = () => throw new RuntimeException("commit failed")) + intercept[RuntimeException] { TransactionUtils.commit(txn) } + assert(txn.closed) + } + + // --- Abort ----------------------------------------------------------------- + test("abort: calls abort then close") { + val txn = new TestTransaction(testCatalogName) + TransactionUtils.abort(txn) + assert(txn.aborted) + assert(txn.closed) + } + + test("abort: close is called even if abort fails") { + val txn = new TestTransaction(testCatalogName, + onAbort = () => throw new RuntimeException("abort failed")) + intercept[RuntimeException] { TransactionUtils.abort(txn) } + assert(txn.closed) + } + + // --- Begin Transaction ----------------------------------------------------- + test("beginTransaction: returns transaction when catalog names match") { + val catalog = mockTransactionalCatalog(testCatalogName) + val txn = TransactionUtils.beginTransaction(catalog) + assert(txn.catalog().name() == testCatalogName) + } + + test("beginTransaction: fails when transaction catalog name does not match") { + val catalog = mockTransactionalCatalog(catalogName = testCatalogName, txnCatalogName = "other") + val e = intercept[IllegalStateException] { + TransactionUtils.beginTransaction(catalog) + } + assert(e.getMessage.contains("other")) + assert(e.getMessage.contains(testCatalogName)) + } + + test("beginTransaction: aborts and closes transaction on catalog name mismatch") { + var aborted = false + var closed = false + val catalog = new TransactionalCatalogPlugin { + override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = () + override def name(): String = testCatalogName + override def beginTransaction(info: TransactionInfo): Transaction = + new TestTransaction( + "other", + onAbort = () => { aborted = true }, + 
onClose = () => { closed = true }) + } + intercept[IllegalStateException] { TransactionUtils.beginTransaction(catalog) } + assert(aborted) + assert(closed) + } +} From 4a3681927ee794b7993da4a97daf6e39a406ea04 Mon Sep 17 00:00:00 2001 From: Andreas Chatzistergiou Date: Mon, 9 Mar 2026 07:39:55 +0000 Subject: [PATCH 2/2] Address comments --- .../sql/connector/catalog/transactions/Transaction.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java index 0bce4cd643a8a..a1871445f9e78 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/transactions/Transaction.java @@ -47,9 +47,8 @@ public interface Transaction extends Closeable { * The connector is responsible for detecting and resolving conflicting commits or throwing * an exception if resolution is not possible. *
<p>
- * This method must be called exactly once. Spark calls {@link #close()} immediately after - * this method returns, so implementations should not release resources inside - * {@code commit()} itself. + * This method will be called at most once per transaction. Spark calls {@link #close()} + * immediately after this method returns or throws. */ void commit();