Skip to content

Commit

Permalink
feat: add config to control the memory usage for raft pending
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Oct 15, 2023
1 parent d34f270 commit edd9fd8
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ public class DLedgerConfig {

// 20 MiB. NOTE(review): presumably a per-peer push rate quota; its usage is not visible here — confirm.
private int peerPushQuota = 20 * 1024 * 1024;

// 200 MiB. NOTE(review): presumably caps the bytes pending push to a single peer; usage not visible here — confirm.
private int peerPushPendingMaxBytes = 200 * 1024 * 1024;

private String storeType = FILE; //FILE, MEMORY
private String dataStorePath;

// Upper bound on the number of pending closures per term; isPendingFull reports full once the count exceeds it.
private int maxPendingRequestsNum = 10000;

// NOTE(review): maxWaitAckTimeMs appears twice in this view (2500 here and 5000 below); this looks like
// the removed and added lines of a diff rendered together — confirm that only one declaration exists.
private int maxWaitAckTimeMs = 2500;
// Maximum gap between the ledger end index and the committed index before a store reports
// too much uncommitted data (see isLocalToomuchUncommitted).
private int maxPendingCommitIndexNum = 800000;

// 500 MiB. Maximum number of written-but-uncommitted bytes before the mmap file store reports
// too much uncommitted data (see isLocalToomuchUncommitted).
private int maxPendingCommitBytes = 500 * 1024 * 1024;

// Base ack wait time in milliseconds; batch appends scale their future timeout from this value.
private int maxWaitAckTimeMs = 5000;

// NOTE(review): presumably the timeout of a single push request, in milliseconds; usage not visible here — confirm.
private int maxPushTimeOutMs = 1000;

Expand Down Expand Up @@ -255,6 +261,22 @@ public void setMaxPendingRequestsNum(int maxPendingRequestsNum) {
this.maxPendingRequestsNum = maxPendingRequestsNum;
}

/**
 * Returns the configured limit on how many indexes may lie between the
 * ledger end and the committed index.
 *
 * @return the current value of {@code maxPendingCommitIndexNum}
 */
public int getMaxPendingCommitIndexNum() {
    return this.maxPendingCommitIndexNum;
}

/**
 * Replaces the limit on how many indexes may lie between the ledger end
 * and the committed index.
 *
 * @param max the new value of {@code maxPendingCommitIndexNum}; stored as given, no validation
 */
public void setMaxPendingCommitIndexNum(int max) {
    maxPendingCommitIndexNum = max;
}

/**
 * Returns the configured limit, in bytes, on written-but-uncommitted data.
 *
 * @return the current value of {@code maxPendingCommitBytes}
 */
public int getMaxPendingCommitBytes() {
    return this.maxPendingCommitBytes;
}

/**
 * Replaces the limit, in bytes, on written-but-uncommitted data.
 *
 * @param max the new value of {@code maxPendingCommitBytes}; stored as given, no validation
 */
public void setMaxPendingCommitBytes(int max) {
    maxPendingCommitBytes = max;
}

/**
 * Returns the configured ack wait time.
 *
 * @return the current value of {@code maxWaitAckTimeMs}, in milliseconds
 */
public int getMaxWaitAckTimeMs() {
    return this.maxWaitAckTimeMs;
}
Expand Down Expand Up @@ -371,6 +393,14 @@ public void setPeerPushThrottlePoint(int peerPushThrottlePoint) {
this.peerPushThrottlePoint = peerPushThrottlePoint;
}

/**
 * Returns the configured {@code peerPushPendingMaxBytes} value.
 *
 * @return the current limit, in bytes
 */
public int getPeerPushPendingMaxBytes() {
    return this.peerPushPendingMaxBytes;
}

/**
 * Replaces the configured {@code peerPushPendingMaxBytes} value.
 *
 * @param peerPushPendingMaxBytes the new limit, in bytes; stored as given, no validation
 */
public void setPeerPushPendingMaxBytes(final int peerPushPendingMaxBytes) {
    this.peerPushPendingMaxBytes = peerPushPendingMaxBytes;
}

/**
 * Returns the configured {@code peerPushQuota} value.
 *
 * @return the current quota
 */
public int getPeerPushQuota() {
    return this.peerPushQuota;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
Expand Down Expand Up @@ -168,8 +169,22 @@ public long getPeerWaterMark(long term, String peerId) {
}
}

/**
 * Counts the closures currently pending for the given term.
 *
 * @param currTerm the term whose pending closures are counted
 * @return the size of the term's pending map, or {@code 0} when the pending
 *         container is {@code null} or holds no map for {@code currTerm}
 */
public int getPendingCount(long currTerm) {
    if (pendingClosure != null) {
        final ConcurrentMap<Long, Closure> termClosures = pendingClosure.get(currTerm);
        if (termClosures != null) {
            return termClosures.size();
        }
    }
    // Either nothing is tracked at all, or nothing is tracked for this term.
    return 0;
}

/**
 * Tells whether new requests for the given term should be treated as over the pending limit.
 * <p>
 * The result is {@code true} when the store reports too much uncommitted data, or when the
 * number of pending closures for the term exceeds {@code maxPendingRequestsNum}.
 *
 * @param currTerm the term to check
 * @return {@code true} if either limit is exceeded
 */
public boolean isPendingFull(long currTerm) {
    // NOTE(review): presumably guarantees a pending map exists for currTerm — verify against its definition.
    checkTermForPendingMap(currTerm, "isPendingFull");
    // The store check runs first; the closure count is only consulted when the store is within limits.
    return dLedgerStore.isLocalToomuchUncommitted()
        || pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
}

Expand Down Expand Up @@ -227,17 +242,30 @@ public void checkResponseFuturesTimeout(final long beginIndex) {
}
ConcurrentMap<Long, Closure> closureMap = this.pendingClosure.get(term);
if (closureMap != null && closureMap.size() > 0) {
boolean anyChecked = false;
for (long i = beginIndex; i < maxIndex; i++) {
Closure closure = closureMap.get(i);
if (closure == null) {
// the closure at this index may already have been removed on completion; keep scanning
} else if (closure.isTimeOut()) {
closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT));
closureMap.remove(i);
anyChecked = true;
} else {
anyChecked = true;
break;
}
}
if (!anyChecked) {
// batch appends can leave gaps in the pending indexes, so the sequential scan above may find nothing; check every pending closure instead
for (Map.Entry<Long, Closure> futureEntry : closureMap.entrySet()) {
Closure closure = futureEntry.getValue();
if (closure.isTimeOut()) {
closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT));
closureMap.remove(futureEntry.getKey());
}
}
}
}
}

