diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java index e53e11fb..4f4ec89c 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java @@ -48,12 +48,18 @@ public class DLedgerConfig { private int peerPushQuota = 20 * 1024 * 1024; + private int peerPushPendingMaxBytes = 200 * 1024 * 1024; + private String storeType = FILE; //FILE, MEMORY private String dataStorePath; private int maxPendingRequestsNum = 10000; - private int maxWaitAckTimeMs = 2500; + private int maxPendingCommitIndexNum = 800000; + + private int maxPendingCommitBytes = 500 * 1024 * 1024; + + private int maxWaitAckTimeMs = 5000; private int maxPushTimeOutMs = 1000; @@ -255,6 +261,22 @@ public void setMaxPendingRequestsNum(int maxPendingRequestsNum) { this.maxPendingRequestsNum = maxPendingRequestsNum; } + public int getMaxPendingCommitIndexNum() { + return maxPendingCommitIndexNum; + } + + public void setMaxPendingCommitIndexNum(int max) { + this.maxPendingCommitIndexNum = max; + } + + public int getMaxPendingCommitBytes() { + return maxPendingCommitBytes; + } + + public void setMaxPendingCommitBytes(int max) { + this.maxPendingCommitBytes = max; + } + public int getMaxWaitAckTimeMs() { return maxWaitAckTimeMs; } @@ -371,6 +393,14 @@ public void setPeerPushThrottlePoint(int peerPushThrottlePoint) { this.peerPushThrottlePoint = peerPushThrottlePoint; } + public int getPeerPushPendingMaxBytes() { + return peerPushPendingMaxBytes; + } + + public void setPeerPushPendingMaxBytes(int peerPushPendingMaxBytes) { + this.peerPushPendingMaxBytes = peerPushPendingMaxBytes; + } + public int getPeerPushQuota() { return peerPushQuota; } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java 
b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 5682fd57..19d433ee 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -25,6 +25,7 @@ import io.openmessaging.storage.dledger.entry.DLedgerEntry; import io.openmessaging.storage.dledger.exception.DLedgerException; import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager; +import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest; import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse; @@ -168,8 +169,22 @@ public long getPeerWaterMark(long term, String peerId) { } } + public int getPendingCount(long currTerm) { + if (pendingClosure == null) { + return 0; + } + ConcurrentMap pendings = pendingClosure.get(currTerm); + if (pendings == null) { + return 0; + } + return pendings.size(); + } + public boolean isPendingFull(long currTerm) { checkTermForPendingMap(currTerm, "isPendingFull"); + if (dLedgerStore.isLocalToomuchUncommitted()) { + return true; + } return pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); } @@ -227,6 +242,7 @@ public void checkResponseFuturesTimeout(final long beginIndex) { } ConcurrentMap closureMap = this.pendingClosure.get(term); if (closureMap != null && closureMap.size() > 0) { + boolean anyChecked = false; for (long i = beginIndex; i < maxIndex; i++) { Closure closure = closureMap.get(i); if (closure == null) { @@ -234,10 +250,22 @@ public void checkResponseFuturesTimeout(final long beginIndex) { } else if (closure.isTimeOut()) { closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT)); closureMap.remove(i); + anyChecked = true; } else { + anyChecked = true; break; } } + if (!anyChecked) { + // since the 
batch append may leave the indexes discontinuous, we should scan all pending closures here + for (Map.Entry futureEntry : closureMap.entrySet()) { + Closure closure = futureEntry.getValue(); + if (closure.isTimeOut()) { + closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT)); + closureMap.remove(futureEntry.getKey()); + } + } + } } } @@ -334,6 +362,7 @@ public void doWork() { // advance the commit index // we can only commit the index whose term is equals to current term (refer to raft paper 5.4.2) if (DLedgerEntryPusher.this.memberState.leaderUpdateCommittedIndex(currTerm, quorumIndex)) { + dLedgerStore.updateCommittedIndex(quorumIndex); DLedgerEntryPusher.this.fsmCaller.onCommitted(quorumIndex); } else { // If the commit index is not advanced, we should wait for the next round @@ -941,6 +970,7 @@ private void handleDoAppend(long writeIndex, PushEntryRequest request, future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); long committedIndex = Math.min(dLedgerStore.getLedgerEndIndex(), request.getCommitIndex()); if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) { + dLedgerStore.updateCommittedIndex(committedIndex); DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex); } } catch (Throwable t) { @@ -1004,6 +1034,7 @@ private CompletableFuture handleDoCommit(long committedIndex, PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN); committedIndex = committedIndex <= dLedgerStore.getLedgerEndIndex() ? 
committedIndex : dLedgerStore.getLedgerEndIndex(); if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) { + dLedgerStore.updateCommittedIndex(committedIndex); DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex); } future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); @@ -1024,6 +1055,7 @@ private CompletableFuture handleDoTruncate(long truncateIndex future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); long committedIndex = request.getCommitIndex() <= dLedgerStore.getLedgerEndIndex() ? request.getCommitIndex() : dLedgerStore.getLedgerEndIndex(); if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) { + dLedgerStore.updateCommittedIndex(committedIndex); DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex); } } catch (Throwable t) { diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index f0391f3e..8f8fee74 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -354,6 +354,8 @@ public AppendFuture appendAsLeader(List bodies) thr } // only wait last entry ack is ok future = new BatchAppendFuture<>(positions); + long batchTimeout = (long)dLedgerConfig.getMaxWaitAckTimeMs() * (positions.length / 1000 + 1); + future.setTimeOutMs(batchTimeout); } else { DLedgerEntry dLedgerEntry = new DLedgerEntry(); totalBytes += bodies.get(0).length; diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java index a0082f84..ebcdfce4 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java +++ 
b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerMemoryStore.java @@ -184,4 +184,9 @@ public void flush() { public long getLedgerEndTerm() { return ledgerEndTerm; } + + @Override + public boolean isLocalToomuchUncommitted() { + return getLedgerEndIndex() - memberState.getCommittedIndex() > dLedgerConfig.getMaxPendingCommitIndexNum(); + } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java index a37749b7..c9f60860 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/DLedgerStore.java @@ -29,6 +29,11 @@ public abstract class DLedgerStore { public abstract DLedgerEntry get(Long index); + // should check both index and uncommitted data size + public boolean isLocalToomuchUncommitted() { + return false; + } + public abstract long getLedgerEndTerm(); public abstract long getLedgerEndIndex(); @@ -37,6 +42,8 @@ public abstract class DLedgerStore { public abstract long getLedgerBeforeBeginTerm(); + public void updateCommittedIndex(long index) {} + protected void updateLedgerEndIndexAndTerm() { if (getMemberState() != null) { getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm()); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index c776d157..3b9a2d53 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -50,6 +50,7 @@ public class DLedgerMmapFileStore extends DLedgerStore { private long ledgerEndIndex = -1; private long ledgerEndTerm; + private long committedPos; private final DLedgerConfig 
dLedgerConfig; private final MemberState memberState; private final MmapFileList dataFileList; @@ -654,6 +655,16 @@ public DLedgerEntry getFirstLogOfTargetTerm(long targetTerm, long endIndex) { return entry; } + @Override + public void updateCommittedIndex(long index) { + Pair posAndSize = getEntryPosAndSize(index); + this.committedPos = posAndSize.getKey() + posAndSize.getValue(); + } + + private long getCommittedPos() { + return this.committedPos; + } + private Pair getEntryPosAndSize(Long index) { indexCheck(index); SelectMmapBufferResult indexSbr = null; @@ -681,6 +692,16 @@ public long getLedgerEndTerm() { return ledgerEndTerm; } + @Override + public boolean isLocalToomuchUncommitted() { + long fallBehindBytes = dataFileList.getMaxWrotePosition() - getCommittedPos(); + if (fallBehindBytes > dLedgerConfig.getMaxPendingCommitBytes()) { + return true; + } + + return getLedgerEndIndex() - memberState.getCommittedIndex() > dLedgerConfig.getMaxPendingCommitIndexNum(); + } + public void addAppendHook(AppendHook writeHook) { if (!appendHooks.contains(writeHook)) { appendHooks.add(writeHook);