diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f67cb78720cd..63cf65f443dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -51,7 +51,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -60,7 +59,6 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; @@ -1341,35 +1339,6 @@ public static Object[] flatArray(Object... arguments) { return list.toArray(); } - /** - * Schedules the provided operation to be retried after the specified delay. - * - * @param operation Operation. - * @param delay Delay. - * @param unit Time unit of the delay. - * @param executor Executor to schedule the retry in. - * @return Future that is completed when the operation is successful or failed with an exception. 
- */ - public static CompletableFuture scheduleRetry( - Callable> operation, - long delay, - TimeUnit unit, - ScheduledExecutorService executor - ) { - CompletableFuture future = new CompletableFuture<>(); - - executor.schedule(() -> operation.call() - .whenComplete((res, e) -> { - if (e == null) { - future.complete(res); - } else { - future.completeExceptionally(e); - } - }), delay, unit); - - return future; - } - private static CompletableFuture startAsync(ComponentContext componentContext, Stream components) { return allOf(components .filter(Objects::nonNull) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java new file mode 100644 index 000000000000..94aa81425a97 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategy.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import java.util.concurrent.ThreadLocalRandom; + +/** + * A {@link TimeoutStrategy} that increases retry timeouts exponentially on each attempt. 
+ * + *

Each call to {@link #next(int)} multiplies the current timeout by {@code backoffCoefficient}, + * capping the result at {@link #maxTimeout()}. Optionally, random jitter can be applied to spread + * retry attempts across time and avoid thundering herd problems under high concurrency. + * + *

When jitter is enabled, the returned timeout is randomized within the range + * {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout()}. + * + *

This class is stateless and thread-safe. A single instance can be shared across + * multiple retry contexts. + */ +public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy { + /** Default backoff coefficient applied on each retry step. Doubles the timeout per attempt. */ + private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0; + + /** + * Multiplier applied to the current timeout on each call to {@link #next(int)}. + * Must be greater than {@code 1.0} to produce a growing sequence. + */ + private final double backoffCoefficient; + + /** + * Whether to apply random jitter to the computed timeout. + * When {@code true}, the result is randomized within {@code [raw / 2, raw * 1.5]}. + */ + private final boolean jitter; + + /** + * Upper bound for the timeout produced by this strategy, in milliseconds. + * The result of {@link #next(int)} is always capped at this value. + */ + private final int maxTimeout; + + /** + * Creates a strategy with default max timeout and backoff coefficient, and no jitter. + * + * @see TimeoutStrategy#DEFAULT_TIMEOUT_MS_MAX + */ + public ExponentialBackoffTimeoutStrategy() { + this(DEFAULT_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT); + } + + /** + * Creates a strategy with the given max timeout and backoff coefficient, and no jitter. + * + * @param maxTimeout maximum timeout this strategy may produce, in milliseconds. + * @param backoffCoefficient multiplier applied to the current timeout on each step. + * Must be greater than {@code 1.0}. + */ + public ExponentialBackoffTimeoutStrategy( + int maxTimeout, + double backoffCoefficient + ) { + this(maxTimeout, backoffCoefficient, false); + } + + /** + * Creates a strategy with the given max timeout, backoff coefficient, and jitter setting. + * + * @param maxTimeout maximum timeout this strategy may produce, in milliseconds. + * @param backoffCoefficient multiplier applied to the current timeout on each step. + * Must be greater than {@code 1.0}. 
+ * @param jitter if {@code true}, random jitter is applied to each computed timeout. + */ + public ExponentialBackoffTimeoutStrategy( + int maxTimeout, + double backoffCoefficient, + boolean jitter + ) { + this.maxTimeout = maxTimeout; + this.backoffCoefficient = backoffCoefficient; + this.jitter = jitter; + } + + /** + * Computes the next retry timeout by multiplying {@code currentTimeout} by + * {@link #backoffCoefficient}, then capping at {@link #maxTimeout}. + * If jitter is enabled, the result is further randomized via {@link #applyJitter(int)}. + * + * @param currentTimeout current retry timeout in milliseconds. + * @return next retry timeout in milliseconds, capped at {@link #maxTimeout}. + */ + @Override + public int next(int currentTimeout) { + int raw = (int) Math.min((currentTimeout * backoffCoefficient), maxTimeout); + + return jitter ? applyJitter(raw) : raw; + } + + /** {@inheritDoc} */ + @Override + public int maxTimeout() { + return maxTimeout; + } + + /** + * Applies random jitter to the given timeout value. + * + *

The result is uniformly distributed within {@code [raw / 2, raw * 1.5]}, + * then capped at {@link #maxTimeout} to ensure the strategy ceiling is never exceeded. + * + * @param raw computed timeout before jitter, in milliseconds. + * @return jittered timeout in milliseconds, capped at {@link #maxTimeout}. + */ + private int applyJitter(int raw) { + int lo = raw / 2; + int hi = raw + lo; + + return Math.min(lo + ThreadLocalRandom.current().nextInt(hi - lo + 1), maxTimeout); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java new file mode 100644 index 000000000000..4e0edf2fd95e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContext.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Optional.of; +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import org.jetbrains.annotations.TestOnly; + +/** + * A retry context that tracks timeout state independently per key. + * + *

Each key maps to its own {@link TimeoutState}, allowing separate backoff progression + * for different retry targets — for example, different replication group IDs or transaction IDs. + * State updates are performed atomically per key using {@link ConcurrentHashMap#compute}. + * + *

To prevent unbounded memory growth, the registry is capped at {@link #REGISTRY_SIZE_LIMIT} + * entries. Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState} + * that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a soft cap and may be + * slightly exceeded under concurrent insertions. + * + *

This class is thread-safe. + */ +public class KeyBasedRetryContext { + /** + * Maximum number of keys tracked in {@link #registry}. + * Can be slightly exceeded under concurrent insertions. See class-level Javadoc. + */ + private static final int REGISTRY_SIZE_LIMIT = 1_000; + + /** + * Timeout used when creating a new {@link TimeoutState} for a key that has no prior state. + * Also used as the reset value when a key's state is removed. + */ + private final int initialTimeout; + + /** Strategy used to compute the next timeout from the current one on each advancement. */ + private final TimeoutStrategy timeoutStrategy; + + /** + * Sentinel state returned for keys that cannot be tracked because the registry is full. + * Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt {@code -1} to distinguish + * it from legitimately tracked states. + */ + private final TimeoutState fallbackTimeoutState; + + /** Per-key timeout state registry. Keys are typically transaction IDs or replication group IDs. */ + private final ConcurrentHashMap registry = new ConcurrentHashMap<>(); + + /** + * Creates a new context with the given initial timeout and strategy. + * + * @param initialTimeout timeout used for the first retry attempt of any new key, in milliseconds. + * @param timeoutStrategy strategy used to compute subsequent timeout values. + */ + public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy timeoutStrategy) { + this.initialTimeout = initialTimeout; + this.timeoutStrategy = timeoutStrategy; + + this.fallbackTimeoutState = new TimeoutState(timeoutStrategy.maxTimeout(), -1); + } + + /** + * Returns the current {@link TimeoutState} for the given key, if tracked. + * + *

Returns an empty {@link Optional} if the key has no recorded state yet. + * If the registry is full and the key is not already tracked, returns + * {@link Optional} containing the {@link #fallbackTimeoutState}. + * + *

This method does not insert the key into the registry. + * + * @param key the key to look up, typically a transaction ID or replication group ID. + * @return current state for the key, fallback state if registry is full, or empty if not tracked. + */ + public Optional getState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return of(fallbackTimeoutState); + } + + return ofNullable(registry.get(key)); + } + + /** + * Atomically advances the retry state for the given key and returns the updated state. + * + *

If the key has no prior state, a new {@link TimeoutState} is created with + * {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the timeout is + * advanced using {@link TimeoutStrategy#next(int)} and the attempt count is incremented. + * + *

The update is performed inside {@link ConcurrentHashMap#compute}, which holds + * an exclusive per-key lock for the duration of the lambda. The CAS on the + * {@link TimeoutState}'s internal {@link AtomicLong} is therefore always expected + * to succeed on the first attempt within the lambda. + * + *

If the registry is full and the key is not already tracked, returns + * {@link #fallbackTimeoutState} without modifying the registry. + * + * @param key the key to advance state for, typically a transaction ID or replication group ID. + * @return updated {@link TimeoutState} for the key, or {@link #fallbackTimeoutState} + * if the registry is full. + */ + public TimeoutState updateAndGetState(String key) { + if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) { + return fallbackTimeoutState; + } + + return registry.compute(key, (k, state) -> { + if (state == null) { + return new TimeoutState(initialTimeout, 1); + } + + long currentState = state.getRawState(); + state.update(currentState, timeoutStrategy.next(timeout(currentState)), attempt(currentState) + 1); + + return state; + }); + } + + /** + * Removes the retry state for the given key, resetting it as if no retries had occurred. + * + *

The next call to {@link #updateAndGetState(String)} for this key after a reset + * will create fresh state starting from {@link #initialTimeout}. + * + * @param key the key whose state should be removed. + */ + public void resetState(String key) { + registry.remove(key); + } + + /** + * Returns an unmodifiable snapshot of the current registry contents. + * + *

The snapshot is a point-in-time copy of the registry map. The returned + * {@link TimeoutState} values are live references — their internal state may + * continue to change concurrently after the snapshot is taken. + * + *

This method is intended for testing only and should not be used in production code. + * + * @return unmodifiable copy of the current key-to-state mappings. + */ + @TestOnly + public Map snapshot() { + return unmodifiableMap(new HashMap<>(registry)); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java new file mode 100644 index 000000000000..ed3268365e26 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/NoopTimeoutStrategy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +/** + * A {@link TimeoutStrategy} that returns the current timeout unchanged on every call. + * + *

Useful when retry backoff is not desired — for example, in tests or when a flat + * retry interval is intentional. The timeout passed to {@link #next(int)} is returned + * as-is, so the retry interval remains constant across all attempts. + * + *

This class is stateless and thread-safe. + */ +public class NoopTimeoutStrategy implements TimeoutStrategy { + /** + * Returns {@code currentTimeout} unchanged. + * + * @param currentTimeout current retry timeout in milliseconds. + * @return the same {@code currentTimeout} value, unmodified. + */ + @Override + public int next(int currentTimeout) { + return currentTimeout; + } + + /** + * {@inheritDoc} + */ + @Override + public int maxTimeout() { + return DEFAULT_TIMEOUT_MS_MAX; + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java new file mode 100644 index 000000000000..1c0aa63e3160 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/RetryUtil.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Optional.empty; + +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for scheduling asynchronous retry operations. + * + *

/**
 * Utility class for scheduling asynchronous retry operations.
 *
 * <p>Provides overloaded {@code scheduleRetry} methods that schedule an async operation
 * to run after a delay, optionally invoking callbacks on success or failure. The returned
 * {@link CompletableFuture} is completed when the scheduled operation completes.
 *
 * <p>This class is not intended to manage the retry loop itself: it schedules a single
 * delayed execution. The caller is responsible for chaining successive calls to build a
 * full retry loop.
 */
public class RetryUtil {
    /**
     * Schedules the provided operation to run once after the specified delay.
     *
     * <p>No callbacks are invoked on completion. Equivalent to calling
     * {@link #scheduleRetry(Callable, long, TimeUnit, ScheduledExecutorService, Optional, Optional)}
     * with both optional callbacks empty.
     *
     * @param <T> result type of the operation.
     * @param operation the async operation to schedule.
     * @param delay delay before execution.
     * @param unit time unit of {@code delay}.
     * @param executor executor used to schedule the operation.
     * @return a {@link CompletableFuture} completed with the operation's result on success,
     *         or completed exceptionally if the operation fails.
     */
    public static <T> CompletableFuture<T> scheduleRetry(
            Callable<CompletableFuture<T>> operation,
            long delay,
            TimeUnit unit,
            ScheduledExecutorService executor
    ) {
        return scheduleRetry(operation, delay, unit, executor, empty(), empty());
    }

    /**
     * Schedules the provided operation to run once after the specified delay,
     * invoking a callback on successful completion.
     *
     * <p>Equivalent to calling the full overload with an empty {@code onFailure}.
     *
     * @param <T> result type of the operation.
     * @param operation the async operation to schedule.
     * @param delay delay before execution.
     * @param unit time unit of {@code delay}.
     * @param executor executor used to schedule the operation.
     * @param onSuccessfulComplete optional callback invoked when the operation succeeds,
     *         for example to reset a retry context.
     * @return a {@link CompletableFuture} completed with the operation's result on success,
     *         or completed exceptionally if the operation fails.
     */
    public static <T> CompletableFuture<T> scheduleRetry(
            Callable<CompletableFuture<T>> operation,
            long delay,
            TimeUnit unit,
            ScheduledExecutorService executor,
            Optional<Runnable> onSuccessfulComplete
    ) {
        return scheduleRetry(operation, delay, unit, executor, onSuccessfulComplete, empty());
    }

    /**
     * Schedules the provided operation to run once after the specified delay,
     * invoking separate callbacks on success and failure.
     *
     * <p>{@code onSuccessfulComplete} runs only if the operation's future completes without an
     * exception. {@code onFailure} runs if the operation's future completes exceptionally, if
     * {@code operation.call()} itself throws, or if it returns {@code null}. In every failure
     * case the returned future is completed exceptionally, so callers never wait on a future
     * that cannot complete. The returned future is completed before the callback runs.
     *
     * @param <T> result type of the operation.
     * @param operation the async operation to schedule.
     * @param delay delay before execution.
     * @param unit time unit of {@code delay}.
     * @param executor executor used to schedule the operation.
     * @param onSuccessfulComplete optional callback invoked on success, for example to
     *         reset a retry context after a successful attempt.
     * @param onFailure optional callback invoked on failure, for example to
     *         record a failed attempt or trigger alerting.
     * @return a {@link CompletableFuture} completed with the operation's result on success,
     *         or completed exceptionally if the operation fails.
     */
    public static <T> CompletableFuture<T> scheduleRetry(
            Callable<CompletableFuture<T>> operation,
            long delay,
            TimeUnit unit,
            ScheduledExecutorService executor,
            Optional<Runnable> onSuccessfulComplete,
            Optional<Runnable> onFailure
    ) {
        CompletableFuture<T> future = new CompletableFuture<>();

        executor.schedule(() -> {
            CompletableFuture<T> pending;

            try {
                pending = operation.call();
            } catch (Throwable t) {
                // Without this the exception would only be recorded in the discarded
                // ScheduledFuture and the returned future would never complete.
                fail(future, t, onFailure);

                return;
            }

            if (pending == null) {
                fail(future, new NullPointerException("Retry operation returned a null future"), onFailure);

                return;
            }

            pending.whenComplete((res, e) -> {
                if (e == null) {
                    future.complete(res);

                    onSuccessfulComplete.ifPresent(Runnable::run);
                } else {
                    fail(future, e, onFailure);
                }
            });
        }, delay, unit);

        return future;
    }

    /** Completes {@code future} exceptionally with {@code cause}, then runs the failure callback if present. */
    private static void fail(CompletableFuture<?> future, Throwable cause, Optional<Runnable> onFailure) {
        future.completeExceptionally(cause);

        onFailure.ifPresent(Runnable::run);
    }
}
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.Optional.ofNullable; +import static org.apache.ignite.internal.util.retry.TimeoutState.attempt; +import static org.apache.ignite.internal.util.retry.TimeoutState.timeout; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A retry context that tracks a single shared timeout state across all callers. + * + *

Unlike {@link KeyBasedRetryContext}, this context does not distinguish between + * retry targets — all callers advance and observe the same {@link TimeoutState}. + * This is appropriate when a single backoff sequence should govern all retries + * regardless of which operation is being retried. + * + *

The state is initialized lazily on the first call to {@link #updateAndGetState()}, + * and can be reset to {@code null} via {@link #resetState()}, allowing the progression + * to restart from scratch. {@link #getState()} returns an empty {@link Optional} before + * the first call and after a reset. + * + *

Concurrent calls to {@link #updateAndGetState()} and {@link #resetState()} are safe. + * The {@link AtomicReference} handles structural transitions ({@code null ↔ initialized}), + * while the {@link TimeoutState}'s internal {@link AtomicLong} CAS handles concurrent + * value updates without allocating new objects on the hot path. + * + *

This class is thread-safe. + */ +public class SharedRetryContext { + /** Timeout used to initialize the state on the first call to {@link #updateAndGetState()}. */ + private final int initialTimeout; + + /** Strategy used to compute the next timeout from the current one on each advancement. */ + private final TimeoutStrategy timeoutStrategy; + + /** + * Holds the single shared retry state. {@code null} before the first update and after reset. + * The {@link TimeoutState} object itself is reused across updates to avoid allocation pressure. + */ + private final AtomicReference timeoutState = new AtomicReference<>(); + + /** + * Creates a new context with the given initial timeout and strategy. + * + * @param initialTimeout timeout returned on the very first call to {@link #updateAndGetState()}, + * in milliseconds. + * @param timeoutStrategy strategy used to compute subsequent timeout values. + */ + public SharedRetryContext(int initialTimeout, TimeoutStrategy timeoutStrategy) { + this.initialTimeout = initialTimeout; + this.timeoutStrategy = timeoutStrategy; + } + + /** + * Returns the current shared {@link TimeoutState}, if initialized. + * + *

Returns an empty {@link Optional} if {@link #updateAndGetState()} has never been called, + * or if {@link #resetState()} was called after the last update. + * + * @return current state, or empty if not yet initialized or already reset. + */ + public Optional getState() { + return ofNullable(timeoutState.get()); + } + + /** + * Atomically advances the shared retry state and returns it. + * + *

On the first call (or after a reset), lazily initializes the {@link TimeoutState} + * with {@link #initialTimeout} and returns it with attempt count {@code 1} and the + * initial timeout. On subsequent calls, advances the timeout using + * {@link TimeoutStrategy#next(int)} and increments the attempt count. + * + *

Initialization uses {@link AtomicReference#compareAndSet} to handle concurrent + * first calls safely — only one thread creates the state object. Advancement uses + * a CAS loop on the {@link TimeoutState}'s internal {@link AtomicLong}, so no new + * objects are allocated on the hot path. + * + *

If {@link #resetState()} races with this method between initialization and + * advancement, the outer loop retries transparently. + * + * @return the shared {@link TimeoutState} after advancement. + */ + public TimeoutState updateAndGetState() { + while (true) { + if (timeoutState.get() == null) { + timeoutState.compareAndSet(null, new TimeoutState(initialTimeout, 0)); + } + + TimeoutState state = timeoutState.get(); + if (state == null) { + continue; // reset raced us, retry + } + + long raw; + int nextTimeout; + do { + raw = state.getRawState(); + nextTimeout = attempt(raw) == 0 + ? initialTimeout + : timeoutStrategy.next(timeout(raw)); + } while (!state.update(raw, nextTimeout, attempt(raw) + 1)); + + return state; + } + } + + /** + * Resets the shared state to {@code null}, as if no retries had ever occurred. + * + *

The next call to {@link #updateAndGetState()} after a reset will re-initialize + * the state starting from {@link #initialTimeout} with attempt count {@code 1}. + */ + public void resetState() { + timeoutState.set(null); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java new file mode 100644 index 000000000000..50e29bec5414 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/retry/TimeoutState.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Mutable, thread-safe holder for retry timeout and attempt count. + * + *

/**
 * Mutable holder for a retry timeout and an attempt count.
 *
 * <p>The two values share one {@link AtomicLong}, which makes it possible to replace both with a
 * single compare-and-set. The timeout (milliseconds) lives in the high 32 bits and the attempt
 * count in the low 32 bits.
 *
 * <p>{@link Object#equals(Object)} and {@link Object#hashCode()} are deliberately not overridden:
 * the content is mutable, so instances are compared by reference.
 *
 * <p>The static helpers {@link #timeout(long)} and {@link #attempt(long)} are package-private so
 * that code holding a raw snapshot can decode it without reading the state again.
 */
public class TimeoutState {
    /** Number of bits the timeout is shifted by inside the packed value. */
    private static final int TIMEOUT_SHIFT = 32;

    /** Mask selecting the low 32 bits, where the attempt count is stored. */
    private static final long ATTEMPT_MASK = 0xFFFFFFFFL;

    /** Packed value: high 32 bits hold the timeout (ms), low 32 bits hold the attempt count. */
    private final AtomicLong state;

    /**
     * Creates a state holding the given timeout and attempt count.
     *
     * @param timeout initial timeout in milliseconds.
     * @param initialAttempt initial attempt count.
     */
    public TimeoutState(int timeout, int initialAttempt) {
        this.state = new AtomicLong(pack(timeout, initialAttempt));
    }

    /**
     * Returns the packed value currently stored.
     *
     * <p>To obtain a consistent timeout/attempt pair, read this once and decode it with
     * {@link #timeout(long)} and {@link #attempt(long)} instead of calling
     * {@link #getTimeout()} and {@link #getAttempt()} separately.
     *
     * @return raw packed state value.
     */
    public long getRawState() {
        return state.get();
    }

    /**
     * Returns the current timeout, decoded from a single read of the packed value.
     *
     * @return current timeout in milliseconds.
     */
    public int getTimeout() {
        return timeout(getRawState());
    }

    /**
     * Returns the current attempt count, decoded from a single read of the packed value.
     *
     * @return current attempt count.
     */
    public int getAttempt() {
        return attempt(getRawState());
    }

    /**
     * Replaces the state with the given timeout and attempt count, but only if the stored value
     * still equals {@code currentState} (compare-and-set).
     *
     * @param currentState expected raw state, as previously returned by {@link #getRawState()}.
     * @param newTimeout new timeout value in milliseconds.
     * @param newAttempt new attempt count.
     * @return {@code true} if the value was replaced; {@code false} if the stored value differed
     *         from {@code currentState} and nothing was changed.
     */
    public boolean update(long currentState, int newTimeout, int newAttempt) {
        long replacement = pack(newTimeout, newAttempt);

        return state.compareAndSet(currentState, replacement);
    }

    /**
     * Combines a timeout and an attempt count into one {@code long}.
     *
     * @param timeout timeout in milliseconds; stored in the high 32 bits.
     * @param attempt attempt count; stored in the low 32 bits.
     * @return packed {@code long} value.
     */
    static long pack(int timeout, int attempt) {
        long high = (long) timeout << TIMEOUT_SHIFT;
        // Mask so that a negative attempt does not sign-extend into the timeout bits.
        long low = attempt & ATTEMPT_MASK;

        return high | low;
    }

    /**
     * Decodes the timeout from a packed value.
     *
     * @param packed raw state value from {@link #getRawState()} or {@link #pack(int, int)}.
     * @return timeout in milliseconds.
     */
    static int timeout(long packed) {
        return (int) (packed >>> TIMEOUT_SHIFT);
    }

    /**
     * Decodes the attempt count from a packed value.
     *
     * @param packed raw state value from {@link #getRawState()} or {@link #pack(int, int)}.
     * @return attempt count.
     */
    static int attempt(long packed) {
        return (int) (packed & ATTEMPT_MASK);
    }
}

Implementations must be stateless and thread-safe — the same instance is expected + * to be shared across multiple callers and retry contexts. All mutable state (current + * timeout, attempt count) is maintained externally by the caller or a retry context. + * + * @see ExponentialBackoffTimeoutStrategy + * @see NoopTimeoutStrategy + */ +public interface TimeoutStrategy { + /** Default maximum timeout that a strategy may produce, in milliseconds. */ + int DEFAULT_TIMEOUT_MS_MAX = 11_000; + + /** + * Computes the next retry timeout based on the current one. + * + *

Implementations must not produce a value exceeding {@link #maxTimeout()}. + * The returned value is used directly as the delay before the next retry attempt. + * + * @param currentTimeout current retry timeout in milliseconds. + * @return next retry timeout in milliseconds, capped at {@link #maxTimeout()}. + */ + int next(int currentTimeout); + + /** + * Returns the maximum timeout this strategy can produce, in milliseconds. + * + *

Once the timeout reaches this ceiling, further calls to {@link #next(int)} + * must continue to return this value rather than exceeding it. + * + * @return maximum timeout in milliseconds. + */ + int maxTimeout(); +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java new file mode 100644 index 000000000000..502494d2e415 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/ExponentialBackoffTimeoutStrategyTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.retry; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link ExponentialBackoffTimeoutStrategy}. + * + *

Verifies the correctness of exponential timeout progression, the maximum timeout + * ceiling, and optional jitter behavior. Tests use predictable integer arithmetic to + * make expected values easy to verify by hand. + */ +public class ExponentialBackoffTimeoutStrategyTest { + /** Initial timeout passed to {@link TimeoutStrategy#next(int)} as the starting value. */ + private static final int INITIAL_TIMEOUT = 20; + + /** Maximum timeout the strategy is allowed to produce. */ + private static final int MAX_TIMEOUT = 100; + + /** Backoff coefficient used by the default strategy instance. Doubles each timeout step. */ + private static final double BACKOFF_COEFFICIENT = 2.0; + + /** Strategy instance under test, recreated before each test. */ + private TimeoutStrategy timeoutStrategy; + + /** + * Creates a fresh {@link ExponentialBackoffTimeoutStrategy} with {@link #MAX_TIMEOUT} + * and {@link #BACKOFF_COEFFICIENT}, without jitter, before each test. + */ + @BeforeEach + void setUp() { + timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, BACKOFF_COEFFICIENT); + } + + /** + * Verifies that a single call to {@link TimeoutStrategy#next(int)} returns + * {@code currentTimeout * backoffCoefficient}. + * + *

This is the core contract of the exponential strategy — each step multiplies + * the current timeout by the configured coefficient. + */ + @Test + void testGettingNextTimeout() { + assertEquals(BACKOFF_COEFFICIENT * INITIAL_TIMEOUT, timeoutStrategy.next(INITIAL_TIMEOUT)); + } + + /** + * Verifies that the timeout progression reaches {@link #MAX_TIMEOUT} within the + * expected number of steps and does not exceed it on subsequent calls. + * + *

The upper bound on steps is hard-coded to {@code 3}: with a coefficient of 2 the expected + * progression from {@code INITIAL_TIMEOUT} to {@code MAX_TIMEOUT} is 20, 40, 80, 100. If the strategy fails to reach + * the cap within this bound, the test fails with a descriptive message. Once the + * cap is reached, a further call to {@link TimeoutStrategy#next(int)} must return + * exactly {@link #MAX_TIMEOUT}. + */ + @Test + void testMaxTimeoutNotExceeded() { + int maxSteps = 3; + int steps = 0; + + int timeout = INITIAL_TIMEOUT; + do { + timeout = timeoutStrategy.next(timeout); + + assertTrue(++steps <= maxSteps, + "Strategy did not reach MAX_TIMEOUT within expected steps, last timeout: " + timeout); + } while (timeout < MAX_TIMEOUT); + + assertEquals(MAX_TIMEOUT, timeout); + assertEquals(MAX_TIMEOUT, timeoutStrategy.next(timeout)); + } + + /** + * Verifies that when jitter is enabled, the returned timeout falls within the + * expected randomized range {@code [initialTimeout, initialTimeout * coefficient^2)}. + * + *

A separate strategy instance with jitter enabled is created for this test. + * The lower bound confirms the jitter does not produce values below the initial + * timeout; the upper bound confirms it does not jump more than two coefficient + * steps in a single call. + */ + @Test + void testJitterApplying() { + timeoutStrategy = new ExponentialBackoffTimeoutStrategy(MAX_TIMEOUT, BACKOFF_COEFFICIENT, true); + + int timeout = timeoutStrategy.next(INITIAL_TIMEOUT); + assertTrue( + INITIAL_TIMEOUT <= timeout && timeout < INITIAL_TIMEOUT * BACKOFF_COEFFICIENT * BACKOFF_COEFFICIENT, + "Timeout is out of range: " + timeout + ); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java new file mode 100644 index 000000000000..70465f93b6f7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/KeyBasedRetryContextTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Unit tests for {@link KeyBasedRetryContext}. + * + *

Verifies per-key state isolation, sequential timeout progression, reset behavior, + * registry size limit enforcement, fallback state for overflow keys, and thread safety + * under concurrent updates to both the same and different keys. + * + *

A deterministic {@link TestProgressiveTimeoutStrategy} with a fixed multiplier is + * used to make expected timeout values easy to compute by hand. + */ +public class KeyBasedRetryContextTest { + /** Message included in exceptions thrown when an expected state is absent. */ + private static final String MISSING_STATE_MESSAGE = "TimeoutState is missing!"; + + /** + * Multiplier applied by {@link TestProgressiveTimeoutStrategy} on each step. + * Used to compute expected timeout values in assertions. + */ + private static final int MULTIPLYING_COEFFICIENT = 4; + + /** Initial timeout passed to the {@link KeyBasedRetryContext} under test. */ + private static final int INITIAL_TIMEOUT = 20; + + /** + * Maximum timeout configured in {@link TestProgressiveTimeoutStrategy}. + * The progression is capped at this value. + */ + private static final int MAX_TIMEOUT = 1_000; + + /** + * Registry size limit mirrored from {@link KeyBasedRetryContext}. + * Used in tests that fill the registry to verify fallback behavior. + */ + private static final int REGISTRY_SIZE_LIMIT = 1_000; + + /** Primary key used in single-key tests. */ + private static final String KEY = "key"; + + /** Secondary key used in isolation and reset tests. */ + private static final String OTHER_KEY = "other-key"; + + /** Retry context under test, recreated before each test. */ + private KeyBasedRetryContext retryContext; + + /** + * Creates a fresh {@link KeyBasedRetryContext} with {@link #INITIAL_TIMEOUT} and + * a {@link TestProgressiveTimeoutStrategy} before each test. 
+ */ + @BeforeEach + void setUp() { + retryContext = new KeyBasedRetryContext(INITIAL_TIMEOUT, new TestProgressiveTimeoutStrategy()); + } + + /** + * Verifies that {@link KeyBasedRetryContext#getState(String)} returns an empty + * {@link java.util.Optional} for an untracked key, and that after the first call + * to {@link KeyBasedRetryContext#updateAndGetState(String)}, the state is present + * with {@link #INITIAL_TIMEOUT} and attempt count {@code 1}. + */ + @Test + void testGettingState() { + assertFalse(retryContext.getState(KEY).isPresent()); + + retryContext.updateAndGetState(KEY); + + assertTrue(retryContext.getState(KEY).isPresent()); + + TimeoutState state = retryContext.getState(KEY).get(); + + assertEquals(INITIAL_TIMEOUT, state.getTimeout()); + assertEquals(1, state.getAttempt()); + } + + /** + * Verifies that {@link KeyBasedRetryContext#updateAndGetState(String)} returns the + * same object reference as {@link KeyBasedRetryContext#getState(String)}, and that + * the timeout advances correctly after multiple calls for the same key. + * + *

After three updates, the expected timeout is + * {@code INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT^2} with attempt count {@code 3}. + */ + @Test + void testUpdatingAndGettingState() { + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + TimeoutState returnedState = retryContext.updateAndGetState(KEY); + TimeoutState observedState = retryContext.getState(KEY) + .orElseThrow(() -> new IllegalStateException(MISSING_STATE_MESSAGE)); + + assertSame(returnedState, observedState); + + checkRetryContextState(KEY, INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3); + } + + /** + * Verifies that each key maintains its own independent backoff progression. + * + *

Advances {@link #KEY} three times and {@link #OTHER_KEY} once, then asserts + * that each key holds the timeout and attempt count corresponding only to its own + * update history, with no cross-key interference. + */ + @Test + void testStatesAreIsolatedPerKey() { + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + + retryContext.updateAndGetState(OTHER_KEY); + + checkRetryContextState(KEY, INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3); + checkRetryContextState(OTHER_KEY, INITIAL_TIMEOUT, 1); + } + + /** + * Verifies that {@link KeyBasedRetryContext#resetState(String)} removes the state + * for the given key, causing {@link KeyBasedRetryContext#getState(String)} to return + * an empty {@link java.util.Optional} after the reset. + */ + @Test + void testResettingState() { + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + + checkRetryContextState(KEY, INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3); + + retryContext.resetState(KEY); + + assertFalse(retryContext.getState(KEY).isPresent()); + } + + /** + * Verifies that resetting one key does not affect the state of other keys. + * + *

Advances {@link #KEY} twice and {@link #OTHER_KEY} once, resets {@link #KEY}, + * then asserts that {@link #KEY} is absent while {@link #OTHER_KEY} retains its state. + */ + @Test + void testResettingStateDoesNotAffectOtherKeys() { + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(OTHER_KEY); + + retryContext.resetState(KEY); + + assertFalse(retryContext.getState(KEY).isPresent()); + checkRetryContextState(OTHER_KEY, INITIAL_TIMEOUT, 1); + } + + /** + * Verifies that when the registry is at capacity, a new key passed to + * {@link KeyBasedRetryContext#updateAndGetState(String)} receives the fallback + * {@link TimeoutState} with {@link #MAX_TIMEOUT} and attempt {@code -1}. + */ + @Test + void testFallbackWhenRegistryIsFull() { + // Fill registry to the limit + IntStream.range(0, REGISTRY_SIZE_LIMIT) + .mapToObj(i -> "key-" + i) + .forEach(retryContext::updateAndGetState); + + // New key should get fallback + TimeoutState state = retryContext.updateAndGetState("new-key-beyond-limit"); + + assertEquals(MAX_TIMEOUT, state.getTimeout()); + assertEquals(-1, state.getAttempt()); + } + + /** + * Verifies that a key already tracked in the registry continues to progress normally + * even when the registry has reached its size limit. + * + *

This guards against the regression where the overflow check incorrectly + * blocked updates for existing keys rather than only for new ones. + */ + @Test + void testExistingKeyStillProgressesWhenRegistryIsFull() { + retryContext.updateAndGetState(KEY); + + // Fill registry to the limit with other keys + IntStream.range(0, REGISTRY_SIZE_LIMIT - 1) + .mapToObj(i -> "key-" + i) + .forEach(retryContext::updateAndGetState); + + // Existing key should still progress normally + retryContext.updateAndGetState(KEY); + retryContext.updateAndGetState(KEY); + + checkRetryContextState(KEY, INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3); + } + + /** + * Verifies that {@link KeyBasedRetryContext#getState(String)} returns the fallback + * {@link TimeoutState} (with {@link #MAX_TIMEOUT} and attempt {@code -1}) for a key + * that is not tracked when the registry is full. + */ + @Test + void testGetStateReturnsFallbackWhenRegistryIsFull() { + IntStream.range(0, REGISTRY_SIZE_LIMIT) + .mapToObj(i -> "key-" + i) + .forEach(retryContext::updateAndGetState); + + Optional<TimeoutState> state = retryContext.getState("new-key-beyond-limit"); + + assertTrue(state.isPresent()); + assertEquals(MAX_TIMEOUT, state.get().getTimeout()); + assertEquals(-1, state.get().getAttempt()); + } + + /** + * Verifies that concurrent calls to {@link KeyBasedRetryContext#updateAndGetState(String)} + * for the same key from multiple threads all succeed, and that the final state reflects + * exactly {@code attemptsNumber} advancements. + * + *

This exercises the per-key locking in {@link java.util.concurrent.ConcurrentHashMap#compute} + * and the CAS loop inside {@link TimeoutState}. + * + * @throws Exception if the thread pool is interrupted during shutdown. + */ + @Test + @Timeout(value = 5, unit = SECONDS) + void testConcurrentStateUpdatingSameKey() throws Exception { + ExecutorService threadPool = Executors.newFixedThreadPool(5); + + int attemptsNumber = 20; + + List<Future<TimeoutState>> futures = IntStream.range(0, attemptsNumber) + .mapToObj(i -> threadPool.submit(() -> retryContext.updateAndGetState(KEY))) + .collect(toList()); + + try { + futures.forEach(fut -> { + try { + fut.get(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + + checkRetryContextState(KEY, MAX_TIMEOUT, attemptsNumber); + } finally { + threadPool.shutdown(); + threadPool.awaitTermination(5, SECONDS); + } + } + + /** + * Verifies that concurrent calls to {@link KeyBasedRetryContext#updateAndGetState(String)} + * for different keys do not interfere with each other. + * + *

Each key is updated exactly once from a dedicated task. After all tasks complete, + * each key must hold {@link #INITIAL_TIMEOUT} and attempt count {@code 1}, confirming + * no cross-key state contamination under concurrency. + * + * @throws Exception if the thread pool is interrupted during shutdown. + */ + @Test + @Timeout(value = 5, unit = SECONDS) + void testConcurrentStateUpdatingDifferentKeys() throws Exception { + ExecutorService threadPool = Executors.newFixedThreadPool(5); + + List<String> keys = List.of("key-1", "key-2", "key-3", "key-4", "key-5"); + + List<Future<TimeoutState>> futures = keys.stream() + .map(key -> threadPool.submit(() -> retryContext.updateAndGetState(key))) + .collect(toList()); + + try { + futures.forEach(KeyBasedRetryContextTest::getQuietly); + + keys.forEach(key -> checkRetryContextState(key, INITIAL_TIMEOUT, 1)); + } finally { + threadPool.shutdown(); + threadPool.awaitTermination(5, SECONDS); + } + } + + /** + * Asserts that the retry context holds the expected timeout and attempt count for + * the given key. + * + *

Fails with {@link #MISSING_STATE_MESSAGE} if the state is absent. + * + * @param key the key whose state to check. + * @param expectedTimeout expected current timeout in milliseconds. + * @param expectedAttempts expected current attempt count. + */ + private void checkRetryContextState(String key, int expectedTimeout, int expectedAttempts) { + retryContext.getState(key).ifPresentOrElse(state -> { + assertEquals(expectedTimeout, state.getTimeout()); + assertEquals(expectedAttempts, state.getAttempt()); + }, () -> { + throw new IllegalStateException(MISSING_STATE_MESSAGE); + }); + } + + /** + * A deterministic {@link TimeoutStrategy} that multiplies the current timeout by + * {@link #MULTIPLYING_COEFFICIENT} on each step, capped at {@link #MAX_TIMEOUT}. + * + *

Using integer multiplication rather than a floating-point coefficient avoids + * rounding ambiguity, making expected values in test assertions exact and easy to + * compute by hand. + */ + private static class TestProgressiveTimeoutStrategy implements TimeoutStrategy { + /** + * {@inheritDoc} + * + *

Multiplies {@code currentTimeout} by {@link #MULTIPLYING_COEFFICIENT}, + * capped at {@link #MAX_TIMEOUT}. + */ + @Override + public int next(int currentTimeout) { + return Math.min(currentTimeout * MULTIPLYING_COEFFICIENT, MAX_TIMEOUT); + } + + /** + * {@inheritDoc} + * + *

Returns {@link #MAX_TIMEOUT}. + */ + @Override + public int maxTimeout() { + return MAX_TIMEOUT; + } + } + + /** + * Waits for the given {@link Future} to complete and returns its result. + * + *

Wraps checked exceptions as {@link AssertionError} so they propagate cleanly + * through {@link java.util.function.Consumer} lambdas in test code without requiring + * explicit try-catch blocks. + * + *

<ul> + *
<li>{@link ExecutionException} — wraps the cause as an {@link AssertionError}, + * preserving the original exception for diagnosis.</li> + *
<li>{@link InterruptedException} — restores the interrupt flag and wraps + * as an {@link AssertionError}.</li> + *
</ul>
+ * + * @param <T> the future's result type. + * @param future the future to wait for. + * @return the future's result. + * @throws AssertionError if the future completed exceptionally or the thread was interrupted. + */ + private static <T> T getQuietly(Future<T> future) { + try { + return future.get(); + } catch (ExecutionException e) { + throw new AssertionError("Future completed exceptionally", e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for future", e); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/SharedRetryContextTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/SharedRetryContextTest.java new file mode 100644 index 000000000000..820215d7b375 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/SharedRetryContextTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.util.retry; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.IntStream; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +/** + * Unit tests for {@link SharedRetryContext}. + * + *

Verifies lazy initialization, sequential timeout progression, reset behavior, + * and thread safety of concurrent updates. A deterministic {@link TestProgressiveTimeoutStrategy} + * with a fixed multiplier is used to make expected timeout values easy to compute by hand. + */ +public class SharedRetryContextTest { + /** Message included in exceptions thrown when an expected state is absent. */ + private static final String MISSING_STATE_MESSAGE = "TimeoutState is missing!"; + + /** + * Multiplier applied by {@link TestProgressiveTimeoutStrategy} on each step. + * Used to compute expected timeout values in assertions. + */ + private static final int MULTIPLYING_COEFFICIENT = 4; + + /** Initial timeout passed to the {@link SharedRetryContext} under test. */ + private static final int INITIAL_TIMEOUT = 20; + + /** + * Maximum timeout configured in {@link TestProgressiveTimeoutStrategy}. + * The progression is capped at this value. + */ + private static final int MAX_TIMEOUT = 1_000; + + /** Retry context under test, recreated before each test. */ + private SharedRetryContext retryContext; + + /** + * Creates a fresh {@link SharedRetryContext} with {@link #INITIAL_TIMEOUT} and + * a {@link TestProgressiveTimeoutStrategy} before each test. + */ + @BeforeEach + void setUp() { + retryContext = new SharedRetryContext(INITIAL_TIMEOUT, new TestProgressiveTimeoutStrategy()); + } + + /** + * Verifies lazy initialization behavior of {@link SharedRetryContext}. + * + *

Before any call to {@link SharedRetryContext#updateAndGetState()}, {@link + * SharedRetryContext#getState()} must return an empty {@link java.util.Optional}. + * After the first update, the state must be present with {@link #INITIAL_TIMEOUT} + * and attempt count {@code 1}. + */ + @Test + void testGettingState() { + assertFalse(retryContext.getState().isPresent()); + + retryContext.updateAndGetState(); + + assertTrue(retryContext.getState().isPresent()); + + TimeoutState state = retryContext.getState().get(); + + assertEquals(INITIAL_TIMEOUT, state.getTimeout()); + assertEquals(1, state.getAttempt()); + } + + /** + * Verifies that {@link SharedRetryContext#updateAndGetState()} returns the same + * object reference as {@link SharedRetryContext#getState()}, and that the timeout + * advances correctly after multiple calls. + * + *

After three updates, the expected timeout is + * {@code INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT^2} with attempt count {@code 3}, + * reflecting that the first update returns the initial timeout and subsequent updates + * apply the coefficient. + */ + @Test + void testUpdatingAndGettingState() { + retryContext.updateAndGetState(); + retryContext.updateAndGetState(); + TimeoutState returnedState = retryContext.updateAndGetState(); + TimeoutState observedState = retryContext.getState().orElseThrow(() -> new IllegalStateException(MISSING_STATE_MESSAGE)); + + assertSame(returnedState, observedState); + + checkRetryContextState(INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3); + } + + /** + * Verifies that {@link SharedRetryContext#resetState()} clears the shared state, + * causing {@link SharedRetryContext#getState()} to return an empty + * {@link java.util.Optional} after the reset. + */ + @Test + void testResettingState() { + retryContext.updateAndGetState(); + retryContext.updateAndGetState(); + retryContext.updateAndGetState(); + + checkRetryContextState(INITIAL_TIMEOUT * MULTIPLYING_COEFFICIENT * MULTIPLYING_COEFFICIENT, 3); + + retryContext.resetState(); + + assertFalse(retryContext.getState().isPresent()); + } + + /** + * Verifies that concurrent calls to {@link SharedRetryContext#updateAndGetState()} + * from multiple threads all succeed, and that the final state reflects exactly + * {@code attemptsNumber} advancements. + * + *

Submits {@code attemptsNumber} tasks to a 5-thread pool, waits for all to + * complete, then asserts that the timeout has reached {@link #MAX_TIMEOUT} and + * the attempt count equals the number of submitted tasks. + * + * @throws Exception if the thread pool is interrupted during shutdown. + */ + @Test + @Timeout(value = 5, unit = SECONDS) + void testConcurrentStateUpdating() throws Exception { + ExecutorService threadPool = Executors.newFixedThreadPool(5); + + int attemptsNumber = 20; + + List<Future<TimeoutState>> futures = IntStream.range(0, attemptsNumber) + .mapToObj(i -> threadPool.submit(() -> retryContext.updateAndGetState())) + .collect(toList()); + + try { + futures.forEach(SharedRetryContextTest::getQuietly); + + checkRetryContextState(MAX_TIMEOUT, attemptsNumber); + } finally { + threadPool.shutdown(); + threadPool.awaitTermination(5, SECONDS); + } + } + + /** + * Asserts that the shared retry context holds the expected timeout and attempt count. + * + *

Fails with {@link #MISSING_STATE_MESSAGE} if the state is absent. + * + * @param expectedTimeout expected current timeout in milliseconds. + * @param expectedAttempts expected current attempt count. + */ + private void checkRetryContextState(int expectedTimeout, int expectedAttempts) { + retryContext.getState().ifPresentOrElse(state -> { + assertEquals(expectedTimeout, state.getTimeout()); + assertEquals(expectedAttempts, state.getAttempt()); + }, () -> { + throw new IllegalStateException(MISSING_STATE_MESSAGE); + }); + } + + /** + * A deterministic {@link TimeoutStrategy} that multiplies the current timeout by + * {@link #MULTIPLYING_COEFFICIENT} on each step, capped at {@link #MAX_TIMEOUT}. + * + *

Using integer multiplication rather than a floating-point coefficient avoids + * rounding ambiguity, making expected values in test assertions exact and easy to + * compute by hand. + */ + private static class TestProgressiveTimeoutStrategy implements TimeoutStrategy { + /** + * {@inheritDoc} + * + *

Multiplies {@code currentTimeout} by {@link #MULTIPLYING_COEFFICIENT}, + * capped at {@link #MAX_TIMEOUT}. + */ + @Override + public int next(int currentTimeout) { + return Math.min(currentTimeout * MULTIPLYING_COEFFICIENT, MAX_TIMEOUT); + } + + /** + * {@inheritDoc} + * + *

Returns {@link #MAX_TIMEOUT}. + */ + @Override + public int maxTimeout() { + return MAX_TIMEOUT; + } + } + + /** + * Waits for the given {@link Future} to complete and returns its result. + * + *

Wraps checked exceptions as {@link AssertionError} so they propagate cleanly + * through {@link java.util.function.Consumer} lambdas in test code without requiring + * explicit try-catch blocks. + * + *

<ul> + *
<li>{@link ExecutionException} — wraps the cause as an {@link AssertionError}, + * preserving the original exception for diagnosis.</li> + *
<li>{@link InterruptedException} — restores the interrupt flag and wraps + * as an {@link AssertionError}.</li> + *
</ul>
+ * + * @param the future's result type. + * @param future the future to wait for. + * @return the future's result. + * @throws AssertionError if the future completed exceptionally or the thread was interrupted. + */ + private static T getQuietly(Future future) { + try { + return future.get(); + } catch (ExecutionException e) { + throw new AssertionError("Future completed exceptionally", e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for future", e); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java new file mode 100644 index 000000000000..07ba6ebd064d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/retry/TimeoutStateTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.util.retry; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link TimeoutState}. + * + *

<p>Verifies the correctness of initial state construction, atomic CAS updates,
+ * stale-snapshot rejection, and the consistency between the raw packed {@code long}
+ * and the individual accessor methods.
+ */
+public class TimeoutStateTest {
+    /** Timeout value used to construct the shared {@link TimeoutState} instance. */
+    private static final int TIMEOUT = 20;
+
+    /** Attempt count used to construct the shared {@link TimeoutState} instance. */
+    private static final int ATTEMPT = 10;
+
+    /** Shared state instance recreated before each test. */
+    private TimeoutState state;
+
+    /**
+     * Creates a fresh {@link TimeoutState} with {@link #TIMEOUT} and {@link #ATTEMPT}
+     * before each test to ensure full isolation.
+     */
+    @BeforeEach
+    void setUp() {
+        state = new TimeoutState(TIMEOUT, ATTEMPT);
+    }
+
+    /**
+     * Verifies that a newly constructed {@link TimeoutState} returns the timeout and
+     * attempt values it was initialized with.
+     */
+    @Test
+    void testInitialState() {
+        assertEquals(TIMEOUT, state.getTimeout());
+        assertEquals(ATTEMPT, state.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#update(long, int, int)} succeeds when the
+     * provided snapshot matches the current internal state, and that both fields
+     * hold the new values afterwards.
+     */
+    @Test
+    void testUpdate() {
+        int newTimeout = 40;
+        int newAttempt = 11;
+
+        long raw = state.getRawState();
+        boolean updated = state.update(raw, newTimeout, newAttempt);
+
+        assertTrue(updated);
+        assertEquals(newTimeout, state.getTimeout());
+        assertEquals(newAttempt, state.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#update(long, int, int)} rejects an update
+     * when the snapshot is stale — i.e., the internal state has been modified by
+     * another call since the snapshot was taken.
+     *
+     * <p>After a successful update advances the state, a second update using the
+     * original snapshot must return {@code false} and leave the state unchanged
+     * at the first update's values.
+     */
+    @Test
+    void testUpdateFailsOnStaleSnapshot() {
+        long staleSnapshot = state.getRawState();
+
+        // advance state so staleSnapshot is no longer current
+        state.update(staleSnapshot, 40, 11);
+
+        // update with stale snapshot must fail
+        boolean updated = state.update(staleSnapshot, 80, 12);
+
+        assertFalse(updated);
+        // state must remain at the first update's values
+        assertEquals(40, state.getTimeout());
+        assertEquals(11, state.getAttempt());
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#getTimeout()} and {@link TimeoutState#getAttempt()}
+     * are consistent with the raw packed value returned by {@link TimeoutState#getRawState()}.
+     *
+     * <p>This confirms that the pack/unpack bit manipulation is symmetric and that
+     * callers who take a raw snapshot and decompose it manually get the same result
+     * as callers who use the individual accessors.
+     */
+    @Test
+    void testGetTimeoutAndGetAttemptAreConsistentWithRawState() {
+        long raw = state.getRawState();
+
+        assertEquals(state.getTimeout(), TimeoutState.timeout(raw));
+        assertEquals(state.getAttempt(), TimeoutState.attempt(raw));
+    }
+
+    /**
+     * Verifies that {@link TimeoutState#pack(int, int)} followed by
+     * {@link TimeoutState#timeout(long)} and {@link TimeoutState#attempt(long)}
+     * recovers the original values exactly.
+     *
+     * <p>Tests the bit-level correctness of the packing scheme independently of
+     * the {@link TimeoutState} object lifecycle.
+     */
+    @Test
+    void testPackUnpackRoundtrip() {
+        long packed = TimeoutState.pack(TIMEOUT, ATTEMPT);
+
+        assertEquals(TIMEOUT, TimeoutState.timeout(packed));
+        assertEquals(ATTEMPT, TimeoutState.attempt(packed));
+    }
+}
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index fc0e4728fa89..c2ac20fce5c9 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -257,6 +257,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.network.NetworkAddress; @@ -1465,7 +1466,8 @@ private class Node { transactionInflights, lowWatermark, commonScheduledExecutorService, - metricManager + metricManager, + new NoopTimeoutStrategy() ); replicaManager = spy(new ReplicaManager( diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index 65b9de944a1d..2990e7d60dd2 100644 --- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -196,6 +196,7 @@ import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; @@ -634,7 +635,8 @@ public CompletableFuture invoke(Condition condition, Operation success, transactionInflights, lowWatermark, threadPoolsManager.commonScheduler(), - metricManager + metricManager, + new NoopTimeoutStrategy() ); volatileLogStorageManagerCreator = new VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-" + name)); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 15bf03e44b7a..dc1d3999453c 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -237,6 +237,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; import 
org.apache.ignite.internal.worker.CriticalWorkerWatchdog; @@ -680,7 +681,8 @@ public CompletableFuture invoke(Condition condition, List su lowWatermark, threadPoolsManager.commonScheduler(), failureProcessor, - metricManager + metricManager, + new NoopTimeoutStrategy() ); ResourceVacuumManager resourceVacuumManager = new ResourceVacuumManager( diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 7ba2b4d4cd3c..5d567f181e0e 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -296,6 +296,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage; +import org.apache.ignite.internal.util.retry.ExponentialBackoffTimeoutStrategy; import org.apache.ignite.internal.vault.VaultManager; import org.apache.ignite.internal.vault.persistence.PersistentVaultService; import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource; @@ -1094,7 +1095,8 @@ public class IgniteImpl implements Ignite { lowWatermark, threadPoolsManager.commonScheduler(), failureManager, - metricManager + metricManager, + new ExponentialBackoffTimeoutStrategy() ); sharedTxStateStorage = new TxStateRocksDbSharedStorage( diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java index fdc914c291b4..c971c7830c98 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java +++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java @@ -100,6 +100,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.type.StructNativeType; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; @@ -196,7 +197,8 @@ public void testScanNodeDataPropagation() throws InterruptedException { transactionInflights, new TestLowWatermark(), commonExecutor, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy() ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index d144d5c037b6..d6ce39077218 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.raft.jraft.test.TestUtils; import org.apache.ignite.table.RecordView; import org.apache.ignite.table.Tuple; @@ -167,7 +168,8 @@ protected TxManagerImpl newTxManager( transactionInflights, lowWatermark, commonExecutor, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy() ); } }; diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java index f4af6166011f..6e5ca14465fd 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java @@ -80,6 +80,7 @@ import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.Tuple; import org.apache.ignite.tx.TransactionException; @@ -157,7 +158,8 @@ protected TxManagerImpl newTxManager( transactionInflights, lowWatermark, commonExecutor, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy() ) { @Override public Executor writeIntentSwitchExecutor() { diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java index f436f2a2bb0d..339e140ccb82 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java @@ -120,6 +120,7 @@ import org.apache.ignite.internal.tx.test.TestTransactionIds; import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.CollectionUtils; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.sql.ColumnType; import org.apache.ignite.sql.IgniteSql; import 
org.apache.ignite.table.QualifiedName; @@ -224,7 +225,8 @@ static void beforeAllTests() { transactionInflights, new TestLowWatermark(), commonExecutor, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy() ) { @Override public CompletableFuture finish( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index 816dd602018c..5b2802fbc40d 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -189,6 +189,7 @@ import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.apache.ignite.sql.IgniteSql; @@ -637,7 +638,8 @@ protected TxManagerImpl newTxManager( lowWatermark, executor, new NoOpFailureManager(), - new TestMetricManager() + new TestMetricManager(), + new NoopTimeoutStrategy() ); } @@ -1321,7 +1323,8 @@ private void initializeClientTxComponents() { lowWatermark, executor, new NoOpFailureManager(), - new TestMetricManager() + new TestMetricManager(), + new NoopTimeoutStrategy() ); clientResourceVacuumManager = new ResourceVacuumManager( diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java index dfe814ccef13..684adb3712f6 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java +++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java @@ -147,6 +147,7 @@ import org.apache.ignite.internal.util.Lazy; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.util.SafeTimeValuesTracker; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.table.QualifiedName; import org.apache.ignite.table.QualifiedNameHelper; @@ -709,7 +710,8 @@ public static TxManagerImpl txManager( transactionInflights, new TestLowWatermark(), COMMON_SCHEDULER, - new NoOpMetricManager() + new NoOpMetricManager(), + new NoopTimeoutStrategy() ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully()); diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java new file mode 100644 index 000000000000..6c82902bb592 --- /dev/null +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItDurableFinishFailureTest.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tx.distributed; + +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; +import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.metrics.LongMetric; +import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.tx.impl.TxManagerImpl; +import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; +import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource; +import org.apache.ignite.internal.util.retry.KeyBasedRetryContext; +import org.apache.ignite.internal.util.retry.TimeoutState; +import org.apache.ignite.tx.Transaction; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Integration tests verifying the retry behavior of durable transaction finish + * ({@link TxFinishReplicaRequest}) in a 3-node Apache Ignite cluster. 
+ * + *

<p>Each test drops or intercepts {@link TxFinishReplicaRequest} messages on all nodes
+ * to simulate network failures, then verifies that:
+ * <ul>
+ *     <li>the finish operation is retried as expected;</li>
+ *     <li>retry timeouts grow exponentially between attempts;</li>
+ *     <li>the retry context is cleaned up after success;</li>
+ *     <li>no unnecessary retries occur when the first attempt succeeds.</li>
+ * </ul>
+ *
+ *

<p>Each test creates a single-partition, 3-replica zone and table in {@code @BeforeEach}.
+ * Tests that require additional zones/tables create them locally.
+ */
+public class ItDurableFinishFailureTest extends ClusterPerTestIntegrationTest {
+    /**
+     * Thread name fragment identifying the scheduler thread that sends retry attempts.
+     * Used to distinguish retried messages from the original commit attempt in message interceptors.
+     */
+    private static final String RETRY_THREAD_NAME = "common-scheduler";
+
+    /** Metric name for the total number of committed transactions, used to verify commit completion. */
+    private static final String TOTAL_COMMITED_TRANSACTIONS_METRIC_NAME = "TotalCommits";
+
+    /** Name of the default test table created in {@code @BeforeEach}. */
+    private static final String TABLE_NAME = "test_table";
+
+    /** Number of replicas for all test zones. */
+    private static final int REPLICAS = 3;
+
+    /**
+     * Creates a single-partition zone with {@link #REPLICAS} replicas and the {@link #TABLE_NAME} table before each test.
+     * Tests that require additional tables or zones create them locally.
+     */
+    @BeforeEach
+    public void setup() {
+        String zoneSql = "create zone test_zone (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String tableSql = "create table " + TABLE_NAME + " (key bigint primary key, val varchar(20)) zone TEST_ZONE";
+
+        sql(zoneSql);
+        sql(tableSql);
+    }
+
+    /**
+     * Verifies that no retry occurs when the durable finish succeeds on the first attempt.
+     *
+     *

<p>Installs a message interceptor on every running node that counts {@link TxFinishReplicaRequest}
+     * messages without dropping any of them. After the commit completes, asserts that exactly one
+     * finish message was sent across all nodes.
+     */
+    @Test
+    public void testNoRetryOnSuccessfulFinish() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger finishAttempts = new AtomicInteger();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    finishAttempts.incrementAndGet();
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 1);
+
+        await().timeout(1, SECONDS).until(() -> finishAttempts.get() >= 1);
+
+        assertEquals(1, finishAttempts.get());
+    }
+
+    /**
+     * Verifies that the durable finish is retried after a single failure.
+     *
+     *

<p>Drops the first {@link TxFinishReplicaRequest} seen by any node to simulate a transient
+     * network failure. On the subsequent retry, captures a snapshot of the retry context
+     * across all nodes and asserts that exactly one node has an active retry entry for the
+     * transaction. After the commit completes, asserts that the retry context is fully cleaned up.
+     */
+    @Test
+    public void testDurableFinishRetry() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger failedFinishAttempts = new AtomicInteger();
+
+        List retryContexts = new ArrayList<>();
+
+        AtomicLong expectedSizeOfRetryContext = new AtomicLong(0);
+
+        for (IgniteImpl n : runningNodesIter()) {
+            retryContexts.add(((TxManagerImpl) n.txManager()).retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    if (failedFinishAttempts.get() == 0) {
+                        // Makes durable finish fail with replication timeout, on the first attempt.
+                        return failedFinishAttempts.incrementAndGet() == 1;
+                    }
+
+                    expectedSizeOfRetryContext.compareAndSet(0, retryContexts.stream()
+                            .map(KeyBasedRetryContext::snapshot)
+                            .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1)
+                            .count());
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> failedFinishAttempts.get() == 1);
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 1);
+
+        assertEquals(1, expectedSizeOfRetryContext.get());
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that retry timeouts grow monotonically between consecutive retry attempts.
+     *
+     *

<p>Drops the first 3 {@link TxFinishReplicaRequest} messages. For each retry attempt
+     * arriving on the scheduler thread, captures the current timeout from the retry context
+     * snapshot. After all drops and the final successful commit, asserts that the captured
+     * timeout sequence is strictly increasing, which is what a growing backoff must produce.
+     *
+     * <p>Timeout samples are captured only on the retry scheduler thread
+     * (identified by {@link #RETRY_THREAD_NAME}) to ensure we observe post-advancement values
+     * rather than the stale pre-drop state.
+     */
+    @Test
+    public void testRetryWithGrowingTimeout() {
+        IgniteImpl node = anyNode();
+        Transaction tx = node.transactions().begin();
+        node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')");
+
+        AtomicInteger failedFinishAttempts = new AtomicInteger();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        List<Integer> timeoutSamples = new CopyOnWriteArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add(txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    if (Thread.currentThread().getName().contains(RETRY_THREAD_NAME)) {
+                        retryContexts.stream()
+                                .map(KeyBasedRetryContext::snapshot)
+                                .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1)
+                                .flatMap(retryContextSnapshot -> retryContextSnapshot.values().stream())
+                                .forEach(timeoutState -> timeoutSamples.add(timeoutState.getTimeout()));
+                    }
+
+                    if (failedFinishAttempts.getAndIncrement() < 3) {
+                        return true;
+                    }
+
+                    return false;
+                }
+
+                return false;
+            });
+        }
+
+        tx.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> failedFinishAttempts.get() >= 3); // The 4th, non-dropped attempt bumps it too.
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 1);
+
+        assertTrue(timeoutSamples.size() > 1, "Expected at least 2 timeout samples, got: " + timeoutSamples.size());
+
+        for (int i = 1; i < timeoutSamples.size(); i++) {
+            assertTrue(timeoutSamples.get(i - 1) < timeoutSamples.get(i), "timeout increasing is expected!");
+        }
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that 100 concurrent transactions are all eventually committed after transient failures. 
+ * + *

<p>Creates a zone with 25 partitions to distribute load. For each transaction, drops
+     * the first {@link TxFinishReplicaRequest} (keyed by transaction ID) to force one retry per
+     * transaction. After all tasks are submitted and the thread pool shuts down, waits for all
+     * 100 transactions to complete and for all retry context entries to be cleaned up.
+     *
+     * @throws Exception if the thread pool is interrupted during shutdown.
+     */
+    @Test
+    public void testRetryManyConcurrentDurableFinishes() throws Exception {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 25, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        Map<String, AtomicInteger> failedFinishAttempts = new ConcurrentHashMap<>();
+
+        List<KeyBasedRetryContext> retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add(txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    String txId = ((TxFinishReplicaRequest) msg).txId().toString();
+
+                    if (failedFinishAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() == 0) {
+                        return true;
+                    }
+                }
+
+                return false;
+            });
+        }
+
+        ExecutorService threadPool = newFixedThreadPool(10);
+
+        List<Future<?>> futures = IntStream.range(0, 100).mapToObj(i -> threadPool.submit(() -> {
+            Transaction tx = node.transactions().begin();
+            node.sql().execute(tx, "insert into test_table_1 (key, val) values (?, ?)", i, "val-" + i);
+
+            tx.commitAsync();
+        })).collect(toList());
+
+        try {
+            futures.forEach(ItDurableFinishFailureTest::getQuietly);
+        } finally {
+            threadPool.shutdown();
+            threadPool.awaitTermination(5, SECONDS);
+        }
+
+        await().timeout(15, SECONDS).until(() -> commitedTransactions(node) == 100);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Verifies that transactions targeting different replication groups are tracked
+     * independently in the retry context and converge to the same retry timeout value.
+     *
+     *

<p>Creates two separate single-partition zones with dedicated tables. Drops the first
+     * two {@link TxFinishReplicaRequest} attempts for each transaction (keyed by transaction ID)
+     * to advance both retry contexts by the same number of steps. From the third attempt on, captures
+     * a snapshot of all retry context entries across all nodes.
+     *
+     * <p>After both transactions commit, asserts that:
+     * <ul>
+     *     <li>exactly two distinct entries are present in the captured snapshot — one per transaction,
+     *         confirming that each transaction is tracked under its own key;</li>
+     *     <li>both entries carry the same timeout value, confirming that independently progressed
+     *         retry contexts reach identical timeouts when starting from the same initial value
+     *         and advancing the same number of steps.</li>
+     * </ul>
+     *
+     * <p>Finally, asserts that both retry context entries are cleaned up after successful commit.
+     */
+    @Test
+    public void testRetryDurableFinishForDifferentZones() {
+        IgniteImpl node = anyNode();
+
+        String zone1Sql = "create zone test_zone_1 (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1";
+
+        sql(zone1Sql);
+        sql(table1Sql);
+
+        String zone2Sql = "create zone test_zone_2 (partitions 1, replicas " + REPLICAS
+                + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']";
+        String table2Sql = "create table test_table_2 (key bigint primary key, val varchar(20)) zone TEST_ZONE_2";
+
+        sql(zone2Sql);
+        sql(table2Sql);
+
+        Map failedFinishAttempts = new ConcurrentHashMap<>();
+        Map timeoutSamples = new ConcurrentHashMap<>();
+
+        List retryContexts = new ArrayList<>();
+
+        for (IgniteImpl n : runningNodesIter()) {
+            TxManagerImpl txManager = (TxManagerImpl) n.txManager();
+            retryContexts.add(txManager.retryContext());
+
+            n.dropMessages((dest, msg) -> {
+                if (msg instanceof TxFinishReplicaRequest) {
+                    String txId = ((TxFinishReplicaRequest) msg).txId().toString();
+
+                    if (failedFinishAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() < 2) {
+                        return true;
+                    }
+
+                    timeoutSamples.putAll(
+                            retryContexts.stream()
+                                    .map(KeyBasedRetryContext::snapshot)
+                                    .filter(snapshot -> !snapshot.isEmpty())
+                                    .flatMap(snapshot -> snapshot.entrySet().stream())
+                                    .collect(toMap(Entry::getKey, Entry::getValue, (existing, replacement) -> replacement))
+                    );
+                }
+
+                return false;
+            });
+        }
+
+        Transaction tx1 = node.transactions().begin();
+        node.sql().execute(tx1, "insert into test_table_1 (key, val) values (1, 'val-1')");
+
+        tx1.commitAsync();
+
+        Transaction tx2 = node.transactions().begin();
+        node.sql().execute(tx2, "insert into TEST_TABLE_2 (key, val) values (1, 'val-1')");
+
+        tx2.commitAsync();
+
+        await().timeout(5, SECONDS).until(() -> commitedTransactions(node) == 2);
+
+        assertEquals(2, timeoutSamples.size(),
+                "Expected timeout state for both transactions, but got: " + timeoutSamples.keySet());
+
+        List collectedTimeouts = timeoutSamples.values().stream()
+                .map(TimeoutState::getTimeout)
+                .distinct()
+                .collect(toList());
+
+        assertEquals(1, collectedTimeouts.size(),
+                "Expected both transactions to have the same timeout value, but got: " + collectedTimeouts);
+
+        await().timeout(5, SECONDS).until(() -> retryContexts.stream()
+                .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty));
+    }
+
+    /**
+     * Returns the total number of committed transactions on the given node by reading
+     * the {@value #TOTAL_COMMITED_TRANSACTIONS_METRIC_NAME} metric from
+     * {@link TransactionMetricsSource}.
+     *
+     *

Fails the test immediately if the metric is not found, as this indicates + * a misconfiguration or an unexpected change in metric naming. + * + * @param node the node to read the metric from. + * @return total number of committed transactions. + */ + private static long commitedTransactions(IgniteImpl node) { + Iterable metrics = node.metricManager() + .metricSnapshot() + .metrics() + .get(TransactionMetricsSource.SOURCE_NAME); + + for (Metric m : metrics) { + if (TOTAL_COMMITED_TRANSACTIONS_METRIC_NAME.equals(m.name())) { + return ((LongMetric) m).value(); + } + } + + fail(); + + return -1; + } + + /** + * Waits for the given {@link Future} to complete and returns its result. + * + *

Wraps checked exceptions as {@link AssertionError} so they propagate cleanly + * through {@link java.util.function.Consumer} lambdas in test code without requiring + * explicit try-catch blocks. + * + *

<ul>
+     *   <li>{@link ExecutionException} — wraps the cause as an {@link AssertionError},
+     *       preserving the original exception for diagnosis.</li>
+     *   <li>{@link InterruptedException} — restores the interrupt flag and wraps
+     *       as an {@link AssertionError}.</li>
+     * </ul>
+ * + * @param the future's result type. + * @param future the future to wait for. + * @return the future's result. + * @throws AssertionError if the future completed exceptionally or the thread was interrupted. + */ + private static T getQuietly(Future future) { + try { + return future.get(); + } catch (ExecutionException e) { + throw new AssertionError("Future completed exceptionally", e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for future", e); + } + } +} diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java index 723e57df6d53..dd4125b9602f 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTxCleanupFailureTest.java @@ -17,31 +17,77 @@ package org.apache.ignite.tx.distributed; +import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.tx.metrics.TransactionMetricsSource.METRIC_PENDING_WRITE_INTENTS; import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.metrics.LongMetric; import org.apache.ignite.internal.metrics.Metric; +import org.apache.ignite.internal.tx.impl.TxManagerImpl; import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest; import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource; +import org.apache.ignite.internal.util.retry.KeyBasedRetryContext; +import org.apache.ignite.internal.util.retry.TimeoutState; import org.apache.ignite.tx.Transaction; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; /** - * Tests for transaction cleanup failure. + * Integration tests verifying the retry behavior of transaction write intent cleanup + * ({@link WriteIntentSwitchReplicaRequest}) in a 3-node Apache Ignite cluster. + * + *

Each test drops or intercepts {@link WriteIntentSwitchReplicaRequest} messages on all nodes + * to simulate transient network failures during the cleanup phase that follows a committed + * transaction. Tests verify that: + *

<ul>
+ *   <li>cleanup is retried after a transient failure and eventually succeeds;</li>
+ *   <li>retry timeouts grow monotonically between consecutive attempts;</li>
+ *   <li>the retry context is cleaned up after successful cleanup;</li>
+ *   <li>no unnecessary retries occur when the first cleanup attempt succeeds.</li>
+ * </ul>
+ * + *

Each test creates a single-partition, 3-replica zone and table in {@link #setup()}. + * Tests that require additional zones or tables create them locally. */ public class ItTxCleanupFailureTest extends ClusterPerTestIntegrationTest { - /** Table name. */ + /** + * Thread name fragment identifying the thread that sends retry attempts. + * Used to distinguish retried messages from the original cleanup attempt in message interceptors. + */ + private static final String CLEANUP_THREAD_NAME = "tx-async-write-intent"; + + /** Name of the default test table created in {@link #setup()}. */ private static final String TABLE_NAME = "test_table"; + + /** Number of replicas for all test zones. */ private static final int REPLICAS = 3; + /** + * Creates a single-partition, 3-replica zone and a test table before each test. + * Tests that require additional tables or zones create them locally. + */ @BeforeEach public void setup() { String zoneSql = "create zone test_zone (partitions 1, replicas " + REPLICAS @@ -52,6 +98,50 @@ public void setup() { sql(tableSql); } + /** + * Verifies that no retry occurs when the write intent cleanup succeeds on the first attempt. + * + *

Installs a message interceptor that counts {@link WriteIntentSwitchReplicaRequest} + * messages without dropping any of them. After all write intents are resolved, asserts + * that exactly one cleanup message was sent across all nodes. + */ + @Test + public void testNoRetryOnSuccessfulCleanup() { + IgniteImpl node = anyNode(); + Transaction tx = node.transactions().begin(); + node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')"); + + AtomicInteger cleanupAttempts = new AtomicInteger(); + + for (IgniteImpl n : runningNodesIter()) { + n.dropMessages((dest, msg) -> { + if (msg instanceof WriteIntentSwitchReplicaRequest) { + cleanupAttempts.incrementAndGet(); + } + return false; + }); + } + + tx.commitAsync(); + + await().timeout(5, TimeUnit.SECONDS) + .until(() -> pendingWriteIntents(node) == 0); + + await().timeout(1, SECONDS).until(() -> cleanupAttempts.get() >= 1); + + assertEquals(1, cleanupAttempts.get()); + } + + /** + * Verifies that the write intent cleanup is retried after a single failure. + * + *

Drops the first {@link WriteIntentSwitchReplicaRequest} on all nodes to simulate + * a transient failure. On the subsequent retry — arriving on the write intent switch + * executor thread — captures a snapshot of the retry context across all nodes and records + * how many nodes have an active entry for this transaction. After cleanup completes, + * asserts that exactly one node had an active retry entry during the retry, and that + * the retry context is fully cleaned up. + */ @Test public void testRetry() { IgniteImpl node = anyNode(); @@ -60,11 +150,26 @@ public void testRetry() { AtomicInteger failedCleanupAttempts = new AtomicInteger(); + List retryContexts = new ArrayList<>(); + + AtomicLong expectedSizeOfRetryContext = new AtomicLong(0); + for (IgniteImpl n : runningNodesIter()) { + retryContexts.add(((TxManagerImpl) n.txManager()).retryContext()); + n.dropMessages((dest, msg) -> { - if (msg instanceof WriteIntentSwitchReplicaRequest && failedCleanupAttempts.get() == 0) { - // Makes cleanup fail on write intent switch attempt with replication timeout, on the first attempt. - return failedCleanupAttempts.incrementAndGet() == 1; + if (msg instanceof WriteIntentSwitchReplicaRequest) { + if (failedCleanupAttempts.get() == 0) { + // Makes cleanup fail on write intent switch attempt with replication timeout, on the first attempt. 
+ return failedCleanupAttempts.incrementAndGet() == 1; + } + + if (Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) { + expectedSizeOfRetryContext.compareAndSet(0, retryContexts.stream() + .map(KeyBasedRetryContext::snapshot) + .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1) + .count()); + } } return false; @@ -73,12 +178,266 @@ public void testRetry() { tx.commitAsync(); - await().timeout(5, TimeUnit.SECONDS).until(() -> failedCleanupAttempts.get() == 1); + await().timeout(5, SECONDS).until(() -> failedCleanupAttempts.get() == 1); + + await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 0); + + assertEquals(1, expectedSizeOfRetryContext.get()); + + await().timeout(5, SECONDS).until(() -> retryContexts.stream() + .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty)); + } + + /** + * Verifies that retry timeouts grow monotonically between consecutive retry attempts. + * + *

Drops the first 3 {@link WriteIntentSwitchReplicaRequest} messages arriving on the + * write intent switch executor thread (identified by the executor's thread name prefix). + * For each dropped attempt, captures the current timeout from the retry context snapshot. + * After all drops and the final successful cleanup, asserts that the captured timeout + * sequence is strictly increasing, confirming exponential backoff behavior. + * + *

Sampling is restricted to the write intent switch executor thread to ensure timeouts + * are captured after the retry context has been advanced for the current attempt, rather + * than observing a stale pre-drop state. + */ + @Test + public void testRetryWithGrowingTimeout() { + IgniteImpl node = anyNode(); + Transaction tx = node.transactions().begin(); + node.sql().execute(tx, "insert into " + TABLE_NAME + " (key, val) values (1, 'val-1')"); + + AtomicInteger failedCleanupAttempts = new AtomicInteger(); + + List retryContexts = new ArrayList<>(); + + List timeoutSamples = new CopyOnWriteArrayList<>(); + + for (IgniteImpl n : runningNodesIter()) { + TxManagerImpl txManager = (TxManagerImpl) n.txManager(); + retryContexts.add(txManager.retryContext()); + + n.dropMessages((dest, msg) -> { + if (msg instanceof WriteIntentSwitchReplicaRequest) { + if (Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) { + if (failedCleanupAttempts.getAndIncrement() < 3) { + retryContexts.stream() + .map(KeyBasedRetryContext::snapshot) + .filter(retryContextSnapshot -> retryContextSnapshot.size() == 1) + .flatMap(retryContextSnapshot -> retryContextSnapshot.values().stream()) + .forEach(timeoutState -> timeoutSamples.add(timeoutState.getTimeout())); + + return true; + } + } + + return false; + } + + return false; + }); + } + + tx.commitAsync(); + + await().timeout(5, TimeUnit.SECONDS).until(() -> failedCleanupAttempts.get() == 3); - // Checks that cleanup finally succeeded. 
await().timeout(5, TimeUnit.SECONDS).until(() -> pendingWriteIntents(node) == 0); + + assertTrue(timeoutSamples.size() > 1, "Expected at least 2 timeout samples, got: " + timeoutSamples.size()); + + for (int i = 1; i < timeoutSamples.size(); i++) { + assertTrue(timeoutSamples.get(i - 1) < timeoutSamples.get(i), "timeout increasing is expected!"); + } + + await().timeout(5, SECONDS).until(() -> retryContexts.stream() + .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty)); } + /** + * Verifies that 100 concurrent transactions all have their write intents cleaned up + * after transient failures. + * + *

Creates a zone with 25 partitions to distribute transaction load. For each + * transaction, drops the first {@link WriteIntentSwitchReplicaRequest} on the write + * intent switch executor thread (keyed by transaction ID) to force exactly one retry + * per transaction. After all 100 tasks are submitted and the thread pool shuts down, + * waits for all pending write intents to reach zero and for all per-node retry context + * entries to be cleaned up. + * + * @throws Exception if the thread pool is interrupted during shutdown. + */ + @Test + public void testRetryManyConcurrentCleanUps() throws Exception { + IgniteImpl node = anyNode(); + + String zone1Sql = "create zone test_zone_1 (partitions 25, replicas " + REPLICAS + + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']"; + String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1"; + + sql(zone1Sql); + sql(table1Sql); + + Map failedCleanupAttempts = new ConcurrentHashMap<>(); + + List retryContexts = new ArrayList<>(); + + for (IgniteImpl n : runningNodesIter()) { + TxManagerImpl txManager = (TxManagerImpl) n.txManager(); + retryContexts.add(txManager.retryContext()); + + n.dropMessages((dest, msg) -> { + if (msg instanceof WriteIntentSwitchReplicaRequest) { + if (Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) { + String txId = ((WriteIntentSwitchReplicaRequest) msg).txId().toString(); + + return failedCleanupAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() == 0; + } + + return false; + } + + return false; + }); + } + + ExecutorService threadPool = Executors.newFixedThreadPool(10); + + List> futures = IntStream.range(0, 100).mapToObj(i -> threadPool.submit(() -> { + Transaction tx = node.transactions().begin(); + node.sql().execute(tx, "insert into test_table_1 (key, val) values (?, ?)", i, "val-" + i); + + tx.commitAsync(); + })).collect(toList()); + + try { + futures.forEach(ItTxCleanupFailureTest::getQuietly); + } 
finally { + threadPool.shutdown(); + threadPool.awaitTermination(5, SECONDS); + } + + await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 0); + + await().timeout(5, SECONDS).until(() -> retryContexts.stream() + .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty)); + } + + /** + * Verifies that cleanup retries for transactions targeting different replication groups + * are tracked independently in each node's retry context, and that independently + * progressed contexts converge to the same timeout value. + * + *

Creates two separate single-partition zones with dedicated tables. For each + * transaction, drops the first two {@link WriteIntentSwitchReplicaRequest} messages + * arriving on the cleanup thread (keyed by transaction ID), forcing both retry contexts + * to advance by the same number of steps. On the third attempt for each transaction, + * captures a snapshot of all retry context entries across all nodes, merging per-node + * entries by transaction ID — values are identical across nodes for the same transaction + * since each node applies the same backoff logic independently. + * + *

After both transactions' write intents are resolved, asserts that: + *

<ul>
+     *   <li>exactly two distinct transaction IDs are present in the merged snapshot —
+     *       one per transaction, confirming each transaction is tracked under its own key
+     *       in every node's retry context;</li>
+     *   <li>both entries carry the same timeout value, confirming that independently
+     *       progressed per-node retry contexts reach identical timeouts when starting
+     *       from the same initial value and advancing the same number of steps.</li>
+     * </ul>
+ * + *

Finally, asserts that all per-node retry context entries are cleaned up after + * successful cleanup. + */ + @Test + public void testRetryCleanUpsForDifferentZones() { + IgniteImpl node = anyNode(); + + String zone1Sql = "create zone test_zone_1 (partitions 1, replicas " + REPLICAS + + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']"; + String table1Sql = "create table test_table_1 (key bigint primary key, val varchar(20)) zone TEST_ZONE_1"; + + sql(zone1Sql); + sql(table1Sql); + + String zone2Sql = "create zone test_zone_2 (partitions 1, replicas " + REPLICAS + + ") storage profiles ['" + DEFAULT_STORAGE_PROFILE + "']"; + String table2Sql = "create table test_table_2 (key bigint primary key, val varchar(20)) zone TEST_ZONE_2"; + + sql(zone2Sql); + sql(table2Sql); + + Map failedCleanupAttempts = new ConcurrentHashMap<>(); + Map timeoutSamples = new ConcurrentHashMap<>(); + + List retryContexts = new ArrayList<>(); + + for (IgniteImpl n : runningNodesIter()) { + TxManagerImpl txManager = (TxManagerImpl) n.txManager(); + retryContexts.add(txManager.retryContext()); + + n.dropMessages((dest, msg) -> { + if (msg instanceof WriteIntentSwitchReplicaRequest) { + if (Thread.currentThread().getName().contains(CLEANUP_THREAD_NAME)) { + String txId = ((WriteIntentSwitchReplicaRequest) msg).txId().toString(); + + if (failedCleanupAttempts.computeIfAbsent(txId, key -> new AtomicInteger(0)).getAndIncrement() < 2) { + return true; + } + + timeoutSamples.putAll( + retryContexts.stream() + .map(KeyBasedRetryContext::snapshot) + .filter(snapshot -> !snapshot.isEmpty()) + .flatMap(snapshot -> snapshot.entrySet().stream()) + .collect(toMap(Entry::getKey, Entry::getValue, (existing, replacement) -> replacement)) + ); + } + + return false; + } + + return false; + }); + } + + Transaction tx1 = node.transactions().begin(); + node.sql().execute(tx1, "insert into test_table_1 (key, val) values (1, 'val-1')"); + + tx1.commitAsync(); + + Transaction tx2 = node.transactions().begin(); 
+ node.sql().execute(tx2, "insert into TEST_TABLE_2 (key, val) values (1, 'val-1')"); + + tx2.commitAsync(); + + await().timeout(5, SECONDS).until(() -> pendingWriteIntents(node) == 0); + + assertEquals(2, timeoutSamples.size(), + "Expected timeout state for both transactions, but got: " + timeoutSamples.keySet()); + + List collectedTimeouts = timeoutSamples.values().stream() + .map(TimeoutState::getTimeout) + .distinct() + .collect(toList()); + + assertEquals(1, collectedTimeouts.size(), + "Expected both transactions to have the same timeout value, but got: " + collectedTimeouts); + + await().timeout(5, SECONDS).until(() -> retryContexts.stream() + .map(KeyBasedRetryContext::snapshot).allMatch(Map::isEmpty)); + } + + /** + * Returns the number of pending write intents on the given node by reading the + * {@link TransactionMetricsSource#METRIC_PENDING_WRITE_INTENTS} metric. + * + *

Fails the test immediately if the metric is not found, as this indicates + * a misconfiguration or an unexpected change in metric naming. + * + * @param node the node to read the metric from. + * @return number of pending write intents. + */ private static long pendingWriteIntents(IgniteImpl node) { Iterable metrics = node.metricManager() .metricSnapshot() @@ -95,4 +454,34 @@ private static long pendingWriteIntents(IgniteImpl node) { return -1; } + + /** + * Waits for the given {@link Future} to complete and returns its result. + * + *

Wraps checked exceptions as {@link AssertionError} so they propagate cleanly + * through {@link java.util.function.Consumer} lambdas in test code without requiring + * explicit try-catch blocks. + * + *

<ul>
+     *   <li>{@link ExecutionException} — wraps the cause as an {@link AssertionError},
+     *       preserving the original exception for diagnosis.</li>
+     *   <li>{@link InterruptedException} — restores the interrupt flag and wraps
+     *       as an {@link AssertionError}.</li>
+     * </ul>
+ * + * @param the future's result type. + * @param future the future to wait for. + * @return the future's result. + * @throws AssertionError if the future completed exceptionally or the thread was interrupted. + */ + private static T getQuietly(Future future) { + try { + return future.get(); + } catch (ExecutionException e) { + throw new AssertionError("Future completed exceptionally", e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Interrupted while waiting for future", e); + } + } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java index 3912d52699d3..4889aa9e044c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.tx.impl; -import static java.lang.Math.min; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; @@ -25,7 +24,7 @@ import static org.apache.ignite.internal.logger.Loggers.toThrottledLogger; import static org.apache.ignite.internal.tx.TxStateMeta.builder; import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; -import static org.apache.ignite.internal.util.IgniteUtils.scheduleRetry; +import static org.apache.ignite.internal.util.retry.RetryUtil.scheduleRetry; import java.util.ArrayList; import java.util.Collection; @@ -34,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -57,6 +57,7 @@ import 
org.apache.ignite.internal.tx.message.TxCleanupMessageResponse; import org.apache.ignite.internal.tx.message.TxMessageGroup; import org.apache.ignite.internal.util.CompletableFutures; +import org.apache.ignite.internal.util.retry.KeyBasedRetryContext; import org.jetbrains.annotations.Nullable; /** @@ -65,9 +66,6 @@ public class TxCleanupRequestSender { private static final int ATTEMPTS_LOG_THRESHOLD = 100; - private static final int RETRY_INITIAL_TIMEOUT_MS = 20; - private static final int RETRY_MAX_TIMEOUT_MS = 30_000; - private final IgniteThrottledLogger throttledLog; /** Placement driver helper. */ @@ -87,6 +85,8 @@ public class TxCleanupRequestSender { /** Executor that is used to schedule retries of cleanup messages in case of retryable errors. */ private final ScheduledExecutorService retryExecutor; + private final KeyBasedRetryContext retryContext; + /** * The constructor. * @@ -95,13 +95,15 @@ public class TxCleanupRequestSender { * @param txStateVolatileStorage Volatile transaction state storage. * @param cleanupExecutor Cleanup executor. * @param commonScheduler Common scheduler. + * @param retryContext retry context. */ public TxCleanupRequestSender( TxMessageSender txMessageSender, PlacementDriverHelper placementDriverHelper, VolatileTxStateMetaStorage txStateVolatileStorage, ExecutorService cleanupExecutor, - ScheduledExecutorService commonScheduler + ScheduledExecutorService commonScheduler, + KeyBasedRetryContext retryContext ) { this.txMessageSender = txMessageSender; this.placementDriverHelper = placementDriverHelper; @@ -109,6 +111,7 @@ public TxCleanupRequestSender( this.cleanupExecutor = cleanupExecutor; this.retryExecutor = commonScheduler; this.throttledLog = toThrottledLogger(Loggers.forClass(TxCleanupRequestSender.class), commonScheduler); + this.retryContext = retryContext; } /** @@ -189,7 +192,7 @@ private void markTxnCleanupReplicated(UUID txId, TxState state, ZonePartitionId * @return Completable future of Void. 
*/ public CompletableFuture cleanup(ZonePartitionId commitPartitionId, String node, UUID txId) { - return sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null, RETRY_INITIAL_TIMEOUT_MS, 0); + return sendCleanupMessageWithRetries(commitPartitionId, false, null, txId, node, null); } /** @@ -226,7 +229,7 @@ public CompletableFuture cleanup( enlistedPartitionGroups.add(new EnlistedPartitionGroup(partitionId, partition.tableIds())); }); - return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0); + return cleanupPartitions(commitPartitionId, partitionsByPrimaryName, commit, commitTimestamp, txId); } /** @@ -245,18 +248,6 @@ public CompletableFuture cleanup( boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId - ) { - return cleanup(commitPartitionId, partitions, commit, commitTimestamp, txId, RETRY_INITIAL_TIMEOUT_MS, 0); - } - - private CompletableFuture cleanup( - @Nullable ZonePartitionId commitPartitionId, - Collection partitions, - boolean commit, - @Nullable HybridTimestamp commitTimestamp, - UUID txId, - long timeout, - int attemptsMade ) { Map partitionIds = partitions.stream() .collect(toMap(EnlistedPartitionGroup::groupId, identity())); @@ -277,9 +268,7 @@ private CompletableFuture cleanup( commit, commitTimestamp, txId, - toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds), - timeout, - attemptsMade + toPartitionInfos(partitionData.partitionsWithoutPrimary, partitionIds) ); Map> partitionsByPrimaryName = toPartitionInfosByPrimaryName( @@ -291,9 +280,7 @@ private CompletableFuture cleanup( partitionsByPrimaryName, commit, commitTimestamp, - txId, - timeout, - attemptsMade + txId ); }); } @@ -320,9 +307,7 @@ private void cleanupPartitionsWithoutPrimary( boolean commit, @Nullable HybridTimestamp commitTimestamp, UUID txId, - List partitionsWithoutPrimary, - long timeout, - int attemptsMade + List partitionsWithoutPrimary ) { Map 
partitionIds = partitionsWithoutPrimary.stream() .collect(toMap(EnlistedPartitionGroup::groupId, identity())); @@ -340,9 +325,7 @@ private void cleanupPartitionsWithoutPrimary( partitionsByPrimaryName, commit, commitTimestamp, - txId, - timeout, - attemptsMade + txId ); }); } @@ -352,9 +335,7 @@ private CompletableFuture cleanupPartitions( Map> partitionsByNode, boolean commit, @Nullable HybridTimestamp commitTimestamp, - UUID txId, - long timeout, - int attemptsMade + UUID txId ) { List> cleanupFutures = new ArrayList<>(); @@ -363,7 +344,7 @@ private CompletableFuture cleanupPartitions( List nodePartitions = entry.getValue(); cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId, node, - commitPartitionId == null ? null : nodePartitions, timeout, attemptsMade)); + commitPartitionId == null ? null : nodePartitions)); } return allOf(cleanupFutures.toArray(new CompletableFuture[0])); @@ -375,9 +356,7 @@ private CompletableFuture sendCleanupMessageWithRetries( @Nullable HybridTimestamp commitTimestamp, UUID txId, String node, - @Nullable Collection partitions, - long timeout, - int attemptsMade + @Nullable Collection partitions ) { return txMessageSender.cleanup(node, partitions, txId, commit, commitTimestamp) .thenApply(response -> { @@ -391,15 +370,20 @@ private CompletableFuture sendCleanupMessageWithRetries( }) .handleAsync((networkMessage, throwable) -> { if (throwable != null) { + String timeoutKey = commitPartitionId == null ? 
txId.toString() : commitPartitionId.toString(); + if (ReplicatorRecoverableExceptions.isRecoverable(throwable)) { - if (attemptsMade > ATTEMPTS_LOG_THRESHOLD) { - throttledLog.warn( - "Unsuccessful transaction cleanup after {} attempts, keep retrying [txId={}]", - throwable, - ATTEMPTS_LOG_THRESHOLD, - txId - ); - } + retryContext.getState(timeoutKey).ifPresent(timeoutState -> { + if (timeoutState.getAttempt() > ATTEMPTS_LOG_THRESHOLD || timeoutState.getAttempt() < 0) { + throttledLog.warn( + "Unsuccessful transaction cleanup after {} attempts for key {}, keep retrying [txId={}]", + throwable, + ATTEMPTS_LOG_THRESHOLD, + timeoutKey, + txId + ); + } + }); // In the case of a failure we repeat the process, but start with finding correct primary replicas // for this subset of partitions. If nothing changed in terms of the nodes and primaries @@ -418,13 +402,12 @@ private CompletableFuture sendCleanupMessageWithRetries( commitTimestamp, txId, node, - partitions, - incrementTimeout(timeout), - attemptsMade + 1 + partitions ), - timeout, + retryContext.updateAndGetState(timeoutKey).getTimeout(), TimeUnit.MILLISECONDS, - retryExecutor + retryExecutor, + Optional.of(() -> retryContext.resetState(timeoutKey)) ); } @@ -436,13 +419,12 @@ private CompletableFuture sendCleanupMessageWithRetries( partitions, commit, commitTimestamp, - txId, - incrementTimeout(timeout), - attemptsMade + 1 + txId ), - timeout, + retryContext.updateAndGetState(timeoutKey).getTimeout(), TimeUnit.MILLISECONDS, - retryExecutor + retryExecutor, + Optional.of(() -> retryContext.resetState(timeoutKey)) ); } @@ -454,10 +436,6 @@ private CompletableFuture sendCleanupMessageWithRetries( .thenCompose(v -> v); } - private static long incrementTimeout(long currentTimeout) { - return min(currentTimeout * 2, RETRY_MAX_TIMEOUT_MS); - } - private static class CleanupContext { private final ZonePartitionId commitPartitionId; diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java index 5bbee2749f1b..c431858f04e3 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java @@ -43,12 +43,14 @@ import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause; import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; +import static org.apache.ignite.internal.util.retry.RetryUtil.scheduleRetry; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -120,6 +122,8 @@ import org.apache.ignite.internal.tx.views.LocksViewProvider; import org.apache.ignite.internal.tx.views.TransactionsViewProvider; import org.apache.ignite.internal.util.CompletableFutures; +import org.apache.ignite.internal.util.retry.KeyBasedRetryContext; +import org.apache.ignite.internal.util.retry.TimeoutStrategy; import org.apache.ignite.lang.ErrorGroups.Common; import org.apache.ignite.tx.TransactionException; import org.jetbrains.annotations.Nullable; @@ -241,6 +245,8 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi private final ConcurrentLinkedQueue> stopFuts = new ConcurrentLinkedQueue<>(); + private final KeyBasedRetryContext retryContext; + /** * Test-only constructor. * @@ -259,6 +265,7 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler, SystemVi * @param transactionInflights Transaction inflights. * @param lowWatermark Low watermark. 
* @param metricManager Metric manager. + * @param timeoutStrategy Timeout strategy. */ @TestOnly public TxManagerImpl( @@ -277,7 +284,8 @@ public TxManagerImpl( TransactionInflights transactionInflights, LowWatermark lowWatermark, ScheduledExecutorService commonScheduler, - MetricManager metricManager + MetricManager metricManager, + TimeoutStrategy timeoutStrategy ) { this( clusterService.nodeName(), @@ -299,7 +307,8 @@ public TxManagerImpl( lowWatermark, commonScheduler, new FailureManager(new NoOpFailureHandler()), - metricManager + metricManager, + timeoutStrategy ); } @@ -323,6 +332,7 @@ public TxManagerImpl( * @param transactionInflights Transaction inflights. * @param lowWatermark Low watermark. * @param metricManager Metric manager. + * @param timeoutStrategy Timeout strategy. */ public TxManagerImpl( String nodeName, @@ -344,7 +354,8 @@ public TxManagerImpl( LowWatermark lowWatermark, ScheduledExecutorService commonScheduler, FailureProcessor failureProcessor, - MetricManager metricManager + MetricManager metricManager, + TimeoutStrategy timeoutStrategy ) { this.txConfig = txConfig; this.systemCfg = systemCfg; @@ -401,12 +412,15 @@ public TxManagerImpl( transactionExpirationRegistry = new TransactionExpirationRegistry(txStateVolatileStorage); + retryContext = new KeyBasedRetryContext(20, timeoutStrategy); + txCleanupRequestSender = new TxCleanupRequestSender( txMessageSender, placementDriverHelper, txStateVolatileStorage, writeIntentSwitchPool, - commonScheduler + commonScheduler, + retryContext ); txMetrics = new TransactionMetricsSource(clockService); @@ -824,7 +838,7 @@ private CompletableFuture trackFuture(CompletableFuture fut) { */ private CompletableFuture durableFinish( HybridTimestampTracker observableTimestampTracker, - ZonePartitionId commitPartition, + @Nullable ZonePartitionId commitPartition, boolean commit, Map enlistedPartitions, UUID txId, @@ -868,14 +882,23 @@ private CompletableFuture durableFinish( if 
(ReplicatorRecoverableExceptions.isRecoverable(cause)) { LOG.debug("Failed to finish Tx. The operation will be retried {}.", ex, formatTxInfo(txId, txStateVolatileStorage)); - return supplyAsync(() -> durableFinish( - observableTimestampTracker, - commitPartition, - commit, - enlistedPartitions, - txId, - commitTimestamp, - txFinishFuture + + String timeoutKey = commitPartition == null ? txId.toString() : commitPartition.toString(); + + return supplyAsync(() -> scheduleRetry( + () -> durableFinish( + observableTimestampTracker, + commitPartition, + commit, + enlistedPartitions, + txId, + commitTimestamp, + txFinishFuture + ), + retryContext.updateAndGetState(timeoutKey).getTimeout(), + MILLISECONDS, + commonScheduler, + Optional.of(() -> retryContext.resetState(timeoutKey)) ), partitionOperationsExecutor).thenCompose(identity()); } else { LOG.warn("Failed to finish Tx {}.", ex, @@ -1340,4 +1363,9 @@ private static long longProperty(SystemDistributedConfiguration systemProperties public void clearLocalRwTxCounter() { localRwTxCounter.clear(); } + + @TestOnly + public KeyBasedRetryContext retryContext() { + return retryContext; + } } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java index 1732625a81ea..5cbc0255b6cf 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxCleanupTest.java @@ -66,6 +66,8 @@ import org.apache.ignite.internal.tx.impl.TxCleanupRequestSender; import org.apache.ignite.internal.tx.impl.TxMessageSender; import org.apache.ignite.internal.tx.impl.VolatileTxStateMetaStorage; +import org.apache.ignite.internal.util.retry.KeyBasedRetryContext; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test; @@ -148,7 +150,8 @@ public void setup() { placementDriverHelper, mock(VolatileTxStateMetaStorage.class), testSyncExecutorService(), - testSyncScheduledExecutorService() + testSyncScheduledExecutorService(), + new KeyBasedRetryContext(20, new NoopTimeoutStrategy()) ); } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java index 0c10b0e5e8c3..d54efcb489b6 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java @@ -107,6 +107,7 @@ import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest; import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter; import org.apache.ignite.internal.tx.test.TestTransactionIds; +import org.apache.ignite.internal.util.retry.NoopTimeoutStrategy; import org.apache.ignite.lang.ErrorGroups.Transactions; import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.tx.MismatchingTransactionOutcomeException; @@ -203,7 +204,8 @@ public void setup() { transactionInflights, lowWatermark, commonScheduler, - new TestMetricManager() + new TestMetricManager(), + new NoopTimeoutStrategy() ); assertThat(txManager.startAsync(new ComponentContext()), willCompleteSuccessfully());