Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,17 @@ public RaftProto.InstallSnapshotResponse installSnapshot(RaftProto.InstallSnapsh
raftNode.getSnapshot().getLock().unlock();
}

// discard old log entries
// discard old log entries and update node state
raftNode.getLock().lock();
try {
raftNode.getRaftLog().truncatePrefix(lastSnapshotIndex + 1);
raftNode.setCommitIndex(lastSnapshotIndex);
raftNode.setLastAppliedIndex(lastSnapshotIndex);
RaftProto.Configuration snapshotConfiguration
= raftNode.getSnapshot().getMetaData().getConfiguration();
if (snapshotConfiguration.getServersCount() > 0) {
raftNode.setConfiguration(snapshotConfiguration);
}
} finally {
raftNode.getLock().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package com.github.wenweihu86.raft.service.impl;

import com.github.wenweihu86.raft.RaftNode;
import com.github.wenweihu86.raft.RaftOptions;
import com.github.wenweihu86.raft.StateMachine;
import com.github.wenweihu86.raft.proto.RaftProto;
import com.google.protobuf.ByteString;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
* Reproduction test for raft-java bug #75:
*
* After installSnapshot completes (final chunk), the handler applies the
* state machine from the snapshot and truncates the log, but FAILS to
* update commitIndex, lastAppliedIndex, or configuration to match the
* snapshot's lastIncludedIndex and included configuration.
*
* Compare with the constructor (RaftNode lines 88-112) which correctly
* sets commitIndex = max(snapshot.lastIncludedIndex, log.commitIndex),
* lastAppliedIndex = commitIndex, and configuration = snapshot config.
*
* The buggy code is in RaftConsensusServiceImpl.installSnapshot(),
* lines 279-301, where after the final chunk:
* - stateMachine.readSnapshot() is called (OK)
* - snapshot.reload() is called (OK)
* - raftLog.truncatePrefix() is called (OK)
* - commitIndex is NOT updated (BUG)
* - lastAppliedIndex is NOT updated (BUG)
* - configuration is NOT updated (BUG)
*/
public class InstallSnapshotBugTest {

private static final String TEST_DATA_DIR = "/tmp/raft-java-bug75-test";

private RaftNode raftNode;
private RaftConsensusServiceImpl service;
private NoOpStateMachine stateMachine;
private ScheduledExecutorService scheduler;

/** Minimal state machine that does nothing -- sufficient for this test. */
static class NoOpStateMachine implements StateMachine {
boolean readSnapshotCalled = false;
@Override public void writeSnapshot(String snapshotDir) {}
@Override public void readSnapshot(String snapshotDir) {
readSnapshotCalled = true;
}
@Override public void apply(byte[] dataBytes) {}
}

@Before
public void setUp() throws Exception {
// Clean up any previous test data
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));

// Build initial configuration: 3 servers (server IDs 1, 2, 3)
List<RaftProto.Server> initialServers = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
initialServers.add(RaftProto.Server.newBuilder()
.setServerId(i)
.setEndpoint(RaftProto.Endpoint.newBuilder()
.setHost("127.0.0.1")
.setPort(8000 + i)
.build())
.build());
}

RaftOptions options = new RaftOptions();
options.setDataDir(TEST_DATA_DIR);
// Use large election timeout to avoid interference
options.setElectionTimeoutMilliseconds(60000);
options.setSnapshotPeriodSeconds(3600);

stateMachine = new NoOpStateMachine();

// localServer is server 1
raftNode = new RaftNode(options, initialServers, initialServers.get(0), stateMachine);

// Instead of calling init() (which tries to create BRPC peer connections),
// inject a ScheduledExecutorService via reflection so that stepDown() ->
// resetElectionTimer() does not NPE.
scheduler = Executors.newScheduledThreadPool(2);
Field schedulerField = RaftNode.class.getDeclaredField("scheduledExecutorService");
schedulerField.setAccessible(true);
schedulerField.set(raftNode, scheduler);

// Simulate some prior progress: commitIndex=5, lastAppliedIndex=5
raftNode.setCommitIndex(5);
raftNode.setLastAppliedIndex(5);

service = new RaftConsensusServiceImpl(raftNode);
}

