Skip to content

Commit 5928a51

Browse files
apoorvmittal10pdruley
authored andcommitted
KAFKA-17894: Implemented broker topic metrics for Share Group 1/N (KIP-1103) (apache#18444)
The PR implements the BrokerTopicMetrics defined in KIP-1103. The PR also corrected the share-acknowledgement-rate and share-acknowledgement-count metrics defined in KIP-932 as they are moved to BrokerTopicMetrics, necessary changes to KIP-932 broker metrics will be done once we complete KIP-1103. Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>, Jun Rao <[email protected]>
1 parent bef37de commit 5928a51

File tree

11 files changed

+504
-87
lines changed

11 files changed

+504
-87
lines changed

checkstyle/import-control-server.xml

+3
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@
9292
<allow pkg="org.apache.kafka.network" />
9393
<allow pkg="org.apache.kafka.storage.internals.log" />
9494
</subpackage>
95+
<subpackage name="share">
96+
<allow pkg="org.apache.kafka.storage.log.metrics" />
97+
</subpackage>
9598
</subpackage>
9699

97100
<subpackage name="security">

checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
files="(KafkaClusterTestKit).java"/>
4646
<suppress checks="NPathComplexity" files="TestKitNodes.java"/>
4747
<suppress checks="JavaNCSS"
48-
files="(RemoteLogManagerTest|SharePartitionTest).java"/>
48+
files="(RemoteLogManagerTest|SharePartitionManagerTest|SharePartitionTest).java"/>
4949
<suppress checks="ClassDataAbstractionCoupling|ClassFanOutComplexity" files="SharePartitionManagerTest"/>
5050
<suppress checks="CyclomaticComplexity" files="SharePartition.java"/>
5151

core/src/main/java/kafka/server/share/SharePartitionManager.java

+65-31
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.kafka.server.util.timer.SystemTimer;
6161
import org.apache.kafka.server.util.timer.SystemTimerReaper;
6262
import org.apache.kafka.server.util.timer.Timer;
63+
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
6364

6465
import org.slf4j.Logger;
6566
import org.slf4j.LoggerFactory;
@@ -68,13 +69,17 @@
6869
import java.util.Collection;
6970
import java.util.Collections;
7071
import java.util.HashMap;
72+
import java.util.HashSet;
7173
import java.util.LinkedHashMap;
7274
import java.util.List;
7375
import java.util.Map;
7476
import java.util.Objects;
77+
import java.util.Optional;
78+
import java.util.Set;
7579
import java.util.concurrent.CompletableFuture;
7680
import java.util.concurrent.ConcurrentHashMap;
7781
import java.util.function.BiConsumer;
82+
import java.util.function.Consumer;
7883

7984
/**
8085
* The SharePartitionManager is responsible for managing the SharePartitions and ShareSessions.
@@ -140,6 +145,11 @@ public class SharePartitionManager implements AutoCloseable {
140145
*/
141146
private final ShareGroupMetrics shareGroupMetrics;
142147

148+
/**
149+
* The broker topic stats is used to record the broker topic metrics for share group.
150+
*/
151+
private final BrokerTopicStats brokerTopicStats;
152+
143153
/**
144154
* The max fetch records is the maximum number of records that can be fetched by a share fetch request.
145155
*/
@@ -155,7 +165,8 @@ public SharePartitionManager(
155165
int maxFetchRecords,
156166
Persister persister,
157167
GroupConfigManager groupConfigManager,
158-
Metrics metrics
168+
Metrics metrics,
169+
BrokerTopicStats brokerTopicStats
159170
) {
160171
this(replicaManager,
161172
time,
@@ -167,7 +178,8 @@ public SharePartitionManager(
167178
maxFetchRecords,
168179
persister,
169180
groupConfigManager,
170-
metrics
181+
metrics,
182+
brokerTopicStats
171183
);
172184
}
173185

@@ -182,7 +194,8 @@ private SharePartitionManager(
182194
int maxFetchRecords,
183195
Persister persister,
184196
GroupConfigManager groupConfigManager,
185-
Metrics metrics
197+
Metrics metrics,
198+
BrokerTopicStats brokerTopicStats
186199
) {
187200
this(replicaManager,
188201
time,
@@ -196,7 +209,8 @@ private SharePartitionManager(
196209
maxFetchRecords,
197210
persister,
198211
groupConfigManager,
199-
metrics
212+
metrics,
213+
brokerTopicStats
200214
);
201215
}
202216

@@ -213,7 +227,8 @@ private SharePartitionManager(
213227
int maxFetchRecords,
214228
Persister persister,
215229
GroupConfigManager groupConfigManager,
216-
Metrics metrics
230+
Metrics metrics,
231+
BrokerTopicStats brokerTopicStats
217232
) {
218233
this.replicaManager = replicaManager;
219234
this.time = time;
@@ -227,6 +242,7 @@ private SharePartitionManager(
227242
this.groupConfigManager = groupConfigManager;
228243
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
229244
this.maxFetchRecords = maxFetchRecords;
245+
this.brokerTopicStats = brokerTopicStats;
230246
}
231247

232248
/**
@@ -252,7 +268,7 @@ public CompletableFuture<Map<TopicIdPartition, PartitionData>> fetchMessages(
252268
partitionMaxBytes.keySet(), groupId, fetchParams);
253269

254270
CompletableFuture<Map<TopicIdPartition, PartitionData>> future = new CompletableFuture<>();
255-
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords));
271+
processShareFetch(new ShareFetch(fetchParams, groupId, memberId, future, partitionMaxBytes, batchSize, maxFetchRecords, brokerTopicStats));
256272

257273
return future;
258274
}
@@ -274,9 +290,11 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
274290
) {
275291
log.trace("Acknowledge request for topicIdPartitions: {} with groupId: {}",
276292
acknowledgeTopics.keySet(), groupId);
277-
this.shareGroupMetrics.shareAcknowledgement();
278293
Map<TopicIdPartition, CompletableFuture<Throwable>> futures = new HashMap<>();
294+
// Track the topics for which we have received an acknowledgement for metrics.
295+
Set<String> topics = new HashSet<>();
279296
acknowledgeTopics.forEach((topicIdPartition, acknowledgePartitionBatches) -> {
297+
topics.add(topicIdPartition.topic());
280298
SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);
281299
SharePartition sharePartition = partitionCacheMap.get(sharePartitionKey);
282300
if (sharePartition != null) {
@@ -302,7 +320,13 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
302320
}
303321
});
304322

305-
return mapAcknowledgementFutures(futures);
323+
// Update the metrics for the topics for which we have received an acknowledgement.
324+
topics.forEach(topic -> {
325+
brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().mark();
326+
brokerTopicStats.topicStats(topic).totalShareAcknowledgementRequestRate().mark();
327+
});
328+
329+
return mapAcknowledgementFutures(futures, Optional.of(failedShareAcknowledgeMetricsHandler()));
306330
}
307331

308332
/**
@@ -363,24 +387,31 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
363387
}
364388
});
365389

366-
return mapAcknowledgementFutures(futuresMap);
390+
return mapAcknowledgementFutures(futuresMap, Optional.empty());
367391
}
368392

369-
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap) {
393+
private CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData>> mapAcknowledgementFutures(
394+
Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap,
395+
Optional<Consumer<Set<String>>> failedMetricsHandler
396+
) {
370397
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
371398
futuresMap.values().toArray(new CompletableFuture[0]));
372399
return allFutures.thenApply(v -> {
373400
Map<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> result = new HashMap<>();
401+
// Keep the set as same topic might appear multiple times. Multiple partitions can fail for same topic.
402+
Set<String> failedTopics = new HashSet<>();
374403
futuresMap.forEach((topicIdPartition, future) -> {
375404
ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData()
376405
.setPartitionIndex(topicIdPartition.partition());
377406
Throwable t = future.join();
378407
if (t != null) {
379408
partitionData.setErrorCode(Errors.forException(t).code())
380409
.setErrorMessage(t.getMessage());
410+
failedTopics.add(topicIdPartition.topic());
381411
}
382412
result.put(topicIdPartition, partitionData);
383413
});
414+
failedMetricsHandler.ifPresent(handler -> handler.accept(failedTopics));
384415
return result;
385416
});
386417
}
@@ -554,7 +585,10 @@ void processShareFetch(ShareFetch shareFetch) {
554585

555586
List<DelayedShareFetchKey> delayedShareFetchWatchKeys = new ArrayList<>();
556587
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
588+
// Track the topics for which we have received a share fetch request for metrics.
589+
Set<String> topics = new HashSet<>();
557590
for (TopicIdPartition topicIdPartition : shareFetch.partitionMaxBytes().keySet()) {
591+
topics.add(topicIdPartition.topic());
558592
SharePartitionKey sharePartitionKey = sharePartitionKey(
559593
shareFetch.groupId(),
560594
topicIdPartition
@@ -598,6 +632,12 @@ void processShareFetch(ShareFetch shareFetch) {
598632
sharePartitions.put(topicIdPartition, sharePartition);
599633
}
600634

635+
// Update the metrics for the topics for which we have received a share fetch request.
636+
topics.forEach(topic -> {
637+
brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().mark();
638+
brokerTopicStats.topicStats(topic).totalShareFetchRequestRate().mark();
639+
});
640+
601641
// If all the partitions in the request errored out, then complete the fetch request with an exception.
602642
if (shareFetch.errorInAllPartitions()) {
603643
shareFetch.maybeComplete(Collections.emptyMap());
@@ -695,6 +735,21 @@ private static void removeSharePartitionFromCache(
695735
}
696736
}
697737

738+
/**
739+
* The handler to update the failed share acknowledge request metrics.
740+
*
741+
* @return A Consumer that updates the failed share acknowledge request metrics.
742+
*/
743+
private Consumer<Set<String>> failedShareAcknowledgeMetricsHandler() {
744+
return failedTopics -> {
745+
// Update failed share acknowledge request metric.
746+
failedTopics.forEach(topic -> {
747+
brokerTopicStats.allTopicsStats().failedShareAcknowledgementRequestRate().mark();
748+
brokerTopicStats.topicStats(topic).failedShareAcknowledgementRequestRate().mark();
749+
});
750+
};
751+
}
752+
698753
/**
699754
* The SharePartitionListener is used to listen for partition events. The share partition is associated with
700755
* the topic-partition, we need to handle the partition events for the share partition.
@@ -759,10 +814,6 @@ static class ShareGroupMetrics {
759814

760815
public static final String METRICS_GROUP_NAME = "share-group-metrics";
761816

762-
public static final String SHARE_ACK_SENSOR = "share-acknowledgement-sensor";
763-
public static final String SHARE_ACK_RATE = "share-acknowledgement-rate";
764-
public static final String SHARE_ACK_COUNT = "share-acknowledgement-count";
765-
766817
public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement";
767818
public static final String RECORD_ACK_RATE = "record-acknowledgement-rate";
768819
public static final String RECORD_ACK_COUNT = "record-acknowledgement-count";
@@ -775,7 +826,6 @@ static class ShareGroupMetrics {
775826
public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>();
776827

777828
private final Time time;
778-
private final Sensor shareAcknowledgementSensor;
779829
private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
780830
private final Sensor partitionLoadTimeSensor;
781831

@@ -787,18 +837,6 @@ static class ShareGroupMetrics {
787837

788838
public ShareGroupMetrics(Metrics metrics, Time time) {
789839
this.time = time;
790-
791-
shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
792-
shareAcknowledgementSensor.add(new Meter(
793-
metrics.metricName(
794-
SHARE_ACK_RATE,
795-
METRICS_GROUP_NAME,
796-
"Rate of acknowledge requests."),
797-
metrics.metricName(
798-
SHARE_ACK_COUNT,
799-
METRICS_GROUP_NAME,
800-
"The number of acknowledge requests.")));
801-
802840
for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
803841
recordAcksSensorMap.put(entry.getKey(), metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, entry.getValue())));
804842
recordAcksSensorMap.get(entry.getKey())
@@ -828,10 +866,6 @@ public ShareGroupMetrics(Metrics metrics, Time time) {
828866
new Max());
829867
}
830868

831-
void shareAcknowledgement() {
832-
shareAcknowledgementSensor.record();
833-
}
834-
835869
void recordAcknowledgement(byte ackType) {
836870
// unknown ack types (such as gaps for control records) are intentionally ignored
837871
if (recordAcksSensorMap.containsKey(ackType)) {

core/src/main/scala/kafka/server/BrokerServer.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,8 @@ class BrokerServer(
440440
config.shareGroupConfig.shareFetchMaxFetchRecords,
441441
persister,
442442
groupConfigManager,
443-
metrics
443+
metrics,
444+
brokerTopicStats
444445
)
445446

446447
dataPlaneRequestProcessor = new KafkaApis(

0 commit comments

Comments
 (0)