diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java index 498e61a70a18..520ca3392d3e 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java @@ -412,6 +412,12 @@ public void onShutdown() { } } + @Override + public long getPersistedAppliedIndex() { + // Clamp to 0 because lastAppliedIndex() returns -1 during rebalance. + return max(0, txStateStorage.lastAppliedIndex()); + } + /** * Adds a given Table Partition-level Raft processor to the set of managed processors. * diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java index bf2697e9b2d0..f2d71fb7af42 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java @@ -96,4 +96,14 @@ default void onConfigurationCommitted( * Invoked once after a raft node has been shut down. */ void onShutdown(); + + /** + * Returns the last applied index persisted by the state machine. + * Called during {@code NodeImpl.init()} to prevent truncation of already-applied log entries. + * + * @return persisted applied index, or 0 if unknown. + */ + default long getPersistedAppliedIndex() { + return 0; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index 15dcbf16017d..05f134b872aa 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -870,6 +870,11 @@ public RaftGroupListener getListener() { return listener; } + @Override + public long getPersistedAppliedIndex() { + return listener.getPersistedAppliedIndex(); + } + @Override public void onApply(Iterator iter) { var iterWrapper = new WriteCommandIterator(iter, marshaller); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java index 3519c99520dd..b4dff1d1cc00 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/StateMachine.java @@ -133,4 +133,14 @@ default void onRawConfigurationCommitted(ConfigurationEntry conf, long lastAppli * @param ctx context of leader change */ void onStartFollowing(final LeaderChangeContext ctx); + + /** + * Returns the last applied index persisted by the state machine. + * Called during {@code NodeImpl.init()} to prevent truncation of already-applied log entries. + * + * @return persisted applied index, or 0 if unknown. + */ + default long getPersistedAppliedIndex() { + return 0; + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index d373a55f6c5d..fec7137c5643 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -1066,6 +1066,19 @@ public boolean init(final NodeOptions opts) { return false; } + // Restore appliedId so that unsafeTruncateSuffix() can reject truncation of applied entries. + long persistedApplied = this.options.getFsm().getPersistedAppliedIndex(); + if (persistedApplied > 0) { + long term = this.logManager.getTerm(persistedApplied); + if (term > 0) { + // Term is 0 when the index is outside the log (covered by a snapshot) — skip in that case. + this.logManager.setAppliedId(new LogId(persistedApplied, term)); + } else { + LOG.warn("Node {} persisted applied index {} is not in the raft log, expecting snapshot to cover it.", + getNodeId(), persistedApplied); + } + } + final Status st = this.logManager.checkConsistency(); if (!st.isOk()) { LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java index 78fa5a61d004..12acc4dc846c 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java @@ -1049,11 +1049,25 @@ private boolean reset(final long nextLogIndex) { } } - private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) { + /** + * Truncates log entries after {@code lastIndexKept}. + * + * @return {@code true} on success, {@code false} if truncation would discard applied entries (node moves to error state). + */ + private boolean unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) { if (lastIndexKept < this.appliedId.getIndex()) { - LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId, - lastIndexKept); - return; + LOG.error("Raft log suffix conflict: cannot truncate entries that have been applied to the state machine. " + + "nodeId={}, appliedId={}, lastIndexKept={}. The partition will be moved to error state.", + this.nodeId, this.appliedId, lastIndexKept); + lock.unlock(); + try { + reportError(RaftError.EINVAL.getNumber(), + "Raft log suffix conflict: attempted to truncate applied entries, appliedId=%s, lastIndexKept=%d", + this.appliedId, lastIndexKept); + } finally { + lock.lock(); + } + return false; } this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept); @@ -1068,6 +1082,7 @@ private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) { final TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept); offerEvent(c, EventType.TRUNCATE_SUFFIX); lock.lock(); + return true; } @SuppressWarnings("NonAtomicOperationOnVolatileField") @@ -1121,7 +1136,11 @@ private boolean checkAndResolveConflict(final List entries, final Stab if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) { // Truncate all the conflicting entries to make local logs // consensus with the leader. - unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock); + if (!unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock)) { + Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done, + new Status(RaftError.EINVAL, "Raft log suffix conflict with applied entries")); + return false; + } } this.lastLogIndex = lastLogEntry.getId().getIndex(); } // else this is a duplicated AppendEntriesRequest, we have diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java index b9ec11bad7fd..85f6fb1efc94 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java @@ -23,9 +23,14 @@ import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import org.apache.ignite.internal.thread.IgniteThreadFactory; @@ -40,6 +45,7 @@ import org.apache.ignite.raft.jraft.entity.EnumOutter; import org.apache.ignite.raft.jraft.entity.LogEntry; import org.apache.ignite.raft.jraft.entity.LogId; +import org.apache.ignite.raft.jraft.error.RaftException; import org.apache.ignite.raft.jraft.entity.NodeId; import org.apache.ignite.raft.jraft.entity.PeerId; import org.apache.ignite.raft.jraft.entity.RaftOutter; @@ -476,4 +482,46 @@ public void testLastLogIndexWhenShutdown() throws Exception { Exception e = assertThrows(IllegalStateException.class, () -> this.logManager.getLastLogIndex(true)); assertEquals("Node is shutting down", e.getMessage()); } + + /** Suffix truncation below appliedId must report error and abort the append (IGNITE-25502). */ + @Test + public void testSuffixTruncationBelowAppliedIndexReportsError() { + List entries = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + entries.add(TestUtils.mockEntry(i, 1)); + } + assertThat("Initial entries should be appended successfully", appendEntries(entries), willBe(true)); + assertEquals(10, this.logManager.getLastLogIndex()); + + this.logManager.getLastLogId(true); // Flush disruptor so setDiskId() completes. + this.logManager.setAppliedId(new LogId(8, 1)); + + // Conflicting entries at index 6+ (term 2 vs existing term 1) trigger unsafeTruncateSuffix(5). + // Since 5 < appliedId.index (8), this must be rejected. + List conflicting = new ArrayList<>(); + for (int i = 6; i <= 12; i++) { + conflicting.add(TestUtils.mockEntry(i, 2)); + } + assertThat("Append should fail due to suffix conflict with applied entries", appendEntries(conflicting), willBe(false)); + + verify(fsmCaller).onError(any(RaftException.class)); + assertEquals(10, this.logManager.getLastLogIndex()); + + for (int i = 1; i <= 10; i++) { + LogEntry entry = this.logManager.getEntry(i); + assertEquals(i, entry.getId().getIndex()); + assertEquals(1, entry.getId().getTerm()); + } + } + + private CompletableFuture appendEntries(List entries) { + CompletableFuture future = new CompletableFuture<>(); + this.logManager.appendEntries(new ArrayList<>(entries), new LogManager.StableClosure() { + @Override + public void run(Status status) { + future.complete(status.isOk()); + } + }); + return future; + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java index cfa64a01cf75..83e14a4db017 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ItTruncateRaftLogAndRestartNodesTest.java @@ -23,8 +23,10 @@ import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.CompletableFutures.allOf; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayWithSize; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -72,13 +74,13 @@ import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.raft.jraft.conf.ConfigurationManager; import org.apache.ignite.raft.jraft.core.NodeImpl; +import org.apache.ignite.raft.jraft.core.State; import org.apache.ignite.raft.jraft.option.LogStorageOptions; import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.raft.jraft.option.RaftOptions; import org.apache.ignite.raft.jraft.storage.LogStorage; import org.apache.ignite.tx.TransactionOptions; import org.hamcrest.Matchers; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -98,7 +100,6 @@ protected int initialNodes() { return 0; } - @Disabled("https://issues.apache.org/jira/browse/IGNITE-25502") @Test void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception { cluster.startAndInit(3); @@ -148,12 +149,21 @@ void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception { startNode(2); + // Node 2 has applied entries that the new majority (nodes 0, 1) doesn't have, + // so it must go to ERROR state when the leader tries to overwrite those entries. + await() + .timeout(10, TimeUnit.SECONDS) + .untilAsserted(() -> + assertThat(raftNodeImpl(2, replicationGroup).getState(), equalTo(State.STATE_ERROR)) + ); + + // SQL should still work via the healthy majority. assertThat( toPeopleFromSqlRows(executeSql(selectPeopleDml(TABLE_NAME))), arrayWithSize(Matchers.allOf(greaterThan(0), lessThan(people.length))) ); - for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) { + for (int nodeIndex = 0; nodeIndex < 2; nodeIndex++) { assertThat( "nodeIndex=" + nodeIndex, scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),