RATIS-2403. Support leader batch write to improve linearizable follower read throughput#1362

Open
ivandika3 wants to merge 10 commits into apache:master from ivandika3:RATIS-2403

Conversation

@ivandika3
Contributor

@ivandika3 ivandika3 commented Feb 27, 2026

What changes were proposed in this pull request?

This patch implements leader batch write, as suggested by @szetszwo, by making the leader delay some replies so that the ReadIndex increases in coarser, more discrete steps (the original ReadIndex returns the latest commitIndex, which can increase rapidly on a busy server).

The main insight is that a linearizable read only requires the read to see all replied writes, not all writes that have been applied to the state machine (although in Raft, a replied write implies that it has been applied to the state machine). We can exploit this by withholding the write replies in a batch that is flushed (i.e., the replies are sent) at a configurable interval. The idea is similar to "group commit" in other systems (although group commit is about batching write operations into a single IO operation).
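The hold-and-flush contract described above can be sketched as follows. This is a minimal illustration, not the actual Ratis API; `LeaderBatchSketch`, `holdReply`, and `flush` are hypothetical names standing in for the real implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch: write replies are completed only when the batch is flushed. */
class LeaderBatchSketch {
  private final List<CompletableFuture<Long>> held = new ArrayList<>();
  private long highestHeldIndex = -1;

  /** Instead of replying immediately, park the reply until the next flush. */
  synchronized CompletableFuture<Long> holdReply(long logIndex) {
    final CompletableFuture<Long> reply = new CompletableFuture<>();
    held.add(reply);
    highestHeldIndex = Math.max(highestHeldIndex, logIndex);
    return reply;
  }

  /** Called by the flusher at a configurable interval: send all held replies. */
  synchronized long flush() {
    final long repliedIndex = highestHeldIndex;
    held.forEach(f -> f.complete(repliedIndex));
    held.clear();
    return repliedIndex; // repliedIndex advances in one coarse step per flush
  }
}
```

Because all replies in a batch are released at once, the repliedIndex observed by readers stays constant between flushes instead of moving with every commit.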

These are the introduced changes:

  • Introduce a new ReadIndex type (RepliedIndex) that denotes the latest index of the replied write requests. Also introduce raft.server.read.read-index.type (as suggested by @szetszwo), since we now support 3 ReadIndex types.
  • Leader batch implementation: A list to hold the finished replies and a flusher daemon to flush the replies at a configurable interval
    • I tried to use AtomicReference<List<HeldReply>> (see 4a8a402), but the holdReply implementation is not atomic since AtomicReference is not designed for atomic list appends.
    • I considered using ConcurrentLinkedQueue, but I am worried that if the push throughput is higher than the poll throughput, flushReplies might never finish polling. One solution is to get the queue size and only poll up to that many elements, but ConcurrentLinkedQueue's size method is not constant-time (refer to the Javadoc).
    • Regarding LinkedList, I checked with an LLM and its response (embedded as an image here) makes sense to me.
    • Currently the implementation uses a list with a swap (instead of the initial implementation https://issues.apache.org/jira/secure/attachment/13080907/13080907_leader-batch-write.patch, which calls clear and copies the list, which can be costly).
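The list-swap flush mentioned in the last bullet can be sketched like this (hypothetical names; the real ReplyFlusher in the patch differs). The buffer reference is swapped under the lock in O(1), and the old list is then processed outside the lock, avoiding both element copying and holding the lock during reply completion:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the swap-based flush: O(1) handoff instead of copy-and-clear. */
class SwapFlusher<T> {
  private final Object lock = new Object();
  private List<T> held = new ArrayList<>();

  void hold(T reply) {
    synchronized (lock) {
      held.add(reply);
    }
  }

  /** Swap the buffer under the lock, then process the old list outside it. */
  List<T> flush() {
    final List<T> toFlush;
    synchronized (lock) {
      toFlush = held;
      held = new ArrayList<>(); // swap in a fresh list; no element copying
    }
    return toFlush; // caller completes these replies without holding the lock
  }
}
```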

What is the link to the Apache JIRA?

https://issues.apache.org/jira/browse/RATIS-2403

How was this patch tested?

Unit tests.

Comment on lines +175 to +185
if (readIndexType == Type.REPLIED_INDEX) {
// With REPLIED_INDEX the read index only advances after the leader has applied the
// transaction and the reply batch is flushed. WAIT_AND_INCREMENT takes 500 ms in
// the state machine but we only waited 100 ms, so its reply has not been generated
// yet and the follower read may only see the preceding sync INCREMENT (count - 1).
assertReplyAtLeast(count - 1, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count - 1, f1Client.async().sendReadOnly(QUERY, f1)));
} else {
assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
}
Contributor Author

@ivandika3 ivandika3 Feb 27, 2026


I tried to fix this failing test, but the issue is that runTestFollowerReadOnlyParallel does not wait for WAIT_AND_INCREMENT (which delays 500 ms, while the test only sleeps for 100 ms) to return, so technically we cannot assert that the count has been incremented by WAIT_AND_INCREMENT since the StateMachine#applyTransaction future has not finished yet. I might be wrong.

When I tried to make repliedIndex be the current ServerState appliedIndex instead of the highest index of all the replies, the test passes. However, I think appliedIndex should be higher than repliedIndex, so the follower might still need to wait longer than necessary. I considered using StateMachine.lastAppliedTermIndex, but we cannot guarantee the correctness or semantics of StateMachine.lastAppliedTermIndex (i.e., is it the highest index whose applyTransaction future has actually finished, or simply the last applyTransaction called?). I also found that ServerState.appliedIndex does not mean that the async StateMachine#applyTransaction futures have actually finished (although the sync StateMachine#applyTransactionSerial is guaranteed to have finished).

Contributor

Sorry that the repliedIndex idea may not work:

We have the invariant

  • repliedIndex <= appliedIndex <= committedIndex

The repliedIndex idea is: When repliedIndex < appliedIndex, it is safe for a follower to return any states >= repliedIndex since appliedIndex is not yet visible to any clients -- The data is not yet stale in this sense.

However, after a follower/leader has returned a read call, its appliedIndex A becomes visible to that client. Subsequent reads must return a state >= A. Below is an example of the problem:

  1. Leader: repliedIndex = 10 < appliedIndex = 20
  2. Follower 1: appliedIndex = 18
  3. Follower 2: appliedIndex = 14
  4. Client first reads from Follower 1.
  5. The same client reads from Follower 2 (or the Leader) <---- stale read.

Let's see if there are some ways to make it work.
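The stale-read scenario above can be replayed as a toy model (not Ratis code; the names are illustrative). With readIndex = leader's repliedIndex = 10, both followers can serve the read immediately, so the client can first observe a state at index 18 and then one at index 14:

```java
/** Toy model of the stale read when readIndex = leader's repliedIndex. */
class StaleReadExample {
  /** A follower may serve a read once its appliedIndex >= readIndex. */
  static long read(long readIndex, long followerAppliedIndex) {
    if (followerAppliedIndex < readIndex) {
      throw new IllegalStateException("follower must wait to advance");
    }
    return followerAppliedIndex; // the state the client observes
  }
}
```

Both reads pass the readIndex check, yet the second read observes an older state than the first, violating monotonic reads for that client.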

Contributor

@szetszwo szetszwo left a comment

@ivandika3 , thanks a lot for working on this!

Please see https://issues.apache.org/jira/secure/attachment/13081003/1362_review.patch for my review. I did not put all the comments here.

private final boolean readIndexAppliedIndexEnabled;
private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
private final Supplier<Long> readIndexSupplier;
private final MemoizedSupplier<String> readIndexLogPrefixSupplier;
Contributor

Remove this field and use readIndexType.

Contributor Author

Thanks, updated.

}
LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit",
readIndex, readIndexLogPrefixSupplier.get(),
Contributor

Use readIndexType:

    LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})",
        readIndex, readIndexType, index, readAfterWriteConsistentIndex);

Contributor Author

Thanks, updated.

Comment on lines +379 to +388
/** The interval at which held write replies are flushed. */
private final TimeDuration repliedIndexBatchInterval;
/** The highest log index for which a write reply has been flushed (sent to the client). */
private final AtomicLong repliedIndex;
/** Guards {@link #heldReplies}. */
private final Object heldRepliesLock = new Object();
/** Buffer holding write replies waiting to be flushed. Guarded by {@link #heldRepliesLock}. */
private List<HeldReply> heldReplies = new ArrayList<>();
/** Daemon thread that periodically flushes held replies. */
private volatile Daemon replyFlusher;
Contributor

Since there are quite many fields/methods/classes added, let's move all of them to a new class, say ReplyFlusher.

Contributor Author

Thanks for the idea and the patch. Updated based on the patch.

@szetszwo
Contributor

szetszwo commented Mar 2, 2026

Regarding this problem, one way to fix the bug is to also hold the read replies: when a server's (leader or follower) appliedIndex > the leader's repliedIndex, hold the read replies until the leader's repliedIndex >= the local appliedIndex. The previous example becomes:

  1. Leader: repliedIndex = 10 < appliedIndex = 20
  2. F1: repliedIndex = 10, appliedIndex = 18
  3. F2: repliedIndex = 10, appliedIndex = 14
  4. Client first reads from F1 but F1 holds the reply.
  5. The same client reads from F2 but F2 holds the reply.
  6. Leader, F1 and F2 update their repliedIndex to 20
  7. Client receives a reply from F1 (appliedIndex = 18)
  8. Client receives a reply from F2 (appliedIndex = 14)

Note that Step 8 is not a stale read since the two reads can be considered as being sent in parallel and the read from F2 is faster than the read from F1.
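The hold-read-replies idea can be sketched as below (hypothetical names; a single pending read for simplicity, where the real implementation would track many). A server holds a read reply while its local appliedIndex is ahead of the leader's repliedIndex, releasing it once repliedIndex catches up:

```java
import java.util.concurrent.CompletableFuture;

/** Sketch: hold read replies until leader's repliedIndex >= local appliedIndex. */
class HeldReadSketch {
  private long leaderRepliedIndex;
  private CompletableFuture<Long> pending; // one held read, for illustration
  private long pendingAppliedIndex;

  /** Serve a read at the local appliedIndex, holding it if it is ahead. */
  synchronized CompletableFuture<Long> read(long localAppliedIndex) {
    final CompletableFuture<Long> reply = new CompletableFuture<>();
    if (localAppliedIndex <= leaderRepliedIndex) {
      reply.complete(localAppliedIndex); // safe to reply immediately
    } else {
      pending = reply; // hold until repliedIndex catches up
      pendingAppliedIndex = localAppliedIndex;
    }
    return reply;
  }

  /** Leader's repliedIndex advances (e.g., a flush); release eligible holds. */
  synchronized void updateRepliedIndex(long newRepliedIndex) {
    leaderRepliedIndex = Math.max(leaderRepliedIndex, newRepliedIndex);
    if (pending != null && pendingAppliedIndex <= leaderRepliedIndex) {
      pending.complete(pendingAppliedIndex);
      pending = null;
    }
  }
}
```

Under this scheme, no state beyond repliedIndex is ever visible to a client, so the held reads released after step 6 cannot create a stale-read ordering.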

@ivandika3 , what do you think?

@ivandika3
Contributor Author

ivandika3 commented Mar 2, 2026

@szetszwo Thanks for the insight about the issue and the possible idea to fix this.

IMO the stored-read-replies implementation is going to be quite involved (which might hide other subtle correctness issues), and there might be some overhead in managing the pending read replies.

The way I see it is that the current repliedIndex (leader batch implementation) has two main benefits:

  1. RepliedIndex <= AppliedIndex, so the follower's waitToAdvance latency is lower.
  2. Coarser, discrete ReadIndex updates, which means that within a batch interval the ReadIndex should remain the same.

How about we simply update repliedIndex to the current leader's appliedIndex in ReplyFlusher#flush? This means that we lose benefit 1 but still retain benefit 2. Since the leader's appliedIndex >= a follower's appliedIndex, the stale read issue should not happen. Let me know what you think. I previously tried this and the testFollowerLinearizableReadParallel test passes without any change. Let me try to change it so that the test remains unchanged.

@szetszwo
Contributor

szetszwo commented Mar 2, 2026

How about we simply update the repliedIndex to the current leader's appliedIndex in ReplyFlusher#flush? ...

@ivandika3 , even if repliedIndex is updated with appliedIndex, the same problem can happen. I just found that the problem can also happen when using appliedIndex as the readIndex:

  1. Leader: committedIndex=20 and appliedIndex=10 since the entries are not yet applied due to slowness.
  2. Client first reads from F1: F1 gets readIndex = 10 but its appliedIndex is 18. So, it replies at appliedIndex = 18
  3. The same client reads from F2: It gets readIndex = 10 and replies at appliedIndex = 14

The original readIndex algorithm has the invariant:

  • Original readIndex algorithm: follower's appliedIndex <= follower's committedIndex <= leader's committedIndex

When a follower receives a read request, it cannot get a readIndex (= leader's committedIndex) > its appliedIndex at that time.

There are two options to fix it:

  1. Change the leader to send appliedIndex (or repliedIndex, depending on the readIndex conf) as leaderCommit in appendEntries. Then the following invariants will hold:
     • Using appliedIndex as readIndex: follower's appliedIndex <= follower's committedIndex <= leader's appliedIndex
     • Using repliedIndex as readIndex: follower's appliedIndex <= follower's committedIndex <= leader's repliedIndex
  2. Add a seenIndex to the read requests. The server (leader or follower) will wait until max(seenIndex, readIndex) is applied.
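Option 2 above can be sketched as a toy model (illustrative names, not the proposed Ratis API): the server serves a read only once max(seenIndex, readIndex) has been applied locally, so a client that has already observed index 18 can never be answered from a state behind it:

```java
/** Sketch of option 2: reads carry a seenIndex from the client. */
class SeenIndexSketch {
  private long appliedIndex;

  SeenIndexSketch(long appliedIndex) {
    this.appliedIndex = appliedIndex;
  }

  void apply(long index) {
    appliedIndex = Math.max(appliedIndex, index);
  }

  /** The read may be served only once max(seenIndex, readIndex) is applied. */
  boolean canServe(long seenIndex, long readIndex) {
    return appliedIndex >= Math.max(seenIndex, readIndex);
  }
}
```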
