Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
5db24b3
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Feb 26, 2026
e9fd277
Merge branch 'main' into ignite-27685
JKonSir Feb 27, 2026
283c950
Merge branch 'my-main' into ignite-27685
JKonSir Mar 3, 2026
61e687d
Merge branch 'main' into ignite-27685
JKonSir Mar 3, 2026
2d6cca5
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 4, 2026
d0dc5e6
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 5, 2026
c1a4133
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 5, 2026
f0fe45a
Merge branch 'main' into ignite-27685
JKonSir Mar 6, 2026
f537ec2
Merge remote-tracking branch 'my-ai3/ignite-27685' into ignite-27685
JKonSir Mar 6, 2026
99128d6
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 9, 2026
12be8ac
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 10, 2026
344ff9a
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 10, 2026
eced1e7
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 11, 2026
6e8bce6
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 11, 2026
db0bf9f
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 11, 2026
6cf9ed9
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 11, 2026
3884105
Merge branch 'main' into ignite-27685
JKonSir Mar 12, 2026
47248a3
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 12, 2026
c8f6bee
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 12, 2026
7ad88ef
Merge branch 'main' into ignite-27685
JKonSir Mar 12, 2026
7768b2b
IGNITE-27685 Implement exponential backoff for durable finish
JKonSir Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -60,7 +59,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1341,35 +1339,6 @@ public static Object[] flatArray(Object... arguments) {
return list.toArray();
}

/**
* Schedules the provided operation to be retried after the specified delay.
*
* @param operation Operation.
* @param delay Delay.
* @param unit Time unit of the delay.
* @param executor Executor to schedule the retry in.
* @return Future that is completed when the operation is successful or failed with an exception.
*/
public static <T> CompletableFuture<T> scheduleRetry(
Callable<CompletableFuture<T>> operation,
long delay,
TimeUnit unit,
ScheduledExecutorService executor
) {
CompletableFuture<T> future = new CompletableFuture<>();

executor.schedule(() -> operation.call()
.whenComplete((res, e) -> {
if (e == null) {
future.complete(res);
} else {
future.completeExceptionally(e);
}
}), delay, unit);

return future;
}

