Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,12 @@ public void onShutdown() {
}
}

@Override
public long getPersistedAppliedIndex() {
    // The storage reports -1 as its last applied index during rebalance; never expose a negative value.
    long lastApplied = txStateStorage.lastAppliedIndex();

    return max(0, lastApplied);
}

/**
* Adds a given Table Partition-level Raft processor to the set of managed processors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,14 @@ default void onConfigurationCommitted(
* Invoked once after a raft node has been shut down.
*/
void onShutdown();

/**
 * Reports the index of the last log entry whose application the state machine has persisted.
 *
 * <p>Queried from {@code NodeImpl.init()} so that log entries that were already applied are not truncated.
 * This default implementation reports no knowledge of the applied index.
 *
 * @return the persisted applied index, or 0 when it is unknown.
 */
default long getPersistedAppliedIndex() {
    return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,11 @@ public RaftGroupListener getListener() {
return listener;
}

@Override
public long getPersistedAppliedIndex() {
    // Pure delegation: the wrapped listener is the source of the persisted applied index.
    long persistedIndex = listener.getPersistedAppliedIndex();

    return persistedIndex;
}

@Override
public void onApply(Iterator iter) {
var iterWrapper = new WriteCommandIterator(iter, marshaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,14 @@ default void onRawConfigurationCommitted(ConfigurationEntry conf, long lastAppli
* @param ctx context of leader change
*/
void onStartFollowing(final LeaderChangeContext ctx);

/**
 * Reports the index of the last log entry whose application this state machine has persisted.
 *
 * <p>Queried from {@code NodeImpl.init()} so that log entries that were already applied are not truncated.
 * Implementations that do not track a persisted applied index can keep this default.
 *
 * @return the persisted applied index, or 0 when it is unknown.
 */
default long getPersistedAppliedIndex() {
    return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,19 @@ public boolean init(final NodeOptions opts) {
return false;
}

// Restore appliedId so that unsafeTruncateSuffix() can reject truncation of applied entries.
long persistedApplied = this.options.getFsm().getPersistedAppliedIndex();
if (persistedApplied > 0) {
long term = this.logManager.getTerm(persistedApplied);
if (term > 0) {
// A term of 0 means the index is outside the log (covered by a snapshot), so appliedId is restored only for a known term.
this.logManager.setAppliedId(new LogId(persistedApplied, term));
} else {
LOG.warn("Node {} persisted applied index {} is not in the raft log, expecting snapshot to cover it.",
getNodeId(), persistedApplied);
}
}

final Status st = this.logManager.checkConsistency();
if (!st.isOk()) {
LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1049,11 +1049,25 @@ private boolean reset(final long nextLogIndex) {
}
}

private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
/**
* Truncates log entries after {@code lastIndexKept}.
*
* @return {@code true} on success, {@code false} if truncation would discard applied entries (node moves to error state).
*/
private boolean unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
if (lastIndexKept < this.appliedId.getIndex()) {
LOG.error("FATAL ERROR: Can't truncate logs before appliedId={}, lastIndexKept={}", this.appliedId,
lastIndexKept);
return;
LOG.error("Raft log suffix conflict: cannot truncate entries that have been applied to the state machine. "
+ "nodeId={}, appliedId={}, lastIndexKept={}. The partition will be moved to error state.",
this.nodeId, this.appliedId, lastIndexKept);
lock.unlock();
try {
reportError(RaftError.EINVAL.getNumber(),
"Raft log suffix conflict: attempted to truncate applied entries, appliedId=%s, lastIndexKept=%d",
this.appliedId, lastIndexKept);
} finally {
lock.lock();
}
return false;
}

this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);
Expand All @@ -1068,6 +1082,7 @@ private void unsafeTruncateSuffix(final long lastIndexKept, final Lock lock) {
final TruncateSuffixClosure c = new TruncateSuffixClosure(lastIndexKept, lastTermKept);
offerEvent(c, EventType.TRUNCATE_SUFFIX);
lock.lock();
return true;
}

@SuppressWarnings("NonAtomicOperationOnVolatileField")
Expand Down Expand Up @@ -1121,7 +1136,11 @@ private boolean checkAndResolveConflict(final List<LogEntry> entries, final Stab
if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
// Truncate all the conflicting entries to make local logs
// consensus with the leader.
unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock);
if (!unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1, lock)) {
Utils.runClosureInThread(nodeOptions.getCommonExecutor(), done,
new Status(RaftError.EINVAL, "Raft log suffix conflict with applied entries"));
return false;
}
}
this.lastLogIndex = lastLogEntry.getId().getIndex();
} // else this is a duplicated AppendEntriesRequest, we have
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,14 @@
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.verify;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
Expand All @@ -40,6 +45,7 @@
import org.apache.ignite.raft.jraft.entity.EnumOutter;
import org.apache.ignite.raft.jraft.entity.LogEntry;
import org.apache.ignite.raft.jraft.entity.LogId;
import org.apache.ignite.raft.jraft.error.RaftException;
import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.raft.jraft.entity.PeerId;
import org.apache.ignite.raft.jraft.entity.RaftOutter;
Expand Down Expand Up @@ -476,4 +482,46 @@ public void testLastLogIndexWhenShutdown() throws Exception {
Exception e = assertThrows(IllegalStateException.class, () -> this.logManager.getLastLogIndex(true));
assertEquals("Node is shutting down", e.getMessage());
}

/** Checks that a suffix truncation reaching below appliedId reports an error and aborts the append (IGNITE-25502). */
@Test
public void testSuffixTruncationBelowAppliedIndexReportsError() {
    // Seed the log with indexes 1..10, all in term 1.
    List<LogEntry> initial = new ArrayList<>();
    int seedIndex = 1;
    while (seedIndex <= 10) {
        initial.add(TestUtils.mockEntry(seedIndex, 1));
        seedIndex++;
    }
    CompletableFuture<Boolean> initialAppend = appendEntries(initial);
    assertThat("Initial entries should be appended successfully", initialAppend, willBe(true));
    assertEquals(10, this.logManager.getLastLogIndex());

    // Flush the disruptor first so that setDiskId() completes, then mark index 8 (term 1) as applied.
    this.logManager.getLastLogId(true);
    LogId appliedId = new LogId(8, 1);
    this.logManager.setAppliedId(appliedId);

    // Entries 6..12 carry term 2 while the stored ones carry term 1, which triggers unsafeTruncateSuffix(5).
    // Index 5 lies below the applied index 8, so the truncation has to be refused.
    List<LogEntry> overlapping = new ArrayList<>();
    int conflictIndex = 6;
    while (conflictIndex <= 12) {
        overlapping.add(TestUtils.mockEntry(conflictIndex, 2));
        conflictIndex++;
    }
    CompletableFuture<Boolean> conflictingAppend = appendEntries(overlapping);
    assertThat("Append should fail due to suffix conflict with applied entries", conflictingAppend, willBe(false));

    // The failure must be reported to the FSM caller and the log must stay at its original length.
    verify(fsmCaller).onError(any(RaftException.class));
    assertEquals(10, this.logManager.getLastLogIndex());

    // Every original entry is still readable with its original index and term.
    for (int expectedIndex = 1; expectedIndex <= 10; expectedIndex++) {
        LogId storedId = this.logManager.getEntry(expectedIndex).getId();
        assertEquals(expectedIndex, storedId.getIndex());
        assertEquals(1, storedId.getTerm());
    }
}

/**
 * Appends the given entries through the log manager and exposes the outcome as a future.
 *
 * @param entries entries to append; a copy is handed to the log manager, so the caller's list is not passed on.
 * @return future completed with {@code true} if the append closure received an OK status, {@code false} otherwise.
 */
private CompletableFuture<Boolean> appendEntries(List<LogEntry> entries) {
    CompletableFuture<Boolean> outcome = new CompletableFuture<>();

    List<LogEntry> entriesCopy = new ArrayList<>(entries);
    LogManager.StableClosure onStable = new LogManager.StableClosure() {
        @Override
        public void run(Status status) {
            outcome.complete(status.isOk());
        }
    };

    this.logManager.appendEntries(entriesCopy, onStable);

    return outcome;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import static org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -72,13 +74,13 @@
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
import org.apache.ignite.raft.jraft.core.NodeImpl;
import org.apache.ignite.raft.jraft.core.State;
import org.apache.ignite.raft.jraft.option.LogStorageOptions;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.option.RaftOptions;
import org.apache.ignite.raft.jraft.storage.LogStorage;
import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -98,7 +100,6 @@ protected int initialNodes() {
return 0;
}

@Disabled("https://issues.apache.org/jira/browse/IGNITE-25502")
@Test
void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception {
cluster.startAndInit(3);
Expand Down Expand Up @@ -148,12 +149,21 @@ void enterNodeWithIndexGreaterThanCurrentMajority() throws Exception {

startNode(2);

// Node 2 has applied entries that the new majority (nodes 0, 1) doesn't have,
// so it must go to ERROR state when the leader tries to overwrite those entries.
await()
.timeout(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(raftNodeImpl(2, replicationGroup).getState(), equalTo(State.STATE_ERROR))
);

// SQL should still work via the healthy majority.
assertThat(
toPeopleFromSqlRows(executeSql(selectPeopleDml(TABLE_NAME))),
arrayWithSize(Matchers.allOf(greaterThan(0), lessThan(people.length)))
);

for (int nodeIndex = 0; nodeIndex < 3; nodeIndex++) {
for (int nodeIndex = 0; nodeIndex < 2; nodeIndex++) {
assertThat(
"nodeIndex=" + nodeIndex,
scanPeopleFromAllPartitions(nodeIndex, TABLE_NAME),
Expand Down