From 5db24b3b1f9f3a7c0b40fd1511919fdbbad96100 Mon Sep 17 00:00:00 2001 From: ksirotkin Date: Thu, 26 Feb 2026 10:26:19 +0200 Subject: [PATCH 01/14] IGNITE-27685 Implement exponential backoff for durable finish --- ...asedExponentialBackoffTimeoutStrategy.java | 147 ++++++++++++++++++ .../internal/util/NoopTimeoutStrategy.java | 57 +++++++ .../ignite/internal/util/TimeoutStrategy.java | 91 +++++++++++ .../rebalance/ItRebalanceDistributedTest.java | 4 +- .../partition/replicator/fixtures/Node.java | 4 +- .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../ignite/internal/app/IgniteImpl.java | 4 +- .../exec/rel/TableScanNodeExecutionTest.java | 4 +- .../ignite/distributed/ItLockTableTest.java | 4 +- ...ributedTestSingleNodeNoCleanupMessage.java | 4 +- .../internal/table/ItColocationTest.java | 4 +- .../ignite/distributed/ItTxTestCluster.java | 7 +- .../table/impl/DummyInternalTableImpl.java | 4 +- .../tx/impl/TxCleanupRequestSender.java | 91 +++++------ .../internal/tx/impl/TxManagerImpl.java | 46 ++++-- .../ignite/internal/tx/TxCleanupTest.java | 4 +- .../ignite/internal/tx/TxManagerTest.java | 4 +- 17 files changed, 401 insertions(+), 82 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/KeyBasedExponentialBackoffTimeoutStrategy.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/NoopTimeoutStrategy.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/TimeoutStrategy.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/KeyBasedExponentialBackoffTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/KeyBasedExponentialBackoffTimeoutStrategy.java new file mode 100644 index 000000000000..318c1a3b7181 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/KeyBasedExponentialBackoffTimeoutStrategy.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; + +/** + * a. + */ +public class KeyBasedExponentialBackoffTimeoutStrategy implements TimeoutStrategy { + /** Default backoff coefficient to calculate next timeout based on backoff strategy. */ + private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0; + + /** Default max timeout that strategy could generate, ms. */ + private static final int DEFAULT_TIMEOUT_MS_MAX = 11_000; + + /** + * a. + */ + private final int initialTimeout; + + /** + * a. + */ + private final int maxTimeout; + + /** + * a. + */ + private final double backoffCoefficient; + + /** + * a. + */ + private final boolean jitter; + + // Thread-safe key → state mapping + private final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + + /** + * a. + * + * @param initialTimeout a. + */ + public KeyBasedExponentialBackoffTimeoutStrategy( + int initialTimeout + ) { + this(initialTimeout, DEFAULT_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT); + } + + /** + * a. + * + * @param initialTimeout a. + * @param maxTimeout a. + * @param backoffCoefficient a. + */ + public KeyBasedExponentialBackoffTimeoutStrategy( + int initialTimeout, + int maxTimeout, + double backoffCoefficient + ) { + this(initialTimeout, maxTimeout, backoffCoefficient, false); + } + + /** + * a. + * + * @param initialTimeout a. + * @param maxTimeout a. + * @param backoffCoefficient a. + * @param jitter a. + */ + public KeyBasedExponentialBackoffTimeoutStrategy( + int initialTimeout, + int maxTimeout, + double backoffCoefficient, + boolean jitter + ) { + this.initialTimeout = initialTimeout; + this.maxTimeout = maxTimeout; + this.backoffCoefficient = backoffCoefficient; + this.jitter = jitter; + } + + /** {@inheritDoc} */ + @Override + public TimeoutState getCurrent(String key) { + return registry.getOrDefault(key, new TimeoutState(initialTimeout, 0)); + } + + /** {@inheritDoc} */ + @Override + public int next(String key) { + return registry.compute(key, (k, prev) -> { + int raw = (prev == null) + ? initialTimeout + : Math.min((int) (prev.getCurrentTimeout() * backoffCoefficient), maxTimeout); + + int timeout = jitter ? applyJitter(raw) : raw; + int attempt = (prev == null) ? 1 : prev.getAttempt() + 1; + + return new TimeoutState(timeout, attempt); + }).getCurrentTimeout(); + } + + /** {@inheritDoc} */ + @Override + public void reset(String key) { + registry.remove(key); + } + + /** {@inheritDoc} */ + @Override + public void resetAll() { + registry.clear(); + } + + /** + * a. + * + * @param raw a. + * @return a. + */ + private static int applyJitter(int raw) { + int lo = raw / 2; + int hi = raw + lo; // raw * 1.5 + + return lo + ThreadLocalRandom.current().nextInt(hi - lo + 1); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/NoopTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/NoopTimeoutStrategy.java new file mode 100644 index 000000000000..d251473aa013 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/NoopTimeoutStrategy.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +/** + * a. + */ +public class NoopTimeoutStrategy implements TimeoutStrategy { + /** + * a. + */ + private final TimeoutState timeoutState; + + /** + * a. + * + * @param initTimeout a. + */ + public NoopTimeoutStrategy(int initTimeout) { + this.timeoutState = new TimeoutState(initTimeout, 0); + } + + /** {@inheritDoc} */ + @Override + public TimeoutState getCurrent(String key) { + return timeoutState; + } + + /** {@inheritDoc} */ + @Override + public int next(String key) { + return timeoutState.getCurrentTimeout(); + } + + /** {@inheritDoc} */ + @Override + public void reset(String key) {} + + /** {@inheritDoc} */ + @Override + public void resetAll() {} +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/TimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/TimeoutStrategy.java new file mode 100644 index 000000000000..918428a2d476 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/TimeoutStrategy.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +/** + * a. + */ +public interface TimeoutStrategy { + + /** + * a. + * + * @param key a. + * @return a. + */ + TimeoutState getCurrent(String key); + + /** + * a. + * + * @param key a. + * @return a. + */ + int next(String key); + + /** + * a. + * + * @param key a. + */ + void reset(String key); + + /** + * a. + */ + void resetAll(); + + /** + * a. + */ + class TimeoutState { + /** + * a. + */ + private final int currentTimeout; + /** + * a. + */ + private final int attempt; + + /** + * a. + * + * @param currentTimeout a. + * @param attempt a. + */ + public TimeoutState(int currentTimeout, int attempt) { + this.currentTimeout = currentTimeout; + this.attempt = attempt; + } + + /** + * a. + */ + public int getCurrentTimeout() { + return currentTimeout; + } + + /** + * a. + */ + public int getAttempt() { + return attempt; + } + } +} diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index ff04b0d95acb..7c5277a1f12d 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -257,6 +257,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.network.NetworkAddress; @@ -1466,7 +1467,8 @@ private class Node { transactionInflights, lowWatermark, commonScheduledExecutorService, - metricManager + metricManager, + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); replicaManager = spy(new ReplicaManager( diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 3a0edc1930a7..075873a5a041 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -194,6 +194,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; @@ -634,7 +635,8 @@ public CompletableFuture invoke(Condition condition, Operation success, transactionInflights, lowWatermark, threadPoolsManager.commonScheduler(), - metricManager + metricManager, + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); volatileLogStorageManagerCreator = new VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" + name)); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 03bab008c475..6cca7666aabf 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -234,6 +234,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; import org.apache.ignite.internal.worker.CriticalWorkerWatchdog; @@ -655,7 +656,8 @@ public CompletableFuture invoke(Condition condition, List su lowWatermark, threadPoolsManager.commonScheduler(), failureProcessor, - metricManager + metricManager, + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager( diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index f5c29d42a915..fd0a020dd35c 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -291,6 +291,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; @@ -1078,7 +1079,8 @@ public class IgniteImpl implements Ignite { lowWatermark, threadPoolsManager.commonScheduler(), failureManager, - metricManager + metricManager, + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); sharedTxStateStorage = new TxStateRocksDbSharedStorage( diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index 5db917e662c7..d3fecc2bc5c9 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -99,6 +99,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.type.StructNativeType; +import org.apache.ignite.internal.util.NoopTimeoutStrategy; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; @@ -195,7 +196,8 @@ public void testScanNodeDataPropagation() throws InterruptedException { transactionInflights, new TestLowWatermark(), commonExecutor, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy(20) ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index 57cd8afee8aa..ace617d10046 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.raft.jraft.test.TestUtils; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -167,7 +168,8 @@ protected TxManagerImpl newTxManager( transactionInflights, lowWatermark, commonExecutor, - new TestMetricManager() + new TestMetricManager(), + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); } }; diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index b3cc95884c5e..1c52f19dc710 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -77,6 +77,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.message.TableWriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.table.QualifiedName; @@ -156,7 +157,8 @@ protected TxManagerImpl newTxManager( transactionInflights, lowWatermark, commonExecutor, - new TestMetricManager() + new TestMetricManager(), + new KeyBasedExponentialBackoffTimeoutStrategy(20) ) { @Override public CompletableFuture executeWriteIntentSwitchAsync(Runnable runnable) { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index 6a5167526bbf..24c4f7366b80 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.tx.test.TestTransactionIds; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.CollectionUtils; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.sql.ColumnType; import org.apache.ignite.sql.IgniteSql; import org.apache.ignite.table.QualifiedName; @@ -224,7 +225,8 @@ static void beforeAllTests() { transactionInflights, new TestLowWatermark(), commonExecutor, - new NoOpMetricManager() + new NoOpMetricManager(), + new KeyBasedExponentialBackoffTimeoutStrategy(20) ) { @Override public CompletableFuture finish( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index d108b2372bb0..29dcdb4c4e08 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -184,6 +184,7 @@ import org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.KeyBasedExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; @@ -628,7 +629,8 @@ protected TxManagerImpl newTxManager( lowWatermark, executor, new NoOpFailureManager(), - new TestMetricManager() + new TestMetricManager(), + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); } @@ -1294,7 +1296,8 @@ private void initializeClientTxComponents() { lowWatermark, executor, new NoOpFailureManager(), - new TestMetricManager() + new TestMetricManager(), + new KeyBasedExponentialBackoffTimeoutStrategy(20) ); clientResourceVacuumManager = new ResourceVacuumManager( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index 3c9cd225edfa..1a1c63d62c62 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -144,6 +144,7 @@ import org.apache.ignite.internal.tx.storage.state.test.TestTxStateStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.Lazy; +import org.apache.ignite.internal.util.NoopTimeoutStrategy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; import org.apache.ignite.network.NetworkAddress; @@ -702,7 +703,8 @@ public static TxManagerImpl txManager( transactionInflights, new TestLowWatermark(), COMMON_SCHEDULER, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy(20) ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index c48d4e655fe9..8e83f47576f1 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.tx.impl; -import static java.lang.Math.min; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; @@ -57,6 +56,7 @@ import org.apache.ignite.internal.tx.message.TxCleanupMessageResponse; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.util.CompletableFutures; +import org.apache.ignite.internal.util.TimeoutStrategy; import org.jetbrains.annotations.Nullable; /** @@ -65,9 +65,6 @@ public class TxCleanupRequestSender { private static final int ATTEMPTS_LOG_THRESHOLD = 100; - private static final int RETRY_INITIAL_TIMEOUT_MS = 20; - private static final int RETRY_MAX_TIMEOUT_MS = 30_000; - private final IgniteThrottledLogger throttledLog; /** Placement driver helper. */ @@ -87,6 +84,8 @@ public class TxCleanupRequestSender { /** Executor that is used to schedule retries of cleanup messages in case of retryable errors. */ private final ScheduledExecutorService retryExecutor; + private final TimeoutStrategy timeoutStrategy; + /** * The constructor. * @@ -95,13 +94,15 @@ public class TxCleanupRequestSender { * @param txStateVolatileStorage Volatile transaction state storage. * @param cleanupExecutor Cleanup executor. * @param commonScheduler Common scheduler. + * @param timeoutStrategy Timout strategy. */ public TxCleanupRequestSender( TxMessageSender txMessageSender, PlacementDriverHelper placementDriverHelper, VolatileTxStateMetaStorage txStateVolatileStorage, ExecutorService cleanupExecutor, - ScheduledExecutorService commonScheduler + ScheduledExecutorService commonScheduler, + TimeoutStrategy timeoutStrategy ) { this.txMessageSender = txMessageSender; this.placementDriverHelper = placementDriverHelper; @@ -109,6 +110,7 @@ public TxCleanupRequestSender( this.cleanupExecutor = cleanupExecutor; this.retryExecutor = commonScheduler; this.throttledLog = toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), commonScheduler); + this.timeoutStrategy = timeoutStrategy; } /** @@ -188,7 +190,7 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, ZonePartitionId * @return Completable future of Void. */ public CompletableFuture cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) { - return sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0); + return sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null); } /** @@ -225,7 +227,7 @@ public CompletableFuture cleanup( enlistedPartitionGroups.add(new EnlistedPartitionGroup(partitionId, partition.tableIds())); }); - return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0); + return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId); } /** @@ -244,18 +246,6 @@ public CompletableFuture cleanup( boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId - ) { - return cleanup(commitPartitionId, partitions, commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0); - } - - private CompletableFuture cleanup( - @Nullable ZonePartitionId commitPartitionId, - Collection partitions, - boolean commit, - @Nullable HybridTimestamp commitTimestamp, - UUID txId, - long timeout, - int attemptsMade ) { Map partitionIds = partitions.stream() .collect(toMap(EnlistedPartitionGroup::groupId, identity())); @@ -276,9 +266,7 @@ private CompletableFuture cleanup( commit, commitTimestamp, txId, - toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds), - timeout, - attemptsMade + toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds) ); Map> partitionsByPrimaryName = toPartitionInfosByPrimaryName( @@ -290,9 +278,7 @@ private CompletableFuture cleanup( partitionsByPrimaryName, commit, commitTimestamp, - txId, - timeout, - attemptsMade + txId ); }); } @@ -319,9 +305,7 @@ private void cleanupPartitionsWithoutPrimary( boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId, - List partitionsWithoutPrimary, - long timeout, - int attemptsMade + List partitionsWithoutPrimary ) { Map partitionIds = partitionsWithoutPrimary.stream() .collect(toMap(EnlistedPartitionGroup::groupId, identity())); @@ -339,9 +323,7 @@ private void cleanupPartitionsWithoutPrimary( partitionsByPrimaryName, commit, commitTimestamp, - txId, - timeout, - attemptsMade + txId ); }); } @@ -351,9 +333,7 @@ private CompletableFuture cleanupPartitions( Map> partitionsByNode, boolean commit, @Nullable HybridTimestamp commitTimestamp, - UUID txId, - long timeout, - int attemptsMade + UUID txId ) { List> cleanupFutures = new ArrayList<>(); @@ -362,7 +342,7 @@ private CompletableFuture cleanupPartitions( List nodePartitions = entry.getValue(); cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, node, - commitPartitionId == null ? null : nodePartitions, timeout, attemptsMade)); + commitPartitionId == null ? null : nodePartitions)); } return allOf(cleanupFutures.toArray(new CompletableFuture[0])); @@ -374,9 +354,7 @@ private CompletableFuture sendCleanupMessageWithRetries( @Nullable HybridTimestamp commitTimestamp, UUID txId, String node, - @Nullable Collection partitions, - long timeout, - int attemptsMade + @Nullable Collection partitions ) { return txMessageSender.cleanup(node, partitions, txId, commit, commitTimestamp) .thenApply(response -> { @@ -389,9 +367,11 @@ private CompletableFuture sendCleanupMessageWithRetries( return response; }) .handleAsync((networkMessage, throwable) -> { + String timeoutKey = commitPartitionId == null ? txId.toString() : commitPartitionId.toString(); + if (throwable != null) { if (ReplicatorRecoverableExceptions.isRecoverable(throwable)) { - if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) { + if (timeoutStrategy.getCurrent(node).getAttempt() > ATTEMPTS_LOG_THRESHOLD) { throttledLog.warn( "Unsuccessful transaction cleanup after {} attempts, keep retrying [txId={}]", throwable, @@ -410,15 +390,18 @@ private CompletableFuture sendCleanupMessageWithRetries( if (partitions == null) { // If we don't have any partition, which is the recovery or "unlock only" case, // just try again with the same node. - return sendCleanupMessageWithRetries( - commitPartitionId, - commit, - commitTimestamp, - txId, - node, - partitions, - incrementTimeout(timeout), - attemptsMade + 1 + return scheduleRetry( + () -> sendCleanupMessageWithRetries( + commitPartitionId, + commit, + commitTimestamp, + txId, + node, + partitions + ), + timeoutStrategy.next(timeoutKey), + TimeUnit.MILLISECONDS, + retryExecutor ); } @@ -430,11 +413,9 @@ private CompletableFuture sendCleanupMessageWithRetries( partitions, commit, commitTimestamp, - txId, - incrementTimeout(timeout), - attemptsMade + 1 + txId ), - timeout, + timeoutStrategy.next(timeoutKey), TimeUnit.MILLISECONDS, retryExecutor ); @@ -443,15 +424,13 @@ private CompletableFuture sendCleanupMessageWithRetries( return CompletableFuture.failedFuture(throwable); } + timeoutStrategy.reset(timeoutKey); + return CompletableFutures.nullCompletedFuture(); }, cleanupExecutor) .thenCompose(v -> v); } - private static long incrementTimeout(long currentTimeout) { - return min(currentTimeout * 2, RETRY_MAX_TIMEOUT_MS); - } - private static class CleanupContext { private final ZonePartitionId commitPartitionId; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 0ea57518cdb9..e84123778149 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -40,6 +40,7 @@ import static org.apache.ignite.internal.tx.TxStateMeta.builder; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import java.util.ArrayList; @@ -119,6 +120,7 @@ import org.apache.ignite.internal.tx.views.TransactionsViewProvider; import org.apache.ignite.internal.util.CompletableFutures; import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.internal.util.TimeoutStrategy; import org.apache.ignite.lang.ErrorGroups.Common; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; @@ -240,6 +242,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi private final ConcurrentLinkedQueue> stopFuts = new ConcurrentLinkedQueue<>(); + private final TimeoutStrategy timeoutStrategy; + /** * Test-only constructor. * @@ -276,7 +280,8 @@ public TxManagerImpl( TransactionInflights transactionInflights, LowWatermark lowWatermark, ScheduledExecutorService commonScheduler, - MetricManager metricManager + MetricManager metricManager, + TimeoutStrategy timeoutStrategy ) { this( clusterService.nodeName(), @@ -298,7 +303,8 @@ public TxManagerImpl( lowWatermark, commonScheduler, new FailureManager(new NoOpFailureHandler()), - metricManager + metricManager, + timeoutStrategy ); } @@ -322,6 +328,7 @@ public TxManagerImpl( * @param transactionInflights Transaction inflights. * @param lowWatermark Low watermark. * @param metricManager Metric manager. + * @param timeoutStrategy Timeout strategy. */ public TxManagerImpl( String nodeName, @@ -343,7 +350,8 @@ public TxManagerImpl( LowWatermark lowWatermark, ScheduledExecutorService commonScheduler, FailureProcessor failureProcessor, - MetricManager metricManager + MetricManager metricManager, + TimeoutStrategy timeoutStrategy ) { this.txConfig = txConfig; this.systemCfg = systemCfg; @@ -405,10 +413,13 @@ public TxManagerImpl( placementDriverHelper, txStateVolatileStorage, writeIntentSwitchPool, - commonScheduler + commonScheduler, + timeoutStrategy ); txMetrics = new TransactionMetricsSource(clockService); + + this.timeoutStrategy = timeoutStrategy; } @Override @@ -822,7 +833,7 @@ private CompletableFuture trackFuture(CompletableFuture fut) { */ private CompletableFuture durableFinish( HybridTimestampTracker observableTimestampTracker, - ZonePartitionId commitPartition, + @Nullable ZonePartitionId commitPartition, boolean commit, Map enlistedPartitions, UUID txId, @@ -843,6 +854,8 @@ private CompletableFuture durableFinish( txFinishFuture )) .handle((res, ex) -> { + String timeoutKey = commitPartition == null ? txId.toString() : commitPartition.toString(); + if (ex != null) { Throwable cause = ExceptionUtils.unwrapRootCause(ex); @@ -867,14 +880,19 @@ private CompletableFuture durableFinish( LOG.debug("Failed to finish Tx. The operation will be retried {}.", ex, formatTxInfo(txId, txStateVolatileStorage)); - return supplyAsync(() -> durableFinish( - observableTimestampTracker, - commitPartition, - commit, - enlistedPartitions, - txId, - commitTimestamp, - txFinishFuture + return supplyAsync(() -> scheduleRetry( + () -> durableFinish( + observableTimestampTracker, + commitPartition, + commit, + enlistedPartitions, + txId, + commitTimestamp, + txFinishFuture + ), + timeoutStrategy.next(timeoutKey), + MILLISECONDS, + commonScheduler ), partitionOperationsExecutor).thenCompose(identity()); } else { LOG.warn("Failed to finish Tx {}.", ex, @@ -884,6 +902,8 @@ private CompletableFuture durableFinish( } } + timeoutStrategy.reset(timeoutKey); + return CompletableFutures.nullCompletedFuture(); }) .thenCompose(identity())); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java index 1732625a81ea..bb5ee236d0d5 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java @@ -66,6 +66,7 @@ import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender; import org.apache.ignite.internal.tx.impl.TxMessageSender; import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; +import org.apache.ignite.internal.util.NoopTimeoutStrategy; import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -148,7 +149,8 @@ public void setup() { placementDriverHelper, mock(VolatileTxStateMetaStorage.class), testSyncExecutorService(), - testSyncScheduledExecutorService() + testSyncScheduledExecutorService(), + new NoopTimeoutStrategy(20) ); } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 698d5ce324f1..e3ad66acc26b 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -101,6 +101,7 @@ import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.tx.test.TestTransactionIds; +import org.apache.ignite.internal.util.NoopTimeoutStrategy; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.tx.MismatchingTransactionOutcomeException; @@ -197,7 +198,8 @@ public void setup() { transactionInflights, lowWatermark, commonScheduler, - new TestMetricManager() + new TestMetricManager(), + new NoopTimeoutStrategy(20) ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); From 2d6cca5d97d076896a2fc166ed7399ae512e9b2f Mon Sep 17 00:00:00 2001 From: ksirotkin Date: Wed, 4 Mar 2026 17:19:26 +0200 Subject: [PATCH 02/14] IGNITE-27685 Implement exponential backoff for durable finish --- .idea/codeStyles/Project.xml | 5 +- .../ignite/internal/util/IgniteUtils.java | 29 ---- ...asedExponentialBackoffTimeoutStrategy.java | 147 ------------------ .../ignite/internal/util/TimeoutStrategy.java | 91 ----------- .../util/retry/CommonRetryContext.java | 59 +++++++ .../ExponentialBackoffTimeoutStrategy.java | 104 +++++++++++++ .../util/retry/KeyBasedRetryContext.java | 83 ++++++++++ .../util/{ => retry}/NoopTimeoutStrategy.java | 33 +--- .../ignite/internal/util/retry/RetryUtil.java | 42 +++++ .../internal/util/retry/TimeoutState.java | 56 +++++++ .../internal/util/retry/TimeoutStrategy.java | 36 +++++ .../rebalance/ItRebalanceDistributedTest.java | 4 +- .../partition/replicator/fixtures/Node.java | 4 +- .../runner/app/ItIgniteNodeRestartTest.java | 4 +- .../ignite/internal/app/IgniteImpl.java | 4 +- .../exec/rel/TableScanNodeExecutionTest.java | 2 +- .../ignite/distributed/ItLockTableTest.java | 4 +- ...ributedTestSingleNodeNoCleanupMessage.java | 4 +- .../internal/table/ItColocationTest.java | 4 +- .../ignite/distributed/ItTxTestCluster.java | 6 +- .../table/impl/DummyInternalTableImpl.java | 2 +- .../tx/impl/TxCleanupRequestSender.java | 44 +++--- .../internal/tx/impl/TxManagerImpl.java | 21 +-- .../ignite/internal/tx/TxCleanupTest.java | 2 +- .../ignite/internal/tx/TxManagerTest.java | 2 +- 25 files changed, 443 insertions(+), 349 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/KeyBasedExponentialBackoffTimeoutStrategy.java delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/TimeoutStrategy.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/retry/CommonRetryContext.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java rename modules/core/src/main/java/org/apache/ignite/internal/util/{ => retry}/NoopTimeoutStrategy.java (58%) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutStrategy.java diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml index bf6498d4532f..a05b377407ba 100644 --- a/.idea/codeStyles/Project.xml +++ b/.idea/codeStyles/Project.xml @@ -42,7 +42,7 @@ - + @@ -112,7 +112,7 @@ - + @@ -200,7 +200,6 @@