Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ void run(MiniRaftCluster cluster) throws Exception {
final StateMachine stateMachine = restartedFollower.getStateMachine();
final SnapshotInfo info = stateMachine.getLatestSnapshot();
LOG.info("{} restarted snapshot info {} from {}", followerId, info, stateMachine);
assertNotNull(info);
assertTrue(info.getTermIndex().equals(applied), () -> info + " != " + applied);

JavaUtils.attemptUntilTrue(() -> {
System.out.println(cluster.printServers());
final TermIndex leaderLastApplied = leaderStateMachine.getLastAppliedTermIndex();
LOG.info("Leader {} last applied {}", leader.getId(), leaderLastApplied);
final TermIndex followerLastApplied = stateMachine.getLastAppliedTermIndex();
LOG.info("Follower {} last applied {}", followerId, followerLastApplied);
return followerLastApplied.equals(leaderLastApplied);
}, 10, TimeDuration.ONE_SECOND, "followerLastApplied", LOG);
LOG.info("Follower {} last applied {}, snapshot {}", followerId, followerLastApplied, applied);
return followerLastApplied != null && followerLastApplied.getIndex() >= applied.getIndex();
}, 10, TimeDuration.ONE_SECOND, "followerSnapshotApplied", LOG);

sendMessages(cluster, 7);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private void testRestartFollower(CLUSTER cluster) throws Exception {
LOG.info("{}: newLeaderNextIndex = {}", leaderId, newLeaderNextIndex);
Assertions.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
Assertions.assertEquals(newLeaderNextIndex, follower.getRaftLog().getNextIndex());
}, 10, ONE_SECOND, "followerNextIndex", LOG);
}, 60, ONE_SECOND, "followerNextIndex", LOG);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
Expand All @@ -30,6 +32,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
Expand Down Expand Up @@ -179,8 +182,28 @@ public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
}

/**
 * Finds the index of the first log entry in the given division's raft log whose
 * state machine log data equals the content of the given message.
 *
 * The scan covers indices from {@code startIndex} (inclusive) up to the log's
 * next index as read once at the start of the call (exclusive).
 * Entries that are null or that carry no state machine log entry are skipped.
 *
 * @param leader the division whose raft log is scanned
 * @param message the message whose content is searched for
 * @param startIndex the first index to examine
 * @return the index of the first matching entry
 * @throws AssertionError if no entry in the scanned range matches
 */
private static long getLogEntryIndex(RaftServer.Division leader, Message message, long startIndex) throws Exception {
  final long end = leader.getRaftLog().getNextIndex();
  long i = startIndex;
  while (i < end) {
    final LogEntryProto candidate = leader.getRaftLog().get(i);
    final boolean isMatch = candidate != null
        && candidate.hasStateMachineLogEntry()
        && message.getContent().equals(candidate.getStateMachineLogEntry().getLogData());
    if (isMatch) {
      return i;
    }
    i++;
  }
  throw new AssertionError("Failed to find " + message + " from index " + startIndex + " to " + end);
}

/**
 * Repeatedly checks, via {@code JavaUtils.attempt}, that the last committed index of
 * the given division's raft log is at least {@code index}.
 *
 * Each attempt asserts the condition; the call is configured with 10 attempts and
 * {@code HUNDRED_MILLIS} between them, and no logger is passed.
 *
 * @param leader the division whose raft log commit index is checked
 * @param index the minimum commit index to wait for
 */
private static void waitForCommitIndex(RaftServer.Division leader, long index) throws Exception {
  final String attemptName = "commitIndex >= " + index;
  JavaUtils.attempt(() -> {
    final long committed = leader.getRaftLog().getLastCommittedIndex();
    Assertions.assertTrue(index <= committed, () -> "commitIndex=" + committed + " < index=" + index);
  }, 10, HUNDRED_MILLIS, attemptName, null);
}

static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();

final List<RaftServer.Division> followers = cluster.getFollowers();
Assertions.assertEquals(2, followers.size());
Expand All @@ -199,9 +222,12 @@ static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C cluste
assertReplyExact(count, leaderClient.io().send(INCREMENT));

