diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 19d433ee..33b300b2 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -17,6 +17,8 @@ package io.openmessaging.storage.dledger; import com.alibaba.fastjson.JSON; + +import io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcherState; import io.openmessaging.storage.dledger.common.Closure; import io.openmessaging.storage.dledger.common.ShutdownAbleThread; import io.openmessaging.storage.dledger.common.Status; @@ -55,6 +57,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -180,12 +183,33 @@ public int getPendingCount(long currTerm) { return pendings.size(); } + public long getPendingSize(long currTerm) { + if (dispatcherMap == null) { + return 0; + } + long total = 0; + for (EntryDispatcher dispatcher : dispatcherMap.values()) { + total += dispatcher.pendingTotalSize.get(); + } + return total; + } + public boolean isPendingFull(long currTerm) { checkTermForPendingMap(currTerm, "isPendingFull"); if (dLedgerStore.isLocalToomuchUncommitted()) { return true; } - return pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum(); + if (pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum()) { + return true; + } + // avoid too much memory in pending if more than half followers fall behind too much + int fallBehindTooMuch = 0; + for (EntryDispatcher dispatcher : dispatcherMap.values()) { + if (dispatcher.pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) { + fallBehindTooMuch++; + } + } + return fallBehindTooMuch > dispatcherMap.size() / 2; } public void appendClosure(Closure closure, long term, long index) { @@ -409,6 +433,8 @@ private class EntryDispatcher extends ShutdownAbleThread { private long matchIndex = -1; private final int maxPendingSize = 1000; + private AtomicLong pendingTotalSize = new AtomicLong(0); + private long term = -1; private String leaderId = null; private long lastCheckLeakTimeMs = System.currentTimeMillis(); @@ -715,6 +741,10 @@ private void doAppend() throws Exception { doCheckAppendResponse(); break; } + if (pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) { + // to avoid oom or fullgc, we should wait for a while if too much pending big entry size + break; + } long lastIndexToBeSend = doAppendInner(writeIndex); if (lastIndexToBeSend == -1) { break; @@ -759,8 +789,10 @@ private void sendBatchAppendEntryRequest() throws Exception { StopWatch watch = StopWatch.createStarted(); CompletableFuture responseFuture = dLedgerRpcService.push(batchAppendEntryRequest); pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount())); + pendingTotalSize.addAndGet(entriesSize); responseFuture.whenComplete((x, ex) -> { try { + pendingTotalSize.addAndGet(-1 * entriesSize); PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN); DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode()); switch (responseCode) {