Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support control memory to avoid oom or fgc #314

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ public class DLedgerConfig {

private int peerPushQuota = 20 * 1024 * 1024;

private int peerPushPendingMaxBytes = 200 * 1024 * 1024;

private String storeType = FILE; //FILE, MEMORY
private String dataStorePath;

private int maxPendingRequestsNum = 10000;

private int maxWaitAckTimeMs = 2500;
private int maxPendingCommitIndexNum = 800000;

private int maxPendingCommitBytes = 500 * 1024 * 1024;

private int maxWaitAckTimeMs = 5000;

private int maxPushTimeOutMs = 1000;

Expand Down Expand Up @@ -255,6 +261,22 @@ public void setMaxPendingRequestsNum(int maxPendingRequestsNum) {
this.maxPendingRequestsNum = maxPendingRequestsNum;
}

public int getMaxPendingCommitIndexNum() {
return maxPendingCommitIndexNum;
}

public void setMaxPendingCommitIndexNum(int max) {
this.maxPendingCommitIndexNum = max;
}

public int getMaxPendingCommitBytes() {
return maxPendingCommitBytes;
}

public void setMaxPendingCommitBytes(int max) {
this.maxPendingCommitBytes = max;
}

public int getMaxWaitAckTimeMs() {
return maxWaitAckTimeMs;
}
Expand Down Expand Up @@ -371,6 +393,14 @@ public void setPeerPushThrottlePoint(int peerPushThrottlePoint) {
this.peerPushThrottlePoint = peerPushThrottlePoint;
}

public int getPeerPushPendingMaxBytes() {
return peerPushPendingMaxBytes;
}

public void setPeerPushPendingMaxBytes(int peerPushPendingMaxBytes) {
this.peerPushPendingMaxBytes = peerPushPendingMaxBytes;
}

public int getPeerPushQuota() {
return peerPushQuota;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.openmessaging.storage.dledger;

import com.alibaba.fastjson.JSON;

import io.openmessaging.storage.dledger.DLedgerEntryPusher.EntryDispatcherState;
import io.openmessaging.storage.dledger.common.Closure;
import io.openmessaging.storage.dledger.common.ShutdownAbleThread;
import io.openmessaging.storage.dledger.common.Status;
Expand All @@ -25,6 +27,7 @@
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
import io.openmessaging.storage.dledger.exception.DLedgerException;
import io.openmessaging.storage.dledger.metrics.DLedgerMetricsManager;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest;
import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse;
Expand Down Expand Up @@ -54,6 +57,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -168,9 +172,44 @@ public long getPeerWaterMark(long term, String peerId) {
}
}

public int getPendingCount(long currTerm) {
if (pendingClosure == null) {
return 0;
}
ConcurrentMap<Long, Closure> pendings = pendingClosure.get(currTerm);
if (pendings == null) {
return 0;
}
return pendings.size();
}

public long getPendingSize(long currTerm) {
if (dispatcherMap == null) {
return 0;
}
long total = 0;
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
total += dispatcher.pendingTotalSize.get();
}
return total;
}

public boolean isPendingFull(long currTerm) {
checkTermForPendingMap(currTerm, "isPendingFull");
return pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();
if (dLedgerStore.isLocalToomuchUncommitted()) {
return true;
}
if (pendingClosure.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum()) {
return true;
}
// avoid too much memory in pending if more than half followers fall behind too much
int fallBehindTooMuch = 0;
for (EntryDispatcher dispatcher : dispatcherMap.values()) {
if (dispatcher.pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) {
fallBehindTooMuch++;
}
}
return fallBehindTooMuch > dispatcherMap.size() / 2;
}

public void appendClosure(Closure closure, long term, long index) {
Expand Down Expand Up @@ -227,17 +266,30 @@ public void checkResponseFuturesTimeout(final long beginIndex) {
}
ConcurrentMap<Long, Closure> closureMap = this.pendingClosure.get(term);
if (closureMap != null && closureMap.size() > 0) {
boolean anyChecked = false;
for (long i = beginIndex; i < maxIndex; i++) {
Closure closure = closureMap.get(i);
if (closure == null) {
// index may be removed for complete, we should continue scan
} else if (closure.isTimeOut()) {
closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT));
closureMap.remove(i);
anyChecked = true;
} else {
anyChecked = true;
break;
}
}
if (!anyChecked) {
// since the batch append may have index discontinuous, we should check here
for (Map.Entry<Long, Closure> futureEntry : closureMap.entrySet()) {
Closure closure = futureEntry.getValue();
if (closure.isTimeOut()) {
closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT));
closureMap.remove(futureEntry.getKey());
}
}
}
}
}