count++;
final long nextIndex = leader.getRaftLog().getNextIndex();
writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT)));
// sleep to let the commitIndex/appliedIndex get updated.
Thread.sleep(100);
final long waitAndIncrementIndex = JavaUtils.attemptRepeatedly(
() -> getLogEntryIndex(leader, WAIT_AND_INCREMENT, nextIndex),
10, HUNDRED_MILLIS, "WAIT_AND_INCREMENT entry", null);
waitForCommitIndex(leader, waitAndIncrementIndex);
// WAIT_AND_INCREMENT will delay 500ms to update the count, the read must wait for it.
assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
Expand Down
62 changes: 48 additions & 14 deletions ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -114,6 +115,41 @@ static CompletableFuture<Void> killAndRestartServer(
return future;
}

/**
 * Locates, in order, one log entry for each expected message among the state machine
 * log entries of the given log.
 *
 * The entries are walked once with a single forward-moving cursor: for each expected
 * message the scan resumes right after the previously matched entry. An entry matches
 * when its term is at least {@code expectedTerm} and its state machine log data equals
 * the expected message's content. Non-matching entries in between are skipped.
 *
 * @param log the raft log to read state machine entries from
 * @param expectedTerm the minimum term a matching entry must have
 * @param expectedMessages the messages to find, in the order they must appear
 * @return the matched entries, one per expected message, in the same order
 */
private static List<LogEntryProto> findLogEntriesContaining(
    RaftLog log, long expectedTerm, SimpleMessage[] expectedMessages) {
  final List<LogEntryProto> entries = RaftTestUtil.getStateMachineLogEntries(log, s -> {});
  final List<LogEntryProto> matched = new ArrayList<>(expectedMessages.length);
  int cursor = 0;
  for (SimpleMessage expected : expectedMessages) {
    LogEntryProto hit = null;
    // Every examined entry advances the cursor, whether or not it matches.
    while (hit == null && cursor < entries.size()) {
      final LogEntryProto candidate = entries.get(cursor++);
      if (candidate.getTerm() >= expectedTerm
          && expected.getContent().equals(candidate.getStateMachineLogEntry().getLogData())) {
        hit = candidate;
      }
    }
    Assertions.assertTrue(hit != null, () -> "Failed to find " + expected + " in entries " + entries);
    matched.add(hit);
  }
  return matched;
}

/**
 * Repeatedly asserts, via {@code JavaUtils.attempt}, that the given server's raft log
 * contains the expected messages in order.
 *
 * Each attempt looks up the matching entries with {@code findLogEntriesContaining}
 * and passes them to {@code RaftTestUtil.assertLogEntries}. Attempts are spaced by
 * {@code TimeDuration.ONE_SECOND}.
 *
 * @param server the division whose raft log is checked
 * @param expectedTerm the term passed to both the lookup and the assertion
 * @param expectedMessages the messages expected in the log, in order
 * @param numAttempts the number of attempts to make
 * @param log the logger handed to {@code JavaUtils.attempt}
 */
private static void assertLogEntriesContaining(
    RaftServer.Division server, long expectedTerm, SimpleMessage[] expectedMessages, int numAttempts, Logger log)
    throws Exception {
  final String attemptName = server.getId() + " assertLogEntriesContaining";
  JavaUtils.attempt(() -> {
    final List<LogEntryProto> found =
        findLogEntriesContaining(server.getRaftLog(), expectedTerm, expectedMessages);
    RaftTestUtil.assertLogEntries(found, expectedTerm, expectedMessages);
    return null;
  }, numAttempts, TimeDuration.ONE_SECOND, () -> attemptName, log);
}

