Skip to content

Commit 3df85e2

Browse files
committed
Merge branch '1.13.x'
2 parents 710075a + b3ab39e commit 3df85e2

File tree

2 files changed

+56
-14
lines changed

2 files changed

+56
-14
lines changed

implementations/micrometer-registry-statsd/src/main/java/io/micrometer/statsd/StatsdMeterRegistry.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.micrometer.core.instrument.internal.DefaultMeter;
2727
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
2828
import io.micrometer.statsd.internal.*;
29+
import io.netty.channel.Channel;
2930
import io.netty.channel.unix.DomainSocketAddress;
3031
import io.netty.util.AttributeKey;
3132
import org.reactivestreams.Publisher;
@@ -94,6 +95,9 @@ public class StatsdMeterRegistry extends MeterRegistry {
9495

9596
Disposable.Swap statsdConnection = Disposables.swap();
9697

98+
@Nullable
99+
private Channel flushableChannel;
100+
97101
private Disposable.Swap meterPoller = Disposables.swap();
98102

99103
@Nullable
@@ -295,6 +299,7 @@ private void connectAndSubscribe(UdpClient udpClient) {
295299
private void retryReplaceClient(Mono<? extends Connection> connectMono) {
296300
connectMono.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofMinutes(1)))
297301
.subscribe(connection -> {
302+
this.flushableChannel = connection.channel();
298303
this.statsdConnection.replace(connection);
299304

300305
// now that we're connected, start polling gauges and other pollable
@@ -309,6 +314,9 @@ private void startPolling() {
309314

310315
public void stop() {
311316
if (started.compareAndSet(true, false)) {
317+
if (this.flushableChannel != null) {
318+
this.flushableChannel.flush();
319+
}
312320
if (statsdConnection.get() != null) {
313321
statsdConnection.get().dispose();
314322
}
@@ -321,6 +329,7 @@ public void stop() {
321329
@Override
322330
public void close() {
323331
poll();
332+
this.sink.complete();
324333
stop();
325334
super.close();
326335
}

implementations/micrometer-registry-statsd/src/test/java/io/micrometer/statsd/StatsdMeterRegistryPublishTest.java

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,25 @@ void cleanUp() {
7979
}
8080
}
8181

82+
@ParameterizedTest
83+
@EnumSource(StatsdProtocol.class)
84+
void receiveAllBufferedMetricsAfterCloseSuccessfully(StatsdProtocol protocol) throws InterruptedException {
85+
skipUdsTestOnWindows(protocol);
86+
serverLatch = new CountDownLatch(3);
87+
server = startServer(protocol, 0);
88+
89+
final int port = getPort(protocol);
90+
meterRegistry = new StatsdMeterRegistry(getBufferedConfig(protocol, port), Clock.SYSTEM);
91+
startRegistryAndWaitForClient();
92+
Thread.sleep(1000);
93+
Counter counter = Counter.builder("my.counter").register(meterRegistry);
94+
counter.increment();
95+
counter.increment();
96+
counter.increment();
97+
meterRegistry.close();
98+
assertThat(serverLatch.await(5, TimeUnit.SECONDS)).isTrue();
99+
}
100+
82101
@ParameterizedTest
83102
@EnumSource(StatsdProtocol.class)
84103
void receiveMetricsSuccessfully(StatsdProtocol protocol) throws InterruptedException {
@@ -336,11 +355,14 @@ private DisposableChannel startServer(StatsdProtocol protocol, int port) {
336355
return UdpServer.create()
337356
.bindAddress(() -> protocol == StatsdProtocol.UDP
338357
? InetSocketAddress.createUnresolved("localhost", port) : newDomainSocketAddress())
339-
.handle((in, out) -> in.receive().asString().flatMap(packet -> {
340-
serverLatch.countDown();
341-
serverMetricReadCount.getAndIncrement();
342-
return Flux.never();
343-
}))
358+
.handle((in, out) -> in.receive()
359+
.asString()
360+
.flatMap(packet -> Flux.just(packet.split("\n")))
361+
.flatMap(packetLine -> {
362+
serverLatch.countDown();
363+
serverMetricReadCount.getAndIncrement();
364+
return Flux.never();
365+
}))
344366
.doOnBound((server) -> bound = true)
345367
.doOnUnbound((server) -> bound = false)
346368
.wiretap("udpserver", LogLevel.INFO)
@@ -351,14 +373,17 @@ else if (protocol == StatsdProtocol.TCP) {
351373
return TcpServer.create()
352374
.host("localhost")
353375
.port(port)
354-
.handle((in, out) -> in.receive().asString().flatMap(packet -> {
355-
IntStream.range(0, packet.split("my.counter").length - 1).forEach(i -> {
356-
serverLatch.countDown();
357-
serverMetricReadCount.getAndIncrement();
358-
});
359-
in.withConnection(channel::set);
360-
return Flux.never();
361-
}))
376+
.handle((in, out) -> in.receive()
377+
.asString()
378+
.flatMap(packet -> Flux.just(packet.split("\n")))
379+
.flatMap(packetLine -> {
380+
IntStream.range(0, packetLine.split("my.counter").length - 1).forEach(i -> {
381+
serverLatch.countDown();
382+
serverMetricReadCount.getAndIncrement();
383+
});
384+
in.withConnection(channel::set);
385+
return Flux.never();
386+
}))
362387
.doOnBound((server) -> bound = true)
363388
.doOnUnbound((server) -> {
364389
bound = false;
@@ -388,6 +413,14 @@ private static DomainSocketAddress newDomainSocketAddress() {
388413
}
389414

390415
private StatsdConfig getUnbufferedConfig(StatsdProtocol protocol, int port) {
416+
return getConfig(protocol, port, false);
417+
}
418+
419+
private StatsdConfig getBufferedConfig(StatsdProtocol protocol, int port) {
420+
return getConfig(protocol, port, true);
421+
}
422+
423+
private StatsdConfig getConfig(StatsdProtocol protocol, int port, boolean buffered) {
391424
return new StatsdConfig() {
392425
@Override
393426
public String get(String key) {
@@ -411,7 +444,7 @@ public StatsdProtocol protocol() {
411444

412445
@Override
413446
public boolean buffered() {
414-
return false;
447+
return buffered;
415448
}
416449
};
417450
}

0 commit comments

Comments
 (0)