private static CompletableFuture<Void> startAsync(ComponentContext componentContext, Stream<? extends IgniteComponent> components) {
return allOf(components
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.util.retry;

import java.util.concurrent.ThreadLocalRandom;

/**
* A {@link TimeoutStrategy} that increases retry timeouts exponentially on each attempt.
*
* <p>Each call to {@link #next(int)} multiplies the current timeout by {@code backoffCoefficient},
* capping the result at {@link #maxTimeout()}. Optionally, random jitter can be applied to spread
* retry attempts across time and avoid thundering herd problems under high concurrency.
*
* <p>When jitter is enabled, the returned timeout is randomized within the range
* {@code [raw / 2, raw * 1.5]}, then capped at {@link #maxTimeout()}.
*
* <p>This class is stateless and thread-safe. A single instance can be shared across
* multiple retry contexts.
*/
public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
/** Default backoff coefficient applied on each retry step. Doubles the timeout per attempt. */
private static final double DEFAULT_BACKOFF_COEFFICIENT = 2.0;

/**
* Multiplier applied to the current timeout on each call to {@link #next(int)}.
* Must be greater than {@code 1.0} to produce a growing sequence.
*/
private final double backoffCoefficient;

/**
* Whether to apply random jitter to the computed timeout.
* When {@code true}, the result is randomized within {@code [raw / 2, raw * 1.5]}.
*/
private final boolean jitter;

/**
* Upper bound for the timeout produced by this strategy, in milliseconds.
* The result of {@link #next(int)} is always capped at this value.
*/
private final int maxTimeout;

/**
* Creates a strategy with default max timeout and backoff coefficient, and no jitter.
*
* @see TimeoutStrategy#DEFAULT_TIMEOUT_MS_MAX
*/
public ExponentialBackoffTimeoutStrategy() {
this(DEFAULT_TIMEOUT_MS_MAX, DEFAULT_BACKOFF_COEFFICIENT);
}

/**
* Creates a strategy with the given max timeout and backoff coefficient, and no jitter.
*
* @param maxTimeout maximum timeout this strategy may produce, in milliseconds.
* @param backoffCoefficient multiplier applied to the current timeout on each step.
* Must be greater than {@code 1.0}.
*/
public ExponentialBackoffTimeoutStrategy(
int maxTimeout,
double backoffCoefficient
) {
this(maxTimeout, backoffCoefficient, false);
}

/**
* Creates a strategy with the given max timeout, backoff coefficient, and jitter setting.
*
* @param maxTimeout maximum timeout this strategy may produce, in milliseconds.
* @param backoffCoefficient multiplier applied to the current timeout on each step.
* Must be greater than {@code 1.0}.
* @param jitter if {@code true}, random jitter is applied to each computed timeout.
*/
public ExponentialBackoffTimeoutStrategy(
int maxTimeout,
double backoffCoefficient,
boolean jitter
) {
this.maxTimeout = maxTimeout;
this.backoffCoefficient = backoffCoefficient;
this.jitter = jitter;
}

/**
* Computes the next retry timeout by multiplying {@code currentTimeout} by
* {@link #backoffCoefficient}, then capping at {@link #maxTimeout}.
* If jitter is enabled, the result is further randomized via {@link #applyJitter(int)}.
*
* @param currentTimeout current retry timeout in milliseconds.
* @return next retry timeout in milliseconds, capped at {@link #maxTimeout}.
*/
@Override
public int next(int currentTimeout) {
int raw = (int) Math.min((currentTimeout * backoffCoefficient), maxTimeout);

return jitter ? applyJitter(raw) : raw;
}

/** {@inheritDoc} */
@Override
public int maxTimeout() {
return maxTimeout;
}

/**
* Applies random jitter to the given timeout value.
*
* <p>The result is uniformly distributed within {@code [raw / 2, raw * 1.5]},
* then capped at {@link #maxTimeout} to ensure the strategy ceiling is never exceeded.
*
* @param raw computed timeout before jitter, in milliseconds.
* @return jittered timeout in milliseconds, capped at {@link #maxTimeout}.
*/
private int applyJitter(int raw) {
int lo = raw / 2;
int hi = raw + lo;

return Math.min(lo + ThreadLocalRandom.current().nextInt(hi - lo + 1), maxTimeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.util.retry;

import static java.util.Collections.unmodifiableMap;
import static java.util.Optional.of;
import static java.util.Optional.ofNullable;
import static org.apache.ignite.internal.util.retry.TimeoutState.attempt;
import static org.apache.ignite.internal.util.retry.TimeoutState.timeout;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.jetbrains.annotations.TestOnly;

/**
* A retry context that tracks timeout state independently per key.
*
* <p>Each key maps to its own {@link TimeoutState}, allowing separate backoff progression
* for different retry targets — for example, different replication group IDs or transaction IDs.
* State updates are performed atomically per key using {@link ConcurrentHashMap#compute}.
*
* <p>To prevent unbounded memory growth, the registry is capped at {@link #REGISTRY_SIZE_LIMIT}
* entries. Once the limit is reached, untracked keys receive a fixed {@link #fallbackTimeoutState}
* that always returns {@link TimeoutStrategy#maxTimeout()}. The limit is a soft cap and may be
* slightly exceeded under concurrent insertions.
*
* <p>This class is thread-safe.
*/
public class KeyBasedRetryContext {
/**
* Maximum number of keys tracked in {@link #registry}.
* Can be slightly exceeded under concurrent insertions. See class-level Javadoc.
*/
private static final int REGISTRY_SIZE_LIMIT = 1_000;

/**
* Timeout used when creating a new {@link TimeoutState} for a key that has no prior state.
* Also used as the reset value when a key's state is removed.
*/
private final int initialTimeout;

/** Strategy used to compute the next timeout from the current one on each advancement. */
private final TimeoutStrategy timeoutStrategy;

/**
* Sentinel state returned for keys that cannot be tracked because the registry is full.
* Initialized with {@link TimeoutStrategy#maxTimeout()} and attempt {@code -1} to distinguish
* it from legitimately tracked states.
*/
private final TimeoutState fallbackTimeoutState;

/** Per-key timeout state registry. Keys are typically transaction IDs or replication group IDs. */
private final ConcurrentHashMap<String, TimeoutState> registry = new ConcurrentHashMap<>();

/**
* Creates a new context with the given initial timeout and strategy.
*
* @param initialTimeout timeout used for the first retry attempt of any new key, in milliseconds.
* @param timeoutStrategy strategy used to compute subsequent timeout values.
*/
public KeyBasedRetryContext(int initialTimeout, TimeoutStrategy timeoutStrategy) {
this.initialTimeout = initialTimeout;
this.timeoutStrategy = timeoutStrategy;

this.fallbackTimeoutState = new TimeoutState(timeoutStrategy.maxTimeout(), -1);
}

/**
* Returns the current {@link TimeoutState} for the given key, if tracked.
*
* <p>Returns an empty {@link Optional} if the key has no recorded state yet.
* If the registry is full and the key is not already tracked, returns
* {@link Optional} containing the {@link #fallbackTimeoutState}.
*
* <p>This method does not insert the key into the registry.
*
* @param key the key to look up, typically a transaction ID or replication group ID.
* @return current state for the key, fallback state if registry is full, or empty if not tracked.
*/
public Optional<TimeoutState> getState(String key) {
if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) {
return of(fallbackTimeoutState);
}

return ofNullable(registry.get(key));
}

/**
* Atomically advances the retry state for the given key and returns the updated state.
*
* <p>If the key has no prior state, a new {@link TimeoutState} is created with
* {@link #initialTimeout} and attempt count {@code 1}. Otherwise, the timeout is
* advanced using {@link TimeoutStrategy#next(int)} and the attempt count is incremented.
*
* <p>The update is performed inside {@link ConcurrentHashMap#compute}, which holds
* an exclusive per-key lock for the duration of the lambda. The CAS on the
* {@link TimeoutState}'s internal {@link AtomicLong} is therefore always expected
* to succeed on the first attempt within the lambda.
*
* <p>If the registry is full and the key is not already tracked, returns
* {@link #fallbackTimeoutState} without modifying the registry.
*
* @param key the key to advance state for, typically a transaction ID or replication group ID.
* @return updated {@link TimeoutState} for the key, or {@link #fallbackTimeoutState}
* if the registry is full.
*/
public TimeoutState updateAndGetState(String key) {
if (!registry.containsKey(key) && registry.size() >= REGISTRY_SIZE_LIMIT) {
return fallbackTimeoutState;
}

return registry.compute(key, (k, state) -> {
if (state == null) {
return new TimeoutState(initialTimeout, 1);
}

long currentState = state.getRawState();
state.update(currentState, timeoutStrategy.next(timeout(currentState)), attempt(currentState) + 1);

return state;
});
}

/**
* Removes the retry state for the given key, resetting it as if no retries had occurred.
*
* <p>The next call to {@link #updateAndGetState(String)} for this key after a reset
* will create fresh state starting from {@link #initialTimeout}.
*
* @param key the key whose state should be removed.
*/
public void resetState(String key) {
registry.remove(key);
}

/**
* Returns an unmodifiable snapshot of the current registry contents.
*
* <p>The snapshot is a point-in-time copy of the registry map. The returned
* {@link TimeoutState} values are live references — their internal state may
* continue to change concurrently after the snapshot is taken.
*
* <p>This method is intended for testing only and should not be used in production code.
*
* @return unmodifiable copy of the current key-to-state mappings.
*/
@TestOnly
public Map<String, TimeoutState> snapshot() {
return unmodifiableMap(new HashMap<>(registry));
}
}
Loading