Expand Down Expand Up @@ -334,6 +386,7 @@ public void doWork() {
// advance the commit index
// we can only commit the index whose term is equals to current term (refer to raft paper 5.4.2)
if (DLedgerEntryPusher.this.memberState.leaderUpdateCommittedIndex(currTerm, quorumIndex)) {
dLedgerStore.updateCommittedIndex(quorumIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(quorumIndex);
} else {
// If the commit index is not advanced, we should wait for the next round
Expand Down Expand Up @@ -380,6 +433,8 @@ private class EntryDispatcher extends ShutdownAbleThread {
private long matchIndex = -1;

private final int maxPendingSize = 1000;
private AtomicLong pendingTotalSize = new AtomicLong(0);

private long term = -1;
private String leaderId = null;
private long lastCheckLeakTimeMs = System.currentTimeMillis();
Expand Down Expand Up @@ -686,6 +741,10 @@ private void doAppend() throws Exception {
doCheckAppendResponse();
break;
}
if (pendingTotalSize.get() >= dLedgerConfig.getPeerPushPendingMaxBytes()) {
// to avoid oom or fullgc, we should wait for a while if too much pending big entry size
break;
}
long lastIndexToBeSend = doAppendInner(writeIndex);
if (lastIndexToBeSend == -1) {
break;
Expand Down Expand Up @@ -730,8 +789,11 @@ private void sendBatchAppendEntryRequest() throws Exception {
StopWatch watch = StopWatch.createStarted();
CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(batchAppendEntryRequest);
pendingMap.put(firstIndex, new Pair<>(System.currentTimeMillis(), batchAppendEntryRequest.getCount()));
pendingTotalSize.addAndGet(entriesSize);
responseFuture.whenComplete((x, ex) -> {
try {
pendingTotalSize.addAndGet(-1 * entriesSize);
wakeup();
PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
switch (responseCode) {
Expand Down Expand Up @@ -941,6 +1003,7 @@ private void handleDoAppend(long writeIndex, PushEntryRequest request,
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
long committedIndex = Math.min(dLedgerStore.getLedgerEndIndex(), request.getCommitIndex());
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
dLedgerStore.updateCommittedIndex(committedIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
} catch (Throwable t) {
Expand Down Expand Up @@ -1004,6 +1067,7 @@ private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex,
PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN);
committedIndex = committedIndex <= dLedgerStore.getLedgerEndIndex() ? committedIndex : dLedgerStore.getLedgerEndIndex();
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
dLedgerStore.updateCommittedIndex(committedIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
Expand All @@ -1024,6 +1088,7 @@ private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex
future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
long committedIndex = request.getCommitIndex() <= dLedgerStore.getLedgerEndIndex() ? request.getCommitIndex() : dLedgerStore.getLedgerEndIndex();
if (DLedgerEntryPusher.this.memberState.followerUpdateCommittedIndex(committedIndex)) {
dLedgerStore.updateCommittedIndex(committedIndex);
DLedgerEntryPusher.this.fsmCaller.onCommitted(committedIndex);
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,8 @@ public AppendFuture<AppendEntryResponse> appendAsLeader(List<byte[]> bodies) thr
}
// only wait last entry ack is ok
future = new BatchAppendFuture<>(positions);
long batchTimeout = (long)dLedgerConfig.getMaxWaitAckTimeMs() * (positions.length / 1000 + 1);
future.setTimeOutMs(batchTimeout);
} else {
DLedgerEntry dLedgerEntry = new DLedgerEntry();
totalBytes += bodies.get(0).length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,9 @@ public void flush() {
public long getLedgerEndTerm() {
return ledgerEndTerm;
}

@Override
public boolean isLocalToomuchUncommitted() {
return getLedgerEndIndex() - memberState.getCommittedIndex() > dLedgerConfig.getMaxPendingCommitIndexNum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public abstract class DLedgerStore {

public abstract DLedgerEntry get(Long index);

// should check both index and uncommitted data size
public boolean isLocalToomuchUncommitted() {
return false;
}

public abstract long getLedgerEndTerm();

public abstract long getLedgerEndIndex();
Expand All @@ -37,6 +42,8 @@ public abstract class DLedgerStore {

public abstract long getLedgerBeforeBeginTerm();

public void updateCommittedIndex(long index) {}

protected void updateLedgerEndIndexAndTerm() {
if (getMemberState() != null) {
getMemberState().updateLedgerIndexAndTerm(getLedgerEndIndex(), getLedgerEndTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class DLedgerMmapFileStore extends DLedgerStore {

private long ledgerEndIndex = -1;
private long ledgerEndTerm;
private long committedPos;
private final DLedgerConfig dLedgerConfig;
private final MemberState memberState;
private final MmapFileList dataFileList;
Expand Down Expand Up @@ -654,6 +655,16 @@ public DLedgerEntry getFirstLogOfTargetTerm(long targetTerm, long endIndex) {
return entry;
}

@Override
public void updateCommittedIndex(long index) {
Pair<Long, Integer> posAndSize = getEntryPosAndSize(index);
this.committedPos = posAndSize.getKey() + posAndSize.getValue();
}

private long getCommittedPos() {
return this.committedPos;
}

private Pair<Long, Integer> getEntryPosAndSize(Long index) {
indexCheck(index);
SelectMmapBufferResult indexSbr = null;
Expand Down Expand Up @@ -681,6 +692,16 @@ public long getLedgerEndTerm() {
return ledgerEndTerm;
}

@Override
public boolean isLocalToomuchUncommitted() {
long fallBehindBytes = dataFileList.getMaxWrotePosition() - getCommittedPos();
if (fallBehindBytes > dLedgerConfig.getMaxPendingCommitBytes()) {
return true;
}

return getLedgerEndIndex() - memberState.getCommittedIndex() > dLedgerConfig.getMaxPendingCommitIndexNum();
}

public void addAppendHook(AppendHook writeHook) {
if (!appendHooks.contains(writeHook)) {
appendHooks.add(writeHook);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,54 @@ public void testPushNetworkOffline() throws Exception {
Assertions.assertTrue(hasWait);
}

@Test
public void testPushNetworkOfflineWithSmallFallConfig() throws Exception {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());

DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n0", DLedgerConfig.FILE);
dLedgerServer0.getDLedgerConfig().setMaxPendingCommitBytes(100);
boolean hasWait = false;
for (int i = 0; i < 3; i++) {
AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
appendEntryRequest.setGroup(group);
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
if (future.isDone()) {
Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode());
hasWait = true;
break;
}
}
Assertions.assertTrue(hasWait);
}

@Test
public void testPushNetworkOfflineWithSmallPendingCommitIndex() throws Exception {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d", nextPort(), nextPort());

DLedgerServer dLedgerServer0 = launchServer(group, peers, "n0", "n0", DLedgerConfig.FILE);
dLedgerServer0.getDLedgerConfig().setMaxPendingCommitIndexNum(10);
boolean hasWait = false;
for (int i = 0; i < 12; i++) {
AppendEntryRequest appendEntryRequest = new AppendEntryRequest();
appendEntryRequest.setGroup(group);
appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId());
appendEntryRequest.setBody(new byte[128]);
CompletableFuture<AppendEntryResponse> future = dLedgerServer0.handleAppend(appendEntryRequest);
Assertions.assertTrue(future instanceof AppendFuture);
if (future.isDone()) {
Assertions.assertEquals(DLedgerResponseCode.LEADER_PENDING_FULL.getCode(), future.get().getCode());
hasWait = true;
break;
}
}
Assertions.assertTrue(hasWait);
}

@Test
public void testPushNetworkNotStable() throws Exception {
String group = UUID.randomUUID().toString();
Expand Down
Loading