static void runTestBasicAppendEntries(
boolean async, boolean killLeader, int numMessages, MiniRaftCluster cluster, Logger log)
throws Exception {
Expand All @@ -140,38 +176,36 @@ static void runTestBasicAppendEntries(
final SimpleMessage[] messages = SimpleMessage.create(numMessages);

try (final RaftClient client = cluster.createClient()) {
final AtomicInteger asyncReplyCount = new AtomicInteger();
final CompletableFuture<Void> f = new CompletableFuture<>();
final List<CompletableFuture<RaftClientReply>> asyncReplies = new ArrayList<>();

for (SimpleMessage message : messages) {
if (async) {
client.async().send(message).thenAcceptAsync(reply -> {
if (!reply.isSuccess()) {
f.completeExceptionally(
new AssertionError("Failed with reply " + reply));
} else if (asyncReplyCount.incrementAndGet() == messages.length) {
f.complete(null);
}
});
asyncReplies.add(client.async().send(message));
} else {
final RaftClientReply reply = client.io().send(message);
Assertions.assertTrue(reply.isSuccess());
}
}
if (async) {
f.join();
Assertions.assertEquals(messages.length, asyncReplyCount.get());
asyncReplies.forEach(f -> {
final RaftClientReply reply = f.join();
Assertions.assertTrue(reply.isSuccess(), () -> "Failed with reply " + reply);
});
}
}
Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100);
log.info(cluster.printAllLogs());
killAndRestartFollower.join();
killAndRestartLeader.join();
log.info(cluster.printAllLogs());


final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
for(RaftServer.Division impl: divisions) {
RaftTestUtil.assertLogEntries(impl, term, messages, 50, log);
if (killLeader) {
assertLogEntriesContaining(impl, term, messages, 50, log);
} else {
RaftTestUtil.assertLogEntries(impl, term, messages, 50, log);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,29 +148,32 @@ public void testAddServerForWaitReady() throws IOException, InterruptedException
final MiniRaftCluster cluster = newCluster(3);
cluster.start();
RaftTestUtil.waitForLeader(cluster);
try (RaftClient client = cluster.createClient()) {
for (int i = 0; i < 10; ++i) {
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i));
assertTrue(reply.isSuccess());
}
// add 3 new servers and wait longer time
try {
CodeInjectionForTesting.put(RaftServerImpl.START_COMPLETE, new SleepCode(2000));
final PeerChanges peerChanges = cluster.addNewPeers(2, true);
LOG.info("add new 3 servers");
LOG.info(cluster.printServers());
RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
.setServersInNewConf(peerChanges.getAddedPeers())
.setMode(SetConfigurationRequest.Mode.ADD).build());
assertTrue(reply.isSuccess());
for (RaftServer server : cluster.getServers()) {
RaftServerProxy proxy = (RaftServerProxy) server;
proxy.getImpls().forEach(s -> {
assertTrue(s.isRunning());
});
try (RaftClient client = cluster.createClient()) {
for (int i = 0; i < 10; ++i) {
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message_" + i));
assertTrue(reply.isSuccess());
}
// add 3 new servers and wait longer time
final PeerChanges peerChanges = cluster.addNewPeers(2, true);
LOG.info("add new 3 servers");
LOG.info(cluster.printServers());
RaftClientReply reply = client.admin().setConfiguration(SetConfigurationRequest.Arguments.newBuilder()
.setServersInNewConf(peerChanges.getAddedPeers())
.setMode(SetConfigurationRequest.Mode.ADD).build());
assertTrue(reply.isSuccess());
JavaUtils.attempt(() -> {
for (RaftServer server : cluster.getServers()) {
RaftServerProxy proxy = (RaftServerProxy) server;
proxy.getImpls().forEach(s -> assertTrue(s.isRunning()));
}
}, 10, HUNDRED_MILLIS, "all server impls running", LOG);
}
} finally {
cluster.shutdown();
CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
}
Comment on lines +173 to 176
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding finally is good but the double try-block is not needed. Since this is only a test, I am fine with the current change.

+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -162,15 +162,16 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
               .setServersInNewConf(peerChanges.getAddedPeers())
               .setMode(SetConfigurationRequest.Mode.ADD).build());
       assertTrue(reply.isSuccess());
-      for (RaftServer server : cluster.getServers()) {
-        RaftServerProxy proxy = (RaftServerProxy) server;
-        proxy.getImpls().forEach(s -> {
-          assertTrue(s.isRunning());
-        });
-      }
+      JavaUtils.attempt(() -> {
+        for (RaftServer server : cluster.getServers()) {
+          RaftServerProxy proxy = (RaftServerProxy) server;
+          proxy.getImpls().forEach(s -> assertTrue(s.isRunning()));
+        }
+      }, 10, HUNDRED_MILLIS, "all server impls running", LOG);
+    } finally {
+      cluster.shutdown();
+      CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
     }
-    cluster.shutdown();;
-    CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
   }

