-
Notifications
You must be signed in to change notification settings - Fork 429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RATIS-1743. (superseded) #782
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @szetszwo for the patch. I think it solves part of the problem.
@@ -208,8 +243,7 @@ synchronized void updateIndex(long i) { | |||
this.workerThread = new Thread(this::run, name); | |||
|
|||
// Server Id can be null in unit tests | |||
metricRegistry.addDataQueueSizeGauge(queue); | |||
metricRegistry.addLogWorkerQueueSizeGauge(writeTasks.q); | |||
metricRegistry.addQueueSizeGauges(queues); | |||
metricRegistry.addFlushBatchSizeGauge(() -> (Gauge<Integer>) () -> flushBatchSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The syncBatchSize
gauge (created in addFlushBatchSizeGauge
) still holds SegmentedRaftLogWorker
after close()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adoroszlai , good catch. Just have changed it to use AtomicInteger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately the lambda still has a reference to the enclosing SegmentedRaftLogWorker
instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following POC resolves this specific leak (please feel free to adapt):
diff --git ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
index 50d582ac..d213563b 100644
--- ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
+++ ratis-server/src/main/java/org/apache/ratis/server/metrics/SegmentedRaftLogMetrics.java
@@ -24,6 +24,7 @@ import com.codahale.metrics.Timer;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToIntFunction;
@@ -113,8 +114,10 @@ public class SegmentedRaftLogMetrics extends RaftLogMetricsBase {
});
}
- public void addFlushBatchSizeGauge(MetricRegistry.MetricSupplier<Gauge> supplier) {
- registry.gauge(RAFT_LOG_SYNC_BATCH_SIZE, supplier);
+ public AtomicInteger addFlushBatchSizeGauge() {
+ MutableIntegerGaugeSupplier migs = new MutableIntegerGaugeSupplier();
+ registry.gauge(RAFT_LOG_SYNC_BATCH_SIZE, migs);
+ return migs.asAtomicInteger();
}
private Timer getTimer(String timerName) {
@@ -181,4 +184,23 @@ public class SegmentedRaftLogMetrics extends RaftLogMetricsBase {
public void onStateMachineDataReadTimeout() {
registry.counter(RAFT_LOG_STATEMACHINE_DATA_READ_TIMEOUT_COUNT).inc();
}
+
+ public static class MutableIntegerGaugeSupplier implements MetricRegistry.MetricSupplier<Gauge>, Gauge<Integer> {
+ private final AtomicInteger value = new AtomicInteger();
+
+ public AtomicInteger asAtomicInteger() {
+ return value;
+ }
+
+ @Override
+ public Gauge<Integer> newMetric() {
+ return this;
+ }
+
+ @Override
+ public Integer getValue() {
+ return value.get();
+ }
+ }
+
}
diff --git ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index a5b12eb9..fa4516e1 100644
--- ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -211,7 +211,7 @@ class SegmentedRaftLogWorker {
private final long segmentMaxSize;
private final long preallocatedSize;
private final RaftServer.Division server;
- private final AtomicInteger flushBatchSize = new AtomicInteger();
+ private final AtomicInteger flushBatchSize;
private final boolean asyncFlush;
private final boolean unsafeFlush;
@@ -243,7 +243,7 @@ class SegmentedRaftLogWorker {
// Server Id can be null in unit tests
metricRegistry.addQueueSizeGauges(queues);
- metricRegistry.addFlushBatchSizeGauge(() -> flushBatchSize::get);
+ flushBatchSize = metricRegistry.addFlushBatchSizeGauge();
this.logFlushTimer = metricRegistry.getFlushTimer();
this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer(
With this, we still have a leak due to lastLeaderElapsedTime
gauge in LeaderElectionMetrics
(at least).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adoroszlai , thanks for catching it. I mean to pass the AtomicInteger and use lambda in SegmentedRaftLogMetrics. Then, it won't have a reference to SegmentedRaftLogWorker. Let me try it.
…orker` after `close()`.
@adoroszlai , this approach still cannot fix the failure of TestStorageContainerManagerHA, even with the buffers moved to Queues. If the byte array are created incrementally, TestStorageContainerManagerHA can pass; see #783 . |
I think that's because we have a similar leak in |
@adoroszlai , You are right. Let's merge this then. I somehow thought that this was not able to fix the problem. Thanks a lot! |
this.writeBuffer = ByteBuffer.allocateDirect(bufferSize); | ||
final int logEntryLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt(); | ||
// 4 bytes (serialized size) + logEntryLimit + 4 bytes (checksum) | ||
this.sharedBuffer = () -> new byte[logEntryLimit + 8]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adoroszlai , The buffer is not shared so that TestStorageContainerManagerHA can pass. If we use MemoizedSupplier
as below, it will fail.
this.sharedBuffer = MemoizedSupplier.valueOf(() -> new byte[logEntryLimit + 8]);
Tried to fix LeaderElectionMetrics but TestStorageContainerManagerHA still fails. |
This was superseded by #784. |
static class Buffers { | ||
private final ByteBuffer writeBuffer; | ||
private final Supplier<byte[]> sharedBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@adoroszlai , @SzyWilliam , FYI, just have learned from this article that Java lambda is garbage collected in a different way. That is the reason that the Buffers
class here does not work but the AtomicReference
works. See https://tanbt.medium.com/java-lambdas-and-the-garbage-collector-part-1-da934e11518a
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szetszwo Thanks for sharing! Learned a lot from this article.
See https://issues.apache.org/jira/browse/RATIS-1743