Expand Down Expand Up @@ -334,6 +362,7 @@ public void doWork() {
// advance the commit index
// we can only commit an index whose term equals the current term (refer to raft paper 5.4.2)
if (DLedgerEntryPusher.this.memberState.leaderUpdateCommittedIndex(currTerm, quorumIndex)) {
dLedgerStore.updateCommittedIndex(quorumIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(quorumIndex);
} else {
// If the commit index is not advanced, we should wait for the next round
Expand Down Expand Up @@ -941,6 +970,7 @@ private void handleDoAppend(long writeIndex, PushEntryRequest request,
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
long committedIndex = Math.min(dLedgerStore.getLedgerEndIndex(), request.getCommitIndex());
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
dLedgerStore.updateCommittedIndex(committedIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
} catch (Throwable t) {
Expand Down Expand Up @@ -1004,6 +1034,7 @@ private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex,
PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN);
committedIndex = committedIndex <= dLedgerStore.getLedgerEndIndex() ? committedIndex : dLedgerStore.getLedgerEndIndex();
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
dLedgerStore.updateCommittedIndex(committedIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
Expand All @@ -1024,6 +1055,7 @@ private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
long committedIndex = request.getCommitIndex() <= dLedgerStore.getLedgerEndIndex() ? request.getCommitIndex() : dLedgerStore.getLedgerEndIndex();
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
dLedgerStore.updateCommittedIndex(committedIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) thr
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>(positions);
long batchTimeout = (long)dLedgerConfig.getMaxWaitAckTimeMs() * (positions.length / 1000 + 1);
future.setTimeOutMs(batchTimeout);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
totalBytes += bodies.get(0).length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,9 @@ public void flush() {
/**
 * Returns the term recorded for the end of the ledger.
 *
 * @return the current value of {@code ledgerEndTerm}
 */
public long getLedgerEndTerm() {
    return this.ledgerEndTerm;
}

/**
 * Reports whether too many entries are waiting to be committed.
 * <p>
 * Only the index gap is checked here: the distance from the committed index to the
 * ledger end index is compared against {@code maxPendingCommitIndexNum}.
 *
 * @return {@code true} when the gap is strictly greater than the configured limit
 */
@Override
public boolean isLocalToomuchUncommitted() {
    final long uncommittedEntries = getLedgerEndIndex() - memberState.getCommittedIndex();
    return uncommittedEntries > dLedgerConfig.getMaxPendingCommitIndexNum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public abstract class DLedgerStore {

public abstract DLedgerEntry get(Long index);

/**
 * Reports whether this store holds too much uncommitted data.
 * <p>
 * This base implementation always returns {@code false}. Overriding stores should
 * check both the number of uncommitted indexes and the uncommitted data size.
 *
 * @return {@code false} unless overridden
 */
public boolean isLocalToomuchUncommitted() {
return false;
}

public abstract long getLedgerEndTerm();

public abstract long getLedgerEndIndex();
Expand All @@ -37,6 +42,8 @@ public abstract class DLedgerStore {

public abstract long getLedgerBeforeBeginTerm();

/**
 * Hook for telling the store which index has been committed.
 * <p>
 * This base implementation does nothing; a store that tracks committed state
 * can override it.
 *
 * @param index the committed index
 */
public void updateCommittedIndex(long index) {}

protected void updateLedgerEndIndexAndTerm() {
if (getMemberState() != null) {
getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DLedgerMmapFileStore extends DLedgerStore {

// Starts at -1. NOTE(review): presumably means "no entry yet" — confirm.
private long ledgerEndIndex = -1;
private long ledgerEndTerm;
// End offset (position + size) of the last committed entry, assigned by updateCommittedIndex.
// NOTE(review): plain non-volatile long; if updateCommittedIndex and isLocalToomuchUncommitted
// run on different threads, the reader is not guaranteed to see the latest value — confirm.
private long committedPos;
private final DLedgerConfig dLedgerConfig;
private final MemberState memberState;
private final MmapFileList dataFileList;
Expand Down Expand Up @@ -654,6 +655,16 @@ public DLedgerEntry getFirstLogOfTargetTerm(long targetTerm, long endIndex) {
return entry;
}

/**
 * Records where the committed data ends for the given committed index.
 * <p>
 * The entry's position and size are looked up, and {@code committedPos} is set to the
 * offset just past that entry.
 *
 * @param index the committed index; passed to {@code getEntryPosAndSize}, which performs the index check
 */
@Override
public void updateCommittedIndex(long index) {
    final Pair<Long, Integer> entryLocation = getEntryPosAndSize(index);
    final long entryStart = entryLocation.getKey();
    final int entrySize = entryLocation.getValue();
    this.committedPos = entryStart + entrySize;
}

/**
 * Returns the offset just past the last committed entry, as last recorded
 * by {@code updateCommittedIndex}.
 *
 * @return the current value of {@code committedPos}
 */
private long getCommittedPos() {
    return committedPos;
}

private Pair<Long, Integer> getEntryPosAndSize(Long index) {
indexCheck(index);
SelectMmapBufferResult indexSbr = null;
Expand Down Expand Up @@ -681,6 +692,16 @@ public long getLedgerEndTerm() {
return ledgerEndTerm;
}

/**
 * Reports whether this store holds too much uncommitted data, by bytes or by entry count.
 * <p>
 * The byte check compares the distance from the committed position to the maximum written
 * position against {@code maxPendingCommitBytes}. The entry check compares the distance from
 * the committed index to the ledger end index against {@code maxPendingCommitIndexNum}.
 * <p>
 * NOTE(review): in the visible code {@code committedPos} is assigned only by
 * {@code updateCommittedIndex}. If it is not restored when an existing store is loaded,
 * the byte check measures from 0 until the first commit after startup — confirm.
 *
 * @return {@code true} when either distance is strictly greater than its limit
 */
@Override
public boolean isLocalToomuchUncommitted() {
    final long uncommittedBytes = dataFileList.getMaxWrotePosition() - getCommittedPos();
    // The entry-count comparison is only evaluated when the byte limit is not exceeded.
    return uncommittedBytes > dLedgerConfig.getMaxPendingCommitBytes()
        || getLedgerEndIndex() - memberState.getCommittedIndex() > dLedgerConfig.getMaxPendingCommitIndexNum();
}

public void addAppendHook(AppendHook writeHook) {
if (!appendHooks.contains(writeHook)) {
appendHooks.add(writeHook);
Expand Down

0 comments on commit edd9fd8

Please sign in to comment.