cluster.shutdown();;
CodeInjectionForTesting.remove(RaftServerImpl.START_COMPLETE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,16 @@ public long takeSnapshot() {
// TODO: snapshot should be written to a tmp file, then renamed
final File snapshotFile = storage.getSnapshotFile(termIndex.getTerm(), endIndex);
LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile);
final LogEntryProto[] entries;
synchronized (indexMap) {
entries = indexMap.values().stream()
.filter(entry -> entry.getIndex() <= endIndex)
.toArray(LogEntryProto[]::new);
}
try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false,
segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) {
for (final LogEntryProto entry : indexMap.values()) {
if (entry.getIndex() > endIndex) {
break;
} else {
out.write(entry);
}
for (final LogEntryProto entry : entries) {
out.write(entry);
}
out.flush();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader,
for (RaftPeer peer : cluster.getGroup().getPeers()) {
final RaftServer.Division division = cluster.getDivision(peer.getId());
assertLogEntries(division, oldLeaderTerm, firstBatch);
assertEmptyTransactionContextMap(division);
waitForEmptyTransactionContextMap(division);
}

// kill a majority of followers
Expand Down Expand Up @@ -221,15 +221,12 @@ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader,
for (RaftPeer peer : cluster.getGroup().getPeers()) {
final RaftServer.Division division = cluster.getDivision(peer.getId());
assertLogEntries(division, oldLeaderTerm, expectedMessages);
final String name = "assertEmptyTransactionContextMap:" + division.getId();
JavaUtils.attempt(() -> assertEmptyTransactionContextMap(division),
10, HUNDRED_MILLIS, name, LOG);

}
waitForEmptyTransactionContextMap(division);
}

if (!exceptions.isEmpty()) {
LOG.info("{} exceptions", exceptions.size());
for(int i = 0 ; i < exceptions.size(); i++) {
for (int i = 0; i < exceptions.size(); i++) {
LOG.info("exception {})", i, exceptions.get(i));
}
Assertions.fail();
Expand All @@ -241,6 +238,11 @@ static void assertEmptyTransactionContextMap(RaftServer.Division d) {
Assertions.assertTrue(map.isEmpty(), () -> d.getId() + " TransactionContextMap is non-empty: " + map);
}

/**
 * Retries {@code assertEmptyTransactionContextMap} for the given division through
 * {@code JavaUtils.attempt}, configured with 10 attempts and {@code HUNDRED_MILLIS}
 * between them.
 *
 * @param d the division to check
 * @throws InterruptedException as declared by the underlying attempt call
 */
void waitForEmptyTransactionContextMap(RaftServer.Division d) throws InterruptedException {
  JavaUtils.attempt(
      () -> assertEmptyTransactionContextMap(d),
      10,
      HUNDRED_MILLIS,
      "assertEmptyTransactionContextMap:" + d.getId(),
      LOG);
}

static void assertEntriesInTransactionContextMap(RaftServer.Division division,
SimpleMessage[] existing, SimpleMessage[] nonExisting) {
final RaftLog log = division.getRaftLog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.ReplyFlusher;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -89,11 +90,9 @@ static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallelRepliedIn
f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0)));
f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1)));

// sleep in order to make sure
// (1) the count is incremented, and
// (2) the reads will wait for the repliedIndex.
Thread.sleep(100);
assertEquals(count, leaderStateMachine.getCount());
// Wait until the leader state machine has applied the write while the ReplyFlusher remains blocked.
JavaUtils.attempt(() -> assertEquals(count, leaderStateMachine.getCount()),
10, HUNDRED_MILLIS, "leaderStateMachine count " + count, null);
}

for (int i = 0; i < n; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,16 @@ public void testElectionStepDownCommand() throws Exception {
void runTestElectionStepDownCommand(MiniRaftCluster cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
String sb = getClusterAddress(cluster);
RaftServer.Division newLeader = cluster.getFollowers().get(0);
final StringPrintStream out = new StringPrintStream();
RatisShell shell = new RatisShell(out.getPrintStream());
Assertions.assertNotEquals(cluster.getLeader().getId(), newLeader.getId());
Assertions.assertEquals(2, cluster.getFollowers().size());
int ret = shell.run("election", "stepDown", "-peers", sb.toString());
int ret = shell.run("election", "pause", "-peers", sb, "-address",
leader.getPeer().getAddress());
Assertions.assertEquals(0, ret);

ret = shell.run("election", "stepDown", "-peers", sb);
Assertions.assertEquals(0, ret);
Assertions.assertEquals(3, cluster.getFollowers().size());
JavaUtils.attempt(() -> Assertions.assertNotEquals(leader.getId(), RaftTestUtil.waitForLeader(cluster).getId()),
10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionStepDownCommand", LOG);
}
}
Loading