Skip to content
37 changes: 32 additions & 5 deletions ratis-docs/src/site/markdown/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,38 @@ if it fails to receive any RPC responses from this peer within this specified ti

### Read Index - Configurations related to ReadIndex used in linearizable read

| **Property** | `raft.server.read.read-index.applied-index.enabled` |
|:----------------|:----------------------------------------------------------------------|
| **Description** | whether applied index (instead of commit index) is used for ReadIndex |
| **Type** | boolean |
| **Default** | false |
| **Property** | `raft.server.read.read-index.type` |
|:----------------|:-----------------------------------------------------------------------------|
| **Description** | type of read index returned |
| **Type** | enum `Read.ReadIndex.Type` [`COMMIT_INDEX`, `APPLIED_INDEX`, `REPLIED_INDEX` |
| **Default** | `Read.ReadIndex.Type.COMMIT_INDEX` |

* `Read.ReadIndex.Type.COMMIT_INDEX` - Use leader's CommitIndex (see Raft Paper section 6.4)
* The safest type as it is specified in the Raft dissertation
* This ReadIndex type can be chosen if the base linearizable read from followers performance already meets expectations.

* `Read.ReadIndex.Type.APPLIED_INDEX` - Use leader's AppliedIndex
* Allow leader to return AppliedIndex (instead of CommitIndex) as the ReadIndex
* This reduces the time follower applying logs up to ReadIndex since AppliedIndex ≤ CommitIndex
* This ReadIndex type can be chosen `Read.ReadIndex.Type.COMMIT_INDEX` read latency is too high.

* `Read.ReadIndex.Type.REPLIED_INDEX` - Use leader's RepliedIndex
* RepliedIndex is defined as the last AppliedIndex of the leader when returning the last batch.
* Leader delays replying write requests and only reply them every write batch boundary configurable by `raft.server.read.read-index.replied-index.batch-interval`.
* This allows the ReadIndex to advance in a coarser, less frequent steps, so followers are more likely to have already applied past the ReadIndex when a read arrives.
* This is most effective on read-heavy, follower-read workloads which prioritizes overall read throughput without consistency sacrifice.
* There is a trade-off in increased write latency (up to one `raft.server.read.read-index.replied-index.batch-interval`) per write.
* RepliedIndex still guarantees linearizability (no stale read) since by definition each ReadIndex returns the index of the last replied request.
* If the RepliedIndex is set to 0, the behavior is identical to `Read.ReadIndex.Type.APPLIED_INDEX`

Note that theoretically all the ReadIndex types still guarantee linearizability,
but there are tradeoffs (e.g. Write and Read performance) between different types.

| **Property** | `raft.server.read.read-index.replied-index.batch-interval` |
|:----------------|:--------------------------------------------------------------------------------------------------------------------------------------------|
| **Description** | if `Read.ReadIndex.Type` is `REPLIED_INDEX`, the interval at which held write replies are flushed to clients and `repliedIndex` is advanced |
| **Type** | TimeDuration |
| **Default** | 10ms |

| **Property** | `raft.server.read.leader.heartbeat-check.enabled` |
|:----------------|:--------------------------------------------------|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,34 @@ static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration
interface ReadIndex {
String PREFIX = Read.PREFIX + ".read-index";

String APPLIED_INDEX_ENABLED_KEY = PREFIX + ".applied-index.enabled";
boolean APPLIED_INDEX_ENABLED_DEFAULT = false;
static boolean appliedIndexEnabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, APPLIED_INDEX_ENABLED_KEY,
APPLIED_INDEX_ENABLED_DEFAULT, getDefaultLog());
enum Type {
/** ReadIndex returns leader's commitIndex (see Raft Paper section 6.4). */
COMMIT_INDEX,

/** ReadIndex returns leader's appliedIndex to reduce the ReadIndex latency. */
APPLIED_INDEX,

/** ReadIndex returns leader's repliedIndex, the index of the last replied request. */
REPLIED_INDEX
}

String TYPE_KEY = PREFIX + ".type";
Type TYPE_DEFAULT = Type.COMMIT_INDEX;
static Type type(RaftProperties properties) {
return get(properties::getEnum, TYPE_KEY, TYPE_DEFAULT, getDefaultLog());
}
static void setType(RaftProperties properties, Type type) {
set(properties::setEnum, TYPE_KEY, type);
}

static void setAppliedIndexEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, APPLIED_INDEX_ENABLED_KEY, enabled);
String REPLIED_INDEX_BATCH_INTERVAL_KEY = PREFIX + ".replied-index.batch-interval";
TimeDuration REPLIED_INDEX_BATCH_INTERVAL_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MILLISECONDS);
static TimeDuration repliedIndexBatchInterval(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(REPLIED_INDEX_BATCH_INTERVAL_DEFAULT.getUnit()),
REPLIED_INDEX_BATCH_INTERVAL_KEY, REPLIED_INDEX_BATCH_INTERVAL_DEFAULT, getDefaultLog());
}
static void setRepliedIndexBatchInterval(RaftProperties properties, TimeDuration interval) {
setTimeDuration(properties::setTimeDuration, REPLIED_INDEX_BATCH_INTERVAL_KEY, interval);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.ratis.protocol.exceptions.ReadIndexException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.ReadIndexHeartbeats.AppendEntriesListener;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
Expand Down Expand Up @@ -82,6 +83,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -353,10 +355,13 @@ boolean isApplied() {
private final PendingStepDown pendingStepDown;

private final ReadIndexHeartbeats readIndexHeartbeats;
private final boolean readIndexAppliedIndexEnabled;
private final RaftServerConfigKeys.Read.ReadIndex.Type readIndexType;
private final Supplier<Long> readIndexSupplier;
private final boolean leaderHeartbeatCheckEnabled;
private final LeaderLease lease;

private ReplyFlusher replyFlusher;

LeaderStateImpl(RaftServerImpl server) {
this.name = ServerStringUtils.generateUnifiedName(server.getMemberId(), getClass());
this.server = server;
Expand Down Expand Up @@ -391,8 +396,22 @@ boolean isApplied() {
} else {
this.followerMaxGapThreshold = (long) (followerGapRatioMax * maxPendingRequests);
}
this.readIndexAppliedIndexEnabled = RaftServerConfigKeys.Read.ReadIndex
.appliedIndexEnabled(properties);

this.readIndexType = RaftServerConfigKeys.Read.ReadIndex.type(properties);
switch (readIndexType) {
case REPLIED_INDEX:
this.replyFlusher = new ReplyFlusher(name, state.getLastAppliedIndex(),
() -> server.getState().getLastAppliedIndex(),
RaftServerConfigKeys.Read.ReadIndex.repliedIndexBatchInterval(properties));
readIndexSupplier = replyFlusher::getRepliedIndex;
break;
case APPLIED_INDEX:
readIndexSupplier = () -> server.getState().getLastAppliedIndex();
break;
case COMMIT_INDEX:
default:
readIndexSupplier = () -> server.getRaftLog().getLastCommittedIndex();
}
this.leaderHeartbeatCheckEnabled = RaftServerConfigKeys.Read
.leaderHeartbeatCheckEnabled(properties);

Expand All @@ -419,6 +438,10 @@ void start() {
startupLogEntry.get();
processor.start();
senders.forEach(LogAppender::start);

if (replyFlusher != null) {
replyFlusher.start();
}
}

boolean isReady() {
Expand Down Expand Up @@ -453,6 +476,9 @@ CompletableFuture<Void> stop() {
startupLogEntry.get().getAppliedIndexFuture().completeExceptionally(
new ReadIndexException("failed to obtain read index since: ", nle));
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
if (replyFlusher != null) {
replyFlusher.stop();
}
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
pendingRequests.close();
Expand Down Expand Up @@ -1140,23 +1166,21 @@ public boolean checkLeadership() {
/**
* Obtain the current readIndex for read only requests. See Raft paper section 6.4.
* 1. Leader makes sure at least one log from current term is committed.
* 2. Leader record last committed index or applied index (depending on configuration) as readIndex.
* 2. Leader record last committed index or applied index or replied index (depending on configuration) as readIndex.
* 3. Leader broadcast heartbeats to followers and waits for acknowledgements.
* 4. If majority respond success, returns readIndex.
* @return current readIndex.
*/
CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
final long index = readIndexAppliedIndexEnabled ?
server.getState().getLastAppliedIndex() : server.getRaftLog().getLastCommittedIndex();
final long index = readIndexSupplier.get();
final long readIndex;
if (readAfterWriteConsistentIndex != null && readAfterWriteConsistentIndex > index) {
readIndex = readAfterWriteConsistentIndex;
} else {
readIndex = index;
}
LOG.debug("readIndex={} ({}Index={}, readAfterWriteConsistentIndex={})",
readIndex, readIndexAppliedIndexEnabled ? "applied" : "commit",
index, readAfterWriteConsistentIndex);
LOG.debug("readIndex={} ({}={}, readAfterWriteConsistentIndex={})",
readIndex, readIndexType, index, readAfterWriteConsistentIndex);

// if group contains only one member, fast path
if (server.getRaftConf().isSingleton()) {
Expand Down Expand Up @@ -1218,7 +1242,15 @@ private boolean checkLeaderLease() {
}

void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(termIndex, reply);
if (readIndexType == Type.REPLIED_INDEX) {
// Remove from pending map but hold the reply for batch flushing.
final PendingRequest pending = pendingRequests.removePendingRequest(termIndex);
if (pending != null) {
replyFlusher.hold(pending, reply);
}
} else {
pendingRequests.replyPendingRequest(termIndex, reply);
}
}

TransactionContext getTransactionContext(TermIndex termIndex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,18 @@ void replyPendingRequest(TermIndex termIndex, RaftClientReply reply) {
}
}

/**
* Remove the {@link PendingRequest} for the given {@link TermIndex} without sending a reply.
* @return the removed {@link PendingRequest}, or null if not found.
*/
PendingRequest removePendingRequest(TermIndex termIndex) {
final PendingRequest pending = pendingRequests.remove(termIndex);
if (pending != null) {
Preconditions.assertEquals(termIndex, pending.getTermIndex(), "termIndex");
}
return pending;
}

/**
* The leader state is stopped. Send NotLeaderException to all the pending
* requests since they have not got applied to the state machine yet.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
* Implements the reply flush logic as part of the leader batch write when RepliedIndex is used.
*/
public class ReplyFlusher {
static final Logger LOG = LoggerFactory.getLogger(ReplyFlusher.class);

/** A write reply that has been built but not yet sent to the client. */
static class HeldReply {
private final PendingRequest pending;
private final RaftClientReply reply;

HeldReply(PendingRequest pending, RaftClientReply reply) {
this.pending = pending;
this.reply = reply;
}

void release() {
pending.setReply(reply);
}
}

static class Replies {
private LinkedList<HeldReply> list = new LinkedList<>();

synchronized void add(PendingRequest pending, RaftClientReply reply) {
list.add(new HeldReply(pending, reply));
}

synchronized LinkedList<HeldReply> getAndSetNewList() {
final LinkedList<HeldReply> old = list;
list = new LinkedList<>();
return old;
}
}

private final String name;
private final LifeCycle lifeCycle;
private final Daemon daemon;
private Replies replies = new Replies();
private final RaftLogIndex repliedIndex;
/** Supplies the last applied index from the state machine. */
private final LongSupplier appliedIndexSupplier;
/** The interval at which held write replies are flushed. */
private final TimeDuration batchInterval;

ReplyFlusher(String name, long repliedIndex, LongSupplier appliedIndexSupplier, TimeDuration batchInterval) {
this.name = name + "-ReplyFlusher";
this.lifeCycle = new LifeCycle(this.name);
this.daemon = Daemon.newBuilder()
.setName(this.name)
.setRunnable(this::run)
.build();
this.repliedIndex = new RaftLogIndex("repliedIndex", repliedIndex);
this.appliedIndexSupplier = appliedIndexSupplier;
this.batchInterval = batchInterval;
}

long getRepliedIndex() {
return repliedIndex.get();
}

/** Hold a write reply for later batch flushing. */
void hold(PendingRequest pending, RaftClientReply reply) {
replies.add(pending, reply);
}

void start() {
lifeCycle.transition(LifeCycle.State.STARTING);
// We need to transition to RUNNING first so that ReplyFlusher#run always
// see that the lifecycle state is in RUNNING state.
lifeCycle.transition(LifeCycle.State.RUNNING);
daemon.start();
}

/** The reply flusher daemon loop. */
private void run() {
try {
while (lifeCycle.getCurrentState() == LifeCycle.State.RUNNING) {
try {
Thread.sleep(batchInterval.toLong(TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
flush();
}
} finally {
// Flush remaining on exit
flush();
}
}

/** Flush all held replies and advance {@link #repliedIndex} to the applied index. */
private void flush() {
final LinkedList<HeldReply> toFlush = replies.getAndSetNewList();
for (HeldReply held : toFlush) {
held.release();
}
final long appliedIndex = appliedIndexSupplier.getAsLong();
repliedIndex.updateToMax(appliedIndex, s ->
LOG.debug("{}: flushed {} replies, {}", name, toFlush.size(), s));
}

/** Stop the reply flusher daemon. */
void stop() {
lifeCycle.checkStateAndClose();
daemon.interrupt();
try {
daemon.join(batchInterval.toLong(TimeUnit.MILLISECONDS )* 2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -60,7 +61,7 @@ public abstract class LinearizableReadTests<CLUSTER extends MiniRaftCluster>

public abstract boolean isLeaderLeaseEnabled();

public abstract boolean readIndexAppliedIndexEnabled();
public abstract Type readIndexType();

public abstract void assertRaftProperties(RaftProperties properties);

Expand All @@ -77,7 +78,7 @@ public void setup() {
CounterStateMachine.setProperties(p);
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
RaftServerConfigKeys.Read.ReadIndex.setAppliedIndexEnabled(p, readIndexAppliedIndexEnabled());
RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
}

@Test
Expand Down
Loading
Loading