Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): add memory usage detect switch #948

Merged
merged 1 commit into from
Feb 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions s3stream/src/main/java/com/automq/stream/s3/DirectByteBufAlloc.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.LoggerFactory;

public class DirectByteBufAlloc {
public static final boolean MEMORY_USAGE_DETECT = Boolean.parseBoolean(System.getenv("AUTOMQ_MEMORY_USAGE_DETECT"));

private static final Logger LOGGER = LoggerFactory.getLogger(DirectByteBufAlloc.class);
private static final PooledByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
private static final Map<Integer, LongAdder> USAGE_STATS = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -67,24 +69,32 @@ public static ByteBuf byteBuffer(int initCapacity) {

public static ByteBuf byteBuffer(int initCapacity, int type) {
try {
LongAdder usage = USAGE_STATS.compute(type, (k, v) -> {
if (v == null) {
v = new LongAdder();
if (MEMORY_USAGE_DETECT) {
LongAdder usage = USAGE_STATS.compute(type, (k, v) -> {
if (v == null) {
v = new LongAdder();
}
v.add(initCapacity);
return v;
});
long now = System.currentTimeMillis();
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
}
v.add(initCapacity);
return v;
});
long now = System.currentTimeMillis();
if (now - lastMetricLogTime > 60000) {
// it's ok to be not thread safe
lastMetricLogTime = now;
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.info("Direct Memory usage: {}", DirectByteBufAlloc.directByteBufAllocMetric);
return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} else {
return ALLOC.directBuffer(initCapacity);
}
return new WrappedByteBuf(ALLOC.directBuffer(initCapacity), () -> usage.add(-initCapacity));
} catch (OutOfMemoryError e) {
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
if (MEMORY_USAGE_DETECT) {
DirectByteBufAlloc.directByteBufAllocMetric = new DirectByteBufAllocMetric();
LOGGER.error("alloc direct buffer OOM, {}", DirectByteBufAlloc.directByteBufAllocMetric, e);
} else {
LOGGER.error("alloc direct buffer OOM", e);
}
System.err.println("alloc direct buffer OOM");
Runtime.getRuntime().halt(1);
throw e;
Expand Down
Loading