Skip to content

Commit

Permalink
[feat][misc] PIP-264: Add OpenTelemetry HTTP rate limiting filter met…
Browse files Browse the repository at this point in the history
…ric (apache#23042)
  • Loading branch information
dragosvictor authored and hanmz committed Feb 12, 2025
1 parent a017427 commit cd856f4
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.pulsar.broker.web;

import com.google.common.util.concurrent.RateLimiter;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.prometheus.client.Counter;
import java.io.IOException;
import javax.servlet.Filter;
Expand All @@ -33,15 +37,32 @@ public class RateLimitingFilter implements Filter {

private final RateLimiter limiter;

public RateLimitingFilter(double rateLimit) {
limiter = RateLimiter.create(rateLimit);
public static final String RATE_LIMIT_REQUEST_COUNT_METRIC_NAME =
"pulsar.web.filter.rate_limit.request.count";
private final LongCounter rateLimitRequestCounter;

public static final AttributeKey<String> RATE_LIMIT_RESULT =
AttributeKey.stringKey("pulsar.web.filter.rate_limit.result");
public enum Result {
ACCEPTED,
REJECTED;
public final Attributes attributes = Attributes.of(RATE_LIMIT_RESULT, name().toLowerCase());
}

@Deprecated
private static final Counter httpRejectedRequests = Counter.build()
.name("pulsar_broker_http_rejected_requests")
.help("Counter of HTTP requests rejected by rate limiting")
.register();

public RateLimitingFilter(double rateLimit, Meter meter) {
limiter = RateLimiter.create(rateLimit);
rateLimitRequestCounter = meter.counterBuilder(RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
.setDescription("Counter of HTTP requests processed by the rate limiting filter.")
.setUnit("{request}")
.build();
}

@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
Expand All @@ -50,9 +71,11 @@ public void init(FilterConfig filterConfig) throws ServletException {
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (limiter.tryAcquire()) {
rateLimitRequestCounter.add(1, Result.ACCEPTED.attributes);
chain.doFilter(request, response);
} else {
httpRejectedRequests.inc();
rateLimitRequestCounter.add(1, Result.REJECTED.attributes);
HttpServletResponse httpResponse = (HttpServletResponse) response;
httpResponse.sendError(429, "Too Many Requests");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,8 @@ private static class FilterInitializer {

if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
pulsarService.getOpenTelemetry().getMeter())));
}

// wait until the PulsarService is ready to serve incoming requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.pulsar.broker.web;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.web.RateLimitingFilter.Result;
import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.LimitType;
import org.apache.pulsar.broker.web.WebExecutorThreadPoolStats.UsageType;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -270,12 +271,29 @@ public void testTlsAuthDisallowInsecure() throws Exception {
public void testRateLimiting() throws Exception {
setupEnv(false, false, false, false, 10.0, false);

// setupEnv makes a HTTP call to create the cluster.
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics, RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
Result.ACCEPTED.attributes, 1);
assertThat(metrics).noneSatisfy(metricData -> assertThat(metricData)
.hasName(RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
.hasLongSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasAttributes(Result.REJECTED.attributes))));

// Make requests without exceeding the max rate
for (int i = 0; i < 5; i++) {
makeHttpRequest(false, false);
Thread.sleep(200);
}

metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics, RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
Result.ACCEPTED.attributes, 6);
assertThat(metrics).noneSatisfy(metricData -> assertThat(metricData)
.hasName(RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME)
.hasLongSumSatisfying(
sum -> sum.hasPointsSatisfying(point -> point.hasAttributes(Result.REJECTED.attributes))));

try {
for (int i = 0; i < 500; i++) {
makeHttpRequest(false, false);
Expand All @@ -285,6 +303,12 @@ public void testRateLimiting() throws Exception {
} catch (IOException e) {
assertTrue(e.getMessage().contains("429"));
}

metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics, RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
Result.ACCEPTED.attributes, value -> assertThat(value).isGreaterThan(6));
assertMetricLongSumValue(metrics, RateLimitingFilter.RATE_LIMIT_REQUEST_COUNT_METRIC_NAME,
Result.REJECTED.attributes, value -> assertThat(value).isPositive());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
public class PulsarWorkerOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-function-worker";
public static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.function_worker";

private final OpenTelemetryService openTelemetryService;

@Getter
Expand All @@ -38,7 +40,7 @@ public PulsarWorkerOpenTelemetry(WorkerConfig workerConfig) {
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.function_worker");
meter = openTelemetryService.getOpenTelemetry().getMeter(INSTRUMENTATION_SCOPE_NAME);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker.rest;

import io.opentelemetry.api.OpenTelemetry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
import java.util.EnumSet;
Expand All @@ -30,6 +31,7 @@
import org.apache.pulsar.broker.web.JettyRequestLogFactory;
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.functions.worker.PulsarWorkerOpenTelemetry;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
Expand Down Expand Up @@ -219,7 +221,8 @@ private static class FilterInitializer {

if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
OpenTelemetry.noop().getMeter(PulsarWorkerOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))));
}

if (config.isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.proxy.server;

import static org.apache.pulsar.proxy.server.AdminProxyHandler.INIT_PARAM_REQUEST_BUFFER_SIZE;
import io.opentelemetry.api.OpenTelemetry;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.io.IOException;
import java.net.URI;
Expand All @@ -37,6 +38,7 @@
import org.apache.pulsar.broker.web.RateLimitingFilter;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.apache.pulsar.jetty.tls.JettySslContextFactory;
import org.apache.pulsar.proxy.stats.PulsarProxyOpenTelemetry;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.ConnectionLimit;
import org.eclipse.jetty.server.Connector;
Expand Down Expand Up @@ -191,7 +193,8 @@ private static class FilterInitializer {

if (config.isHttpRequestsLimitEnabled()) {
filterHolders.add(new FilterHolder(
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())));
new RateLimitingFilter(config.getHttpRequestsMaxPerSecond(),
OpenTelemetry.noop().getMeter(PulsarProxyOpenTelemetry.INSTRUMENTATION_SCOPE_NAME))));
}

if (config.isAuthenticationEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public class PulsarProxyOpenTelemetry implements Closeable {

public static final String SERVICE_NAME = "pulsar-proxy";
public static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.proxy";

private final OpenTelemetryService openTelemetryService;

@Getter
Expand All @@ -39,7 +41,7 @@ public PulsarProxyOpenTelemetry(ProxyConfiguration config) {
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.proxy");
meter = openTelemetryService.getOpenTelemetry().getMeter(INSTRUMENTATION_SCOPE_NAME);
}

@Override
Expand Down

0 comments on commit cd856f4

Please sign in to comment.