@After
public void tearDown() throws Exception {
if (scheduler != null) {
scheduler.shutdownNow();
}
FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
}

@Test
public void testInstallSnapshotDoesNotUpdateCommitIndexOrConfiguration() throws IOException {
// --- Arrange ---

// Build a snapshot configuration with 4 servers (different from initial 3)
RaftProto.Configuration.Builder snapshotConfBuilder = RaftProto.Configuration.newBuilder();
for (int i = 1; i <= 4; i++) {
snapshotConfBuilder.addServers(RaftProto.Server.newBuilder()
.setServerId(i)
.setEndpoint(RaftProto.Endpoint.newBuilder()
.setHost("127.0.0.1")
.setPort(9000 + i)
.build())
.build());
}
RaftProto.Configuration snapshotConf = snapshotConfBuilder.build();

// Build snapshot metadata: lastIncludedIndex=100, lastIncludedTerm=10
RaftProto.SnapshotMetaData snapshotMeta = RaftProto.SnapshotMetaData.newBuilder()
.setLastIncludedIndex(100)
.setLastIncludedTerm(10)
.setConfiguration(snapshotConf)
.build();

// The installSnapshot request: single chunk (isFirst=true, isLast=true)
// term must be >= currentTerm to pass the term check
RaftProto.InstallSnapshotRequest request = RaftProto.InstallSnapshotRequest.newBuilder()
.setServerId(2) // leader is server 2
.setTerm(10) // high enough term
.setSnapshotMetaData(snapshotMeta)
.setFileName("state.dat")
.setOffset(0)
.setData(ByteString.copyFromUtf8("snapshot-data"))
.setIsFirst(true)
.setIsLast(true)
.build();

// Record pre-install state
long preCommitIndex = raftNode.getCommitIndex();
long preLastApplied = raftNode.getLastAppliedIndex();
int preConfigSize = raftNode.getConfiguration().getServersCount();

Assert.assertEquals("Pre-condition: commitIndex should be 5", 5, preCommitIndex);
Assert.assertEquals("Pre-condition: lastAppliedIndex should be 5", 5, preLastApplied);
Assert.assertEquals("Pre-condition: configuration should have 3 servers", 3, preConfigSize);

// --- Act ---
RaftProto.InstallSnapshotResponse response = service.installSnapshot(request);

// --- Assert ---
Assert.assertEquals("installSnapshot should succeed",
RaftProto.ResCode.RES_CODE_SUCCESS, response.getResCode());

// Verify the state machine WAS applied (this part works correctly)
Assert.assertTrue("State machine readSnapshot should have been called",
stateMachine.readSnapshotCalled);

// Verify the snapshot metadata was reloaded (this part works correctly)
long snapshotLastIndex = raftNode.getSnapshot().getMetaData().getLastIncludedIndex();
Assert.assertEquals("Snapshot metadata should reflect lastIncludedIndex=100",
100, snapshotLastIndex);

// === FIX VERIFICATION ===
// After the fix, commitIndex, lastAppliedIndex, and configuration
// should all be updated to match the snapshot, just as the constructor
// does on startup (RaftNode.java lines 88-112).
long postCommitIndex = raftNode.getCommitIndex();
long postLastApplied = raftNode.getLastAppliedIndex();
int postConfigSize = raftNode.getConfiguration().getServersCount();

System.out.println("=== Bug #75 Fix Verification ===");
System.out.println("Snapshot lastIncludedIndex: 100");
System.out.println("commitIndex after install: " + postCommitIndex);
System.out.println("lastAppliedIndex after install: " + postLastApplied);
System.out.println("configuration servers after install: " + postConfigSize);

// After fix: commitIndex should be updated to 100
Assert.assertEquals(
"commitIndex should be updated to snapshot lastIncludedIndex",
100, postCommitIndex);

// After fix: lastAppliedIndex should be updated to 100
Assert.assertEquals(
"lastAppliedIndex should be updated to snapshot lastIncludedIndex",
100, postLastApplied);

// After fix: configuration should have 4 servers from snapshot
Assert.assertEquals(
"configuration should be updated from snapshot metadata",
4, postConfigSize);

System.out.println("\nAll assertions passed: fix verified.");
}
}