diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 1778e2e98b48b..9a4a7f2104b92 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -120,6 +120,16 @@ io.prometheus simpleclient_pushgateway + + com.uber.m3 + tally-m3 + ${tally.version} + + + com.uber.m3 + tally-core + ${tally.version} + diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9447069a99570..93691e8cdae9b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -68,6 +68,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.config.metrics.HoodieMetricsM3Config; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; @@ -2238,6 +2239,26 @@ public int getGraphiteReportPeriodSeconds() { return getInt(HoodieMetricsGraphiteConfig.GRAPHITE_REPORT_PERIOD_IN_SECONDS); } + public String getM3ServerHost() { + return getString(HoodieMetricsM3Config.M3_SERVER_HOST_NAME); + } + + public int getM3ServerPort() { + return getInt(HoodieMetricsM3Config.M3_SERVER_PORT_NUM); + } + + public String getM3Tags() { + return getString(HoodieMetricsM3Config.M3_TAGS); + } + + public String getM3Env() { + return getString(HoodieMetricsM3Config.M3_ENV); + } + + public String getM3Service() { + return getString(HoodieMetricsM3Config.M3_SERVICE); + } + public String getJmxHost() { return getString(HoodieMetricsJmxConfig.JMX_HOST_NAME); } @@ -2745,6 +2766,7 @@ public static class Builder { private boolean isPreCommitValidationConfigSet = false; private boolean isMetricsJmxConfigSet = false; private boolean isMetricsGraphiteConfigSet = false; + private boolean isMetricsM3ConfigSet = false; private boolean isLayoutConfigSet = false; public Builder withEngineType(EngineType engineType) { @@ -2984,6 +3006,12 @@ public Builder withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig mericsGraph return this; } + public Builder withMetricsM3Config(HoodieMetricsM3Config metricsM3Config) { + writeConfig.getProps().putAll(metricsM3Config.getProps()); + isMetricsM3ConfigSet = true; + return this; + } + public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig validatorConfig) { writeConfig.getProps().putAll(validatorConfig.getProps()); isPreCommitValidationConfigSet = true; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java new file mode 100644 index 0000000000000..cc675eebfbbf4 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config.metrics; + +import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +/** + * Configs for M3 reporter type. + *

+ * {@link org.apache.hudi.metrics.MetricsReporterType#M3} + */ +@ConfigClassProperty(name = "Metrics Configurations for M3", + groupName = ConfigGroups.Names.METRICS, + description = "Enables reporting on Hudi metrics using M3. " + + " Hudi publishes metrics on every commit, clean, rollback etc.") +public class HoodieMetricsM3Config extends HoodieConfig { + + public static final String M3_PREFIX = METRIC_PREFIX + ".m3"; + + public static final ConfigProperty M3_SERVER_HOST_NAME = ConfigProperty + .key(M3_PREFIX + ".host") + .defaultValue("localhost") + .withDocumentation("M3 host to connect to."); + + public static final ConfigProperty M3_SERVER_PORT_NUM = ConfigProperty + .key(M3_PREFIX + ".port") + .defaultValue(9052) + .withDocumentation("M3 port to connect to."); + + public static final ConfigProperty M3_TAGS = ConfigProperty + .key(M3_PREFIX + ".tags") + .defaultValue("") + .withDocumentation("Optional M3 tags applied to all metrics."); + + public static final ConfigProperty M3_ENV = ConfigProperty + .key(M3_PREFIX + ".env") + .defaultValue("production") + .withDocumentation("M3 tag to label the environment (defaults to 'production'), " + + "applied to all metrics."); + + public static final ConfigProperty M3_SERVICE = ConfigProperty + .key(M3_PREFIX + ".service") + .defaultValue("hoodie") + .withDocumentation("M3 tag to label the service name (defaults to 'hoodie'), " + + "applied to all metrics."); + + private HoodieMetricsM3Config() { + super(); + } + + public static HoodieMetricsM3Config.Builder newBuilder() { + return new HoodieMetricsM3Config.Builder(); + } + + public static class Builder { + + private final HoodieMetricsM3Config hoodieMetricsM3Config = new HoodieMetricsM3Config(); + + public HoodieMetricsM3Config.Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.hoodieMetricsM3Config.getProps().load(reader); + return this; + } + } + + public HoodieMetricsM3Config.Builder fromProperties(Properties props) { + this.hoodieMetricsM3Config.getProps().putAll(props); + return this; + } + + public HoodieMetricsM3Config.Builder toM3Host(String host) { + hoodieMetricsM3Config.setValue(M3_SERVER_HOST_NAME, host); + return this; + } + + public HoodieMetricsM3Config.Builder onM3Port(int port) { + hoodieMetricsM3Config.setValue(M3_SERVER_PORT_NUM, String.valueOf(port)); + return this; + } + + public HoodieMetricsM3Config.Builder useM3Tags(String tags) { + hoodieMetricsM3Config.setValue(M3_TAGS, tags); + return this; + } + + public HoodieMetricsM3Config.Builder useM3Env(String env) { + hoodieMetricsM3Config.setValue(M3_ENV, env); + return this; + } + + public HoodieMetricsM3Config.Builder useM3Service(String service) { + hoodieMetricsM3Config.setValue(M3_SERVICE, service); + return this; + } + + public HoodieMetricsM3Config build() { + hoodieMetricsM3Config.setDefaults(HoodieMetricsM3Config.class.getName()); + return hoodieMetricsM3Config; + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index 243b74b9199ef..76fffd5d0df09 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -37,6 +37,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.config.metrics.HoodieMetricsM3Config; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; import org.apache.hudi.exception.HoodieMetadataException; @@ -183,6 +184,15 @@ public static HoodieWriteConfig createMetadataWriteConfig( .withPushgatewayPortNum(writeConfig.getPushGatewayPort()).build(); builder.withProperties(prometheusConfig.getProps()); break; + case M3: + HoodieMetricsM3Config m3Config = HoodieMetricsM3Config.newBuilder() + .onM3Port(writeConfig.getM3ServerPort()) + .toM3Host(writeConfig.getM3ServerHost()) + .useM3Tags(writeConfig.getM3Tags()) + .useM3Service(writeConfig.getM3Service()) + .useM3Env(writeConfig.getM3Env()).build(); + builder.withProperties(m3Config.getProps()); + break; case DATADOG: HoodieMetricsDatadogConfig.Builder datadogConfig = HoodieMetricsDatadogConfig.newBuilder() .withDatadogApiKey(writeConfig.getDatadogApiKey()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index 27034735a040c..0d20337fa5c54 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -27,6 +27,7 @@ import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter; import org.apache.hudi.metrics.custom.CustomizableMetricsReporter; import org.apache.hudi.metrics.datadog.DatadogMetricsReporter; +import org.apache.hudi.metrics.m3.M3MetricsReporter; import org.apache.hudi.metrics.prometheus.PrometheusReporter; import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter; @@ -89,6 +90,9 @@ public static Option createReporter(HoodieWriteConfig config, M case CLOUDWATCH: reporter = new CloudWatchMetricsReporter(config, registry); break; + case M3: + reporter = new M3MetricsReporter(config, registry); + break; default: LOG.error("Reporter type[" + type + "] is not supported."); break; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java index 3c8600159287c..6d05e443e6b9c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java @@ -22,5 +22,5 @@ * Types of the reporter supported, hudi also supports user defined reporter. */ public enum MetricsReporterType { - GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH + GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, M3 } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java new file mode 100644 index 0000000000000..a658476ef7544 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.m3; + +import com.codahale.metrics.MetricRegistry; +import com.uber.m3.tally.m3.M3Reporter; +import com.uber.m3.util.Duration; +import com.uber.m3.util.ImmutableMap; +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.MetricsReporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of M3 Metrics reporter, which can report metrics to a https://m3db.io/ service + */ +public class M3MetricsReporter extends MetricsReporter { + + private static final Logger LOG = LoggerFactory.getLogger(M3MetricsReporter.class); + private final HoodieWriteConfig config; + private final MetricRegistry registry; + private final ImmutableMap tags; + + public M3MetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { + this.config = config; + this.registry = registry; + + ImmutableMap.Builder tagBuilder = new ImmutableMap.Builder<>(); + tagBuilder.putAll(parseOptionalTags(config.getM3Tags())); + tagBuilder.put("service", config.getM3Service()); + tagBuilder.put("env", config.getM3Env()); + this.tags = tagBuilder.build(); + LOG.info(String.format("Building M3 Reporter with M3 tags mapping: %s", tags)); + } + + private static Map parseOptionalTags(String tagValueString) { + Map parsedTags = new HashMap(); + if (!tagValueString.isEmpty()) { + Arrays.stream(tagValueString.split(",")).forEach((tagValuePair) -> { + String[] parsedTagValuePair = Arrays.stream(tagValuePair.split("=")) + .map((tagOrValue) -> tagOrValue.trim()).filter((tagOrValue) -> !tagOrValue.isEmpty()) + .toArray(String[]::new); + if (parsedTagValuePair.length != 2) { + throw new RuntimeException(String.format( + "M3 Reporter tags cannot be initialized with tags [%s] due to not being in format `tag=value, . . .`.", + tagValuePair)); + } + parsedTags.put(parsedTagValuePair[0], parsedTagValuePair[1]); + }); + } + return parsedTags; + } + + @Override + public void start() {} + + @Override + public void report() { + /* + Although com.uber.m3.tally.Scope supports automatically submitting metrics in an interval + via a background task, it does not seem to support + - an API for explicitly flushing/emitting all metrics + - Taking in an external com.codahale.metrics.MetricRegistry metrics registry and automatically + adding any new counters/gauges whenever they are added to the registry + Due to this, this implementation emits metrics by creating a Scope, adding all metrics from + the HUDI metircs registry as counters/gauges to the scope, and then closing the Scope. Since + closing this Scope will implicitly flush all M3 metrics, the reporting intervals + are configured to be Integer.MAX_VALUE. + */ + synchronized (this) { + try (Scope scope = new RootScopeBuilder() + .reporter(new M3Reporter.Builder( + new InetSocketAddress(config.getM3ServerHost(), config.getM3ServerPort())) + .includeHost(true).commonTags(tags) + .build()) + .reportEvery(Duration.ofSeconds(Integer.MAX_VALUE)) + .tagged(tags)) { + + M3ScopeReporterAdaptor scopeReporter = new M3ScopeReporterAdaptor(registry, scope); + scopeReporter.start(Integer.MAX_VALUE, TimeUnit.SECONDS); + scopeReporter.report(); + scopeReporter.stop(); + } catch (Exception e) { + LOG.error(String.format("Error reporting metrics to M3: %s", e)); + } + } + } + + @Override + public void stop() {} +} + + + + + + diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java new file mode 100644 index 0000000000000..ae66914400b9b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.m3; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metered; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import com.uber.m3.tally.Scope; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import org.apache.hudi.common.util.collection.Pair; + +/** + * Implementation of com.codahale.metrics.ScheduledReporter, to emit metrics from + * com.codahale.metrics.MetricRegistry to M3 + */ +public class M3ScopeReporterAdaptor extends ScheduledReporter { + private final Scope scope; + + protected M3ScopeReporterAdaptor(MetricRegistry registry, Scope scope) { + super(registry, "hudi-m3-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS); + this.scope = scope; + } + + @Override + public void start(long period, TimeUnit unit) { + } + + @Override + public void stop() { + } + + @Override + public void report(SortedMap gauges, SortedMap counters, + SortedMap histograms, SortedMap meters, + SortedMap timers) { + /* + When reporting, process each com.codahale.metrics metric and add counters & gauges to + the passed-in com.uber.m3.tally.Scope with the same name and value. This is needed + for the Scope to register these metrics + */ + report(scope, + gauges, + counters, + histograms, + meters, + timers); + } + + private void report(Scope scope, + Map gauges, + Map counters, + Map histograms, + Map meters, + Map timers) { + + for (Entry entry : gauges.entrySet()) { + scope.gauge(entry.getKey()).update( + ((Number) entry.getValue().getValue()).doubleValue()); + } + + for (Entry entry : counters.entrySet()) { + scope.counter(entry.getKey()).inc( + ((Number) entry.getValue().getCount()).longValue()); + } + + for (Entry entry : histograms.entrySet()) { + scope.gauge(MetricRegistry.name(entry.getKey(), "count")).update( + entry.getValue().getCount()); + reportSnapshot(entry.getKey(), entry.getValue().getSnapshot()); + } + + for (Entry entry : meters.entrySet()) { + reportMetered(entry.getKey(), entry.getValue()); + } + + for (Entry entry : timers.entrySet()) { + reportTimer(entry.getKey(), entry.getValue()); + } + } + + private void reportMetered(String name, Metered meter) { + scope.counter(MetricRegistry.name(name, "count")).inc(meter.getCount()); + List> meterGauges = Arrays.asList( + Pair.of("m1_rate", meter.getOneMinuteRate()), + Pair.of("m5_rate", meter.getFiveMinuteRate()), + Pair.of("m15_rate", meter.getFifteenMinuteRate()), + Pair.of("mean_rate", meter.getMeanRate()) + ); + for (Pair pair : meterGauges) { + scope.gauge(MetricRegistry.name(name, pair.getLeft())).update(pair.getRight()); + } + } + + private void reportSnapshot(String name, Snapshot snapshot) { + List> snapshotGauges = Arrays.asList( + Pair.of("max", snapshot.getMax()), + Pair.of("mean", snapshot.getMean()), + Pair.of("min", snapshot.getMin()), + Pair.of("stddev", snapshot.getStdDev()), + Pair.of("p50", snapshot.getMedian()), + Pair.of("p75", snapshot.get75thPercentile()), + Pair.of("p95", snapshot.get95thPercentile()), + Pair.of("p98", snapshot.get98thPercentile()), + Pair.of("p99", snapshot.get99thPercentile()), + Pair.of("p999", snapshot.get999thPercentile()) + ); + for (Pair pair : snapshotGauges) { + scope.gauge(MetricRegistry.name(name, pair.getLeft())).update(pair.getRight().doubleValue()); + } + } + + private void reportTimer(String name, Timer timer) { + reportMetered(name, timer); + reportSnapshot(name, timer.getSnapshot()); + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java new file mode 100644 index 0000000000000..e7299d706b894 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.m3; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import org.apache.hudi.common.testutils.NetworkTestUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.metrics.MetricsReporterType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestM3Metrics { + + @Mock + HoodieWriteConfig config; + HoodieMetrics hoodieMetrics; + Metrics metrics; + + @BeforeEach + public void start() { + when(config.isMetricsOn()).thenReturn(true); + when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.M3); + when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); + } + + @Test + public void testRegisterGauge() { + when(config.getM3ServerHost()).thenReturn("localhost"); + when(config.getM3ServerPort()).thenReturn(NetworkTestUtils.nextFreePort()); + when(config.getTableName()).thenReturn("raw_table"); + when(config.getM3Env()).thenReturn("dev"); + when(config.getM3Service()).thenReturn("hoodie"); + when(config.getM3Tags()).thenReturn("tag1=value1,tag2=value2"); + when(config.getMetricReporterMetricsNamePrefix()).thenReturn(""); + hoodieMetrics = new HoodieMetrics(config); + metrics = hoodieMetrics.getMetrics(); + metrics.registerGauge("metric1", 123L); + assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); + metrics.shutdown(); + } + + @Test + public void testEmptyM3Tags() { + when(config.getM3ServerHost()).thenReturn("localhost"); + when(config.getM3ServerPort()).thenReturn(NetworkTestUtils.nextFreePort()); + when(config.getTableName()).thenReturn("raw_table"); + when(config.getM3Env()).thenReturn("dev"); + when(config.getM3Service()).thenReturn("hoodie"); + when(config.getM3Tags()).thenReturn(""); + when(config.getMetricReporterMetricsNamePrefix()).thenReturn(""); + hoodieMetrics = new HoodieMetrics(config); + metrics = hoodieMetrics.getMetrics(); + metrics.registerGauge("metric1", 123L); + assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); + metrics.shutdown(); + } + + @Test + public void testInvalidM3Tags() { + when(config.getTableName()).thenReturn("raw_table"); + when(config.getMetricReporterMetricsNamePrefix()).thenReturn(""); + assertThrows(RuntimeException.class, () -> { + hoodieMetrics = new HoodieMetrics(config); + }); + } +} diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index cdbacfd8d5591..064ce639e8180 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -127,6 +127,8 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + com.uber.m3:tally-m3 + com.uber.m3:tally-core org.eclipse.jetty:* @@ -210,6 +212,10 @@ org.openjdk.jol. org.apache.hudi.org.openjdk.jol. + + com.uber.m3. + org.apache.hudi.com.uber.m3. + diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index 5b3974ad08deb..a21b0ac110a20 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -164,6 +164,8 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + com.uber.m3:tally-m3 + com.uber.m3:tally-core org.openjdk.jol:jol-core @@ -272,6 +274,10 @@ org.eclipse.jetty. org.apache.hudi.org.eclipse.jetty. + + com.uber.m3. + org.apache.hudi.com.uber.m3. + diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index 5431d83884926..3cc55e6dbeaf1 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -125,6 +125,8 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + com.uber.m3:tally-m3 + com.uber.m3:tally-core com.google.protobuf:protobuf-java org.scala-lang:* @@ -182,6 +184,10 @@ com.fasterxml.jackson. org.apache.hudi.com.fasterxml.jackson. + + com.uber.m3. + org.apache.hudi.com.uber.m3. + diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 8b85def7c77d0..9f60db19e0292 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -112,6 +112,9 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + com.uber.m3:tally-m3 + com.uber.m3:tally-core + com.yammer.metrics:metrics-core org.apache.hive:hive-common @@ -201,6 +204,10 @@ org.roaringbitmap. org.apache.hudi.org.roaringbitmap. + + com.uber.m3. + org.apache.hudi.com.uber.m3. + diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 50dd9ef3e21f9..b992e5bbeb8c4 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -141,6 +141,8 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + com.uber.m3:tally-m3 + com.uber.m3:tally-core org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version} org.apache.kafka:kafka_${scala.binary.version} @@ -237,6 +239,10 @@ org.roaringbitmap. org.apache.hudi.org.roaringbitmap. + + com.uber.m3. + org.apache.hudi.com.uber.m3. + diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml index 23e9123d61a51..3919b103465c4 100644 --- a/packaging/hudi-utilities-slim-bundle/pom.xml +++ b/packaging/hudi-utilities-slim-bundle/pom.xml @@ -127,6 +127,8 @@ io.prometheus:simpleclient_dropwizard io.prometheus:simpleclient_pushgateway io.prometheus:simpleclient_common + com.uber.m3:tally-m3 + com.uber.m3:tally-core org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version} org.apache.kafka:kafka_${scala.binary.version} @@ -196,6 +198,10 @@ com.google.protobuf. org.apache.hudi.com.google.protobuf. + + com.uber.m3. + org.apache.hudi.com.uber.m3. + diff --git a/pom.xml b/pom.xml index fbb41ccd130c1..65e947a747527 100644 --- a/pom.xml +++ b/pom.xml @@ -130,6 +130,7 @@ 1.5.6 0.9.47 0.25 + 0.13.0 0.8.0 4.5.13 4.4.13 @@ -1110,7 +1111,6 @@ metrics-jmx ${metrics.version} - io.prometheus simpleclient @@ -1131,6 +1131,16 @@ simpleclient_pushgateway ${prometheus.version} + + com.uber.m3 + tally-m3 + ${tally.version} + + + com.uber.m3 + tally-core + ${tally.version} + com.beust