Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1846,6 +1846,21 @@ metricsServletTimeoutMs=30000
# Enable or disable broker bundles metrics. The default value is false.
exposeBundlesMetricsInPrometheus=false

# Enable or disable custom topic metric labels feature.
# If enabled, custom metric labels can be set on topics and will be exposed in Prometheus metrics.
# Default is false.
exposeCustomTopicMetricLabelsEnabled=false

# A comma-separated list of allowed custom metric label keys.
# Only these keys can be set as custom metric labels on topics.
# Example: sla_tier,data_sensitivity,cost_center,app_owner
# If empty and the feature is enabled, no custom metric labels can be set.
allowedCustomMetricLabelKeys=

# Maximum character length for a custom metric label value.
# Default is 128.
maxCustomMetricLabelValueLength=128

### --- Functions --- ###

# Enable Functions Worker Service in Broker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3474,6 +3474,30 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean exposeBundlesMetricsInPrometheus = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "Enable or disable custom topic metric labels feature. "
+ "If enabled, custom metric labels can be set on topics and will be exposed in Prometheus metrics. "
+ "Default is false."
)
private boolean exposeCustomTopicMetricLabelsEnabled = false;

@FieldContext(
category = CATEGORY_METRICS,
doc = "A comma-separated list of allowed custom metric label keys. "
+ "Only these keys can be set as custom metric labels on topics. "
+ "Example: sla_tier,data_sensitivity,cost_center,app_owner. "
+ "If empty and the feature is enabled, no custom metric labels can be set."
)
private String allowedCustomMetricLabelKeys = "";

@FieldContext(
category = CATEGORY_METRICS,
doc = "Maximum character length for a custom metric label value. "
+ "Default is 128."
)
private int maxCustomMetricLabelValueLength = 128;

/**** --- Functions. --- ****/
@FieldContext(
category = CATEGORY_FUNCTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5466,4 +5466,86 @@ private static Long getIndexFromEntry(Entry entry) {
}
});
}

protected CompletableFuture<Void> internalSetCustomMetricLabels(Map<String, String> labels, boolean isGlobal) {
// Feature flag check
if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Custom topic metric labels feature is disabled"));
}

// Validate labels against allowed keys and value length
Set<String> allowedKeys = getAllowedCustomMetricLabelKeys();
int maxValueLength = pulsar().getConfiguration().getMaxCustomMetricLabelValueLength();

if (labels != null && !labels.isEmpty()) {
for (Map.Entry<String, String> entry : labels.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();

// Check if key is allowed
if (allowedKeys.isEmpty() || !allowedKeys.contains(key)) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Label key '" + key + "' is not in the list of allowed custom metric label keys"));
}

// Check value length
if (value != null && value.length() > maxValueLength) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Label value for key '" + key + "' exceeds maximum length of " + maxValueLength));
}
}
}

return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal,
labels == null || labels.isEmpty(), policies -> {
if (labels == null || labels.isEmpty()) {
policies.setCustomMetricLabels(new HashMap<>());
} else {
policies.setCustomMetricLabels(new HashMap<>(labels));
}
});
}

protected CompletableFuture<Map<String, String>> internalGetCustomMetricLabels(boolean isGlobal) {
if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Custom topic metric labels feature is disabled"));
}

return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getCustomMetricLabels)
.orElse(null));
}

protected CompletableFuture<Void> internalRemoveCustomMetricLabels(boolean removeAll, List<String> keys,
boolean isGlobal) {
if (!pulsar().getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Custom topic metric labels feature is disabled"));
}

return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> {
Map<String, String> currentLabels = policies.getCustomMetricLabels();
if (currentLabels == null || currentLabels.isEmpty()) {
return; // Nothing to remove
}
if (removeAll) {
policies.setCustomMetricLabels(new HashMap<>());
} else {
for (String key : keys) {
currentLabels.remove(key);
}
policies.setCustomMetricLabels(currentLabels);
}
});
}

private Set<String> getAllowedCustomMetricLabelKeys() {
String allowedKeysStr = pulsar().getConfiguration().getAllowedCustomMetricLabelKeys();
if (allowedKeysStr == null || allowedKeysStr.trim().isEmpty()) {
return Set.of();
}
return Set.of(allowedKeysStr.split(","));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5131,5 +5131,95 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse,
});
}

@POST
@Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
@ApiOperation(value = "Set custom metric labels for a topic")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled"),
@ApiResponse(code = 412, message = "Feature is disabled or invalid label keys/values"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setCustomMetricLabels(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Custom metric labels") Map<String, String> labels) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalSetCustomMetricLabels(labels, isGlobal))
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setCustomMetricLabels", ex, asyncResponse);
return null;
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
@ApiOperation(value = "Get custom metric labels for a topic", response = Map.class)
@ApiResponses(value = {
@ApiResponse(code = 200, message = "OK"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled"),
@ApiResponse(code = 412, message = "Feature is disabled"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getCustomMetricLabels(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.READ)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalGetCustomMetricLabels(isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getCustomMetricLabels", ex, asyncResponse);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/customMetricLabels")
@ApiOperation(value = "Remove custom metric labels from a topic")
@ApiResponses(value = {
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled"),
@ApiResponse(code = 412, message = "Feature is disabled"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeCustomMetricLabels(
@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("all") @DefaultValue("false") boolean removeAll,
@QueryParam(value = "List of keys to remove, or null to remove all") List<String> keys) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperationAsync(topicName, PolicyName.MAX_SUBSCRIPTIONS, PolicyOperation.WRITE)
.thenCompose(__ -> preValidation(authoritative))
.thenCompose(__ -> internalRemoveCustomMetricLabels(removeAll, keys, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully removed custom metric labels: tenant={}, namespace={}, topic={}, isGlobal={}",
clientAppId(), tenant, namespace, topicName.getLocalName(), isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
handleTopicPolicyException("removeCustomMetricLabels", ex, asyncResponse);
return null;
});
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ protected void updateTopicPolicy(TopicPolicies data) {
topicPolicies.getEntryFilters().updateTopicValue(data.getEntryFilters(), isGlobalPolicies);
topicPolicies.getDispatcherPauseOnAckStatePersistentEnabled()
.updateTopicValue(data.getDispatcherPauseOnAckStatePersistentEnabled(), isGlobalPolicies);
topicPolicies.getCustomMetricLabels().updateTopicValue(data.getCustomMetricLabels(), isGlobalPolicies);
this.subscriptionPolicies = data.getSubscriptionPolicies();

updateEntryFilters();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,25 @@ public static void generate(PulsarService pulsar, boolean includeTopicMetrics, b

brokerStats.updateStats(topicStats);

// Get and convert custom metric labels if feature is enabled
String[] customMetricLabelAndValues = null;
if (pulsar.getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
Map<String, String> customMetricLabels =
topic.getHierarchyTopicPolicies().getCustomMetricLabels().get();
if (customMetricLabels != null && !customMetricLabels.isEmpty()) {
customMetricLabelAndValues = new String[customMetricLabels.size() * 2];
int index = 0;
for (Map.Entry<String, String> entry : customMetricLabels.entrySet()) {
customMetricLabelAndValues[index++] = entry.getKey();
customMetricLabelAndValues[index++] = entry.getValue();
}
}
}

if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, topicStats, compactorMXBean, cluster, namespace, name,
splitTopicAndPartitionIndexLabel);
splitTopicAndPartitionIndexLabel, customMetricLabelAndValues);
} else {
namespaceStats.updateStats(topicStats);
}
Expand Down
Loading
Loading