Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog;

import org.apache.spark.sql.connector.catalog.transactions.Transaction;
import org.apache.spark.sql.connector.catalog.transactions.TransactionInfo;

/**
* A {@link CatalogPlugin} that supports transactions.
* <p>
* Catalogs that implement this interface opt in to transactional query execution. A catalog
* implementing this interface is responsible for starting transactions.
*
* @since 4.2.0
*/
public interface TransactionalCatalogPlugin extends CatalogPlugin {

/**
 * Begins a new transaction and returns a {@link Transaction} representing it.
 * <p>
 * Callers validate that the returned transaction's {@link Transaction#catalog()} reports the
 * same name as this catalog; a mismatch causes the transaction to be aborted and closed.
 *
 * @param info metadata about the transaction being started.
 * @return the newly started {@link Transaction}.
 */
Transaction beginTransaction(TransactionInfo info);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions;

import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin;

import java.io.Closeable;

/**
* Represents a transaction.
* <p>
* Spark begins a transaction with {@link TransactionalCatalogPlugin#beginTransaction} and
* executes read/write operations against the transaction's catalog. On success, Spark
* calls {@link #commit()}; on failure, Spark calls {@link #abort()}. In both cases Spark
* subsequently calls {@link #close()} to release resources.
*
* @since 4.2.0
*/
public interface Transaction extends Closeable {

/**
 * Returns the catalog associated with this transaction. This catalog is responsible for tracking
 * read/write operations that occur within the boundaries of a transaction. This allows
 * connectors to perform conflict resolution at commit time.
 */
CatalogPlugin catalog();

/**
 * Commits the transaction. All writes performed under it become visible to other readers.
 * <p>
 * The connector is responsible for detecting and resolving conflicting commits or throwing
 * an exception if resolution is not possible.
 * <p>
 * Spark will call this method at most once, and will call {@link #close()} immediately after
 * this method returns, whether it succeeds or fails. Implementations can therefore defer
 * resource cleanup to {@code close()}.
 */
void commit();

/**
 * Aborts the transaction, discarding any staged changes.
 * <p>
 * This method must be idempotent. If the transaction has already been committed or aborted,
 * invoking it must have no effect.
 * <p>
 * Spark calls {@link #close()} immediately after this method returns.
 */
void abort();

/**
 * Releases any resources held by this transaction.
 * <p>
 * Spark always calls this method after {@link #commit()} or {@link #abort()}, regardless of
 * whether those methods succeed or not.
 * <p>
 * This method must be idempotent. If the transaction has already been closed,
 * invoking it must have no effect.
 */
@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions;

/**
* Metadata about a transaction.
*
* @since 4.2.0
*/
public interface TransactionInfo {

/**
 * Returns a unique identifier for this transaction.
 *
 * @return the transaction's unique id.
 */
String id();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.transactions

import java.util.UUID

import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin
import org.apache.spark.sql.connector.catalog.transactions.{Transaction, TransactionInfoImpl}
import org.apache.spark.util.Utils

object TransactionUtils {

  /** Commits the given transaction, always closing it afterwards. */
  def commit(transaction: Transaction): Unit = finishAndClose(transaction)(_.commit())

  /** Aborts the given transaction, always closing it afterwards. */
  def abort(transaction: Transaction): Unit = finishAndClose(transaction)(_.abort())

  /**
   * Starts a transaction on the given catalog with a freshly generated id.
   *
   * The returned transaction's catalog must report the same name as the originating catalog;
   * on a mismatch the transaction is aborted (and closed) and an exception is thrown.
   */
  def beginTransaction(catalog: TransactionalCatalogPlugin): Transaction = {
    val info = TransactionInfoImpl(id = UUID.randomUUID.toString)
    val transaction = catalog.beginTransaction(info)
    if (transaction.catalog.name != catalog.name) {
      abort(transaction)
      throw new IllegalStateException(
        s"""Transaction catalog name (${transaction.catalog.name})
           |must match original catalog name (${catalog.name}).
           |""".stripMargin)
    }
    transaction
  }

  // Runs the terminal operation (commit or abort), guaranteeing close() runs even on failure.
  private def finishAndClose(transaction: Transaction)(op: Transaction => Unit): Unit = {
    Utils.tryWithSafeFinally {
      op(transaction)
    } {
      transaction.close()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.catalog.transactions

/**
 * Default [[TransactionInfo]] implementation.
 *
 * @param id unique identifier for the transaction
 */
case class TransactionInfoImpl(id: String) extends TransactionInfo
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.transactions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.connector.catalog.{CatalogPlugin, TransactionalCatalogPlugin}
import org.apache.spark.sql.connector.catalog.transactions.{Transaction, TransactionInfo}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class TransactionUtilsSuite extends SparkFunSuite {
  val testCatalogName = "test_catalog"

  // --- Helpers ---------------------------------------------------------------

  /** Builds a minimal catalog plugin that only reports the given name. */
  private def mockCatalog(catalogName: String): CatalogPlugin = new CatalogPlugin {
    override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = ()
    override def name(): String = catalogName
  }

  private val noop: () => Unit = () => ()

  /** Transaction stub that records lifecycle calls and runs optional hooks. */
  private class TestTransaction(
      catalogName: String,
      onCommit: () => Unit = noop,
      onAbort: () => Unit = noop,
      onClose: () => Unit = noop) extends Transaction {
    var committed = false
    var aborted = false
    var closed = false

    override def catalog(): CatalogPlugin = mockCatalog(catalogName)

    override def commit(): Unit = {
      committed = true
      onCommit()
    }

    override def abort(): Unit = {
      aborted = true
      onAbort()
    }

    override def close(): Unit = {
      closed = true
      onClose()
    }
  }

  /** Catalog whose transactions may report a different (mismatched) catalog name. */
  private def mockTransactionalCatalog(
      catalogName: String,
      txnCatalogName: String = null): TransactionalCatalogPlugin = {
    val effectiveTxnCatalogName = Option(txnCatalogName).getOrElse(catalogName)
    new TransactionalCatalogPlugin {
      override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = ()
      override def name(): String = catalogName
      override def beginTransaction(info: TransactionInfo): Transaction =
        new TestTransaction(effectiveTxnCatalogName)
    }
  }

  // --- Commit ----------------------------------------------------------------

  test("commit: calls commit then close") {
    val txn = new TestTransaction(testCatalogName)
    TransactionUtils.commit(txn)
    assert(txn.committed)
    assert(txn.closed)
  }

  test("commit: close is called even if commit fails") {
    val failingCommit: () => Unit = () => throw new RuntimeException("commit failed")
    val txn = new TestTransaction(testCatalogName, onCommit = failingCommit)
    intercept[RuntimeException] {
      TransactionUtils.commit(txn)
    }
    assert(txn.closed)
  }

  // --- Abort -----------------------------------------------------------------

  test("abort: calls abort then close") {
    val txn = new TestTransaction(testCatalogName)
    TransactionUtils.abort(txn)
    assert(txn.aborted)
    assert(txn.closed)
  }

  test("abort: close is called even if abort fails") {
    val failingAbort: () => Unit = () => throw new RuntimeException("abort failed")
    val txn = new TestTransaction(testCatalogName, onAbort = failingAbort)
    intercept[RuntimeException] {
      TransactionUtils.abort(txn)
    }
    assert(txn.closed)
  }

  // --- Begin Transaction -----------------------------------------------------

  test("beginTransaction: returns transaction when catalog names match") {
    val catalog = mockTransactionalCatalog(testCatalogName)
    val txn = TransactionUtils.beginTransaction(catalog)
    assert(txn.catalog().name() == testCatalogName)
  }

  test("beginTransaction: fails when transaction catalog name does not match") {
    val catalog = mockTransactionalCatalog(catalogName = testCatalogName, txnCatalogName = "other")
    val e = intercept[IllegalStateException] {
      TransactionUtils.beginTransaction(catalog)
    }
    assert(e.getMessage.contains("other"))
    assert(e.getMessage.contains(testCatalogName))
  }

  test("beginTransaction: aborts and closes transaction on catalog name mismatch") {
    var aborted = false
    var closed = false
    val catalog = new TransactionalCatalogPlugin {
      override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = ()
      override def name(): String = testCatalogName
      override def beginTransaction(info: TransactionInfo): Transaction =
        new TestTransaction(
          "other",
          onAbort = () => aborted = true,
          onClose = () => closed = true)
    }
    intercept[IllegalStateException] {
      TransactionUtils.beginTransaction(catalog)
    }
    assert(aborted)
    assert(closed